Quick Start¶
yads is designed to be a lightweight drop-in for existing workflows and data pipelines. Its loaders and converters handle broad dependency ranges for source and target formats, so it seamlessly slots into your Python jobs.
You can author and interact with the canonical yads specification primarily through yads Python API.
Install yads¶
yads Python API is available on PyPI. Install with pip or uv:
uv add yads
uv add 'yads[pyarrow]'
pip install yads
pip install 'yads[pyarrow]'
Check the converters documentation for install instructions and supported versions of optional depencies.
Author a spec¶
Typical workflows start with a yads spec authored in YAML format or using the Python core API.
# docs/src/specs/submissions_det.yaml
name: "prod.assessments.submissions_det"
version: 1
yads_spec_version: "0.0.2"
description: "Assessments submissions details."
metadata:
owner: "data-team"
sensitive: false
columns:
- name: "submission_id"
type: "bigint"
description: "Submissions unique identifier."
constraints:
primary_key: true
not_null: true
- name: "completion_percent"
type: "decimal"
description: "Assessment completion percentage."
params:
precision: 5
scale: 2
constraints:
default: 0.00
- name: "time_taken_seconds"
type: "integer"
description: "Time taken submitting this assessment in seconds."
- name: "submitted_at"
type: "timestamptz"
description: "Timestamp in UTC when submitted."
params:
tz: "UTC"
Tip
Check the Specification section for a detailed authoring guide of your spec in YAML.
import yads
spec = yads.from_yaml("docs/src/specs/submissions_det.yaml")
print(spec)
spec prod.assessments.submissions_det(version=1)(
description='Assessments submissions details.'
metadata={
owner='data-team',
sensitive=False
}
columns=[
submission_id: integer(bits=64)(
description='Submissions unique identifier.',
constraints=[PrimaryKeyConstraint(), NotNullConstraint()]
)
completion_percent: decimal(precision=5, scale=2)(
description='Assessment completion percentage.',
constraints=[DefaultConstraint(value=0.0)]
)
time_taken_seconds: integer(bits=32)(
description='Time taken submitting this assessment in seconds.'
)
submitted_at: timestamptz(unit=ns, tz=UTC)(
description='Timestamp in UTC when submitted.'
)
]
)
import yads.spec as yspec
import yads.types as ytypes
from yads.constraints import (
PrimaryKeyConstraint,
NotNullConstraint,
DefaultConstraint,
)
spec = yspec.YadsSpec(
name="prod.assessments.submissions",
version=1,
description="Assessments submissions details.",
metadata={"owner": "data-team", "sensitive": False},
columns=[
yspec.Column(
name="submission_id",
type=ytypes.Integer(bits=64),
description="Submissions unique identifier.",
constraints=[PrimaryKeyConstraint(), NotNullConstraint()],
),
yspec.Column(
name="completion_percent",
type=ytypes.Decimal(precision=5, scale=2),
description="Assessment completion percentage.",
constraints=[DefaultConstraint(0.0)],
),
yspec.Column(
name="time_taken_seconds",
type=ytypes.Integer(bits=32),
description="Time taken submitting this assessment in seconds.",
),
yspec.Column(
name="submitted_at",
type=ytypes.TimestampTZ(tz="UTC"),
description="Timestamp in UTC when submitted.",
),
],
)
print(spec)
spec prod.assessments.submissions(version=1)(
description='Assessments submissions details.'
metadata={
owner='data-team',
sensitive=False
}
columns=[
submission_id: integer(bits=64)(
description='Submissions unique identifier.',
constraints=[PrimaryKeyConstraint(), NotNullConstraint()]
)
completion_percent: decimal(precision=5, scale=2)(
description='Assessment completion percentage.',
constraints=[DefaultConstraint(value=0.0)]
)
time_taken_seconds: integer(bits=32)(
description='Time taken submitting this assessment in seconds.'
)
submitted_at: timestamptz(unit=ns, tz=UTC)(
description='Timestamp in UTC when submitted.'
)
]
)
Convert to a target format¶
Turn your spec into typed schemas for your target formats.
submissions_schema = yads.to_pyarrow(spec)
print(submissions_schema)
submission_id: int64 not null
-- field metadata --
description: 'Submissions unique identifier.'
completion_percent: decimal128(5, 2)
-- field metadata --
description: 'Assessment completion percentage.'
time_taken_seconds: int32
-- field metadata --
description: 'Time taken submitting this assessment in seconds.'
submitted_at: timestamp[ns, tz=UTC]
-- field metadata --
description: 'Timestamp in UTC when submitted.'
-- schema metadata --
owner: 'data-team'
sensitive: 'false'
from pprint import pprint
submissions_schema = yads.to_polars(spec)
pprint(submissions_schema, width=120)
Schema([('submission_id', Int64),
('completion_percent', Decimal(precision=5, scale=2)),
('time_taken_seconds', Int32),
('submitted_at', Datetime(time_unit='ns', time_zone='UTC'))])
import json
submissions_schema = yads.to_pyspark(spec)
print(json.dumps(submissions_schema.jsonValue(), indent=2))
{
"type": "struct",
"fields": [
{
"name": "submission_id",
"type": "long",
"nullable": false,
"metadata": {
"description": "Submissions unique identifier."
}
},
{
"name": "completion_percent",
"type": "decimal(5,2)",
"nullable": true,
"metadata": {
"description": "Assessment completion percentage."
}
},
{
"name": "time_taken_seconds",
"type": "integer",
"nullable": true,
"metadata": {
"description": "Time taken submitting this assessment in seconds."
}
},
{
"name": "submitted_at",
"type": "timestamp",
"nullable": true,
"metadata": {
"description": "Timestamp in UTC when submitted."
}
}
]
}
import json
Submission = yads.to_pydantic(spec, model_name="Submission")
print(json.dumps(Submission.model_json_schema(), indent=2))
{
"properties": {
"submission_id": {
"description": "Submissions unique identifier.",
"maximum": 9223372036854775807,
"minimum": -9223372036854775808,
"title": "Submission Id",
"type": "integer",
"yads": {
"primary_key": true
}
},
"completion_percent": {
"anyOf": [
{
"type": "number"
},
{
"pattern": "^(?!^[-+.]*$)[+-]?0*(?:\\d{0,3}|(?=[\\d.]{1,6}0*$)\\d{0,3}\\.\\d{0,2}0*$)",
"type": "string"
},
{
"type": "null"
}
],
"default": 0.0,
"description": "Assessment completion percentage.",
"title": "Completion Percent"
},
"time_taken_seconds": {
"anyOf": [
{
"maximum": 2147483647,
"minimum": -2147483648,
"type": "integer"
},
{
"type": "null"
}
],
"description": "Time taken submitting this assessment in seconds.",
"title": "Time Taken Seconds"
},
"submitted_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"description": "Timestamp in UTC when submitted.",
"title": "Submitted At"
}
},
"required": [
"submission_id",
"time_taken_seconds",
"submitted_at"
],
"title": "Submission",
"type": "object"
}
spark_ddl = yads.to_sql(spec, dialect="spark", pretty=True)
print(spark_ddl)
CREATE TABLE prod.assessments.submissions_det (
submission_id BIGINT PRIMARY KEY NOT NULL,
completion_percent DECIMAL(5, 2) DEFAULT 0.0,
time_taken_seconds INT,
submitted_at TIMESTAMP
)
Tip
Check the complete converters API reference for all available converters and advanced customization options.
Register the spec¶
Use a yads registry to version your spec.
from yads.registries import FileSystemRegistry
registry = FileSystemRegistry("docs/src/specs/registry/")
version = registry.register(spec)
print(f"Registered spec '{spec.name}' as version {version}")
Registered spec 'prod.assessments.submissions_det' as version 1
Load from a typed source¶
Alternatively, a yads spec can be loaded from a knwon typed source with a supported loader.
import yads
import pyarrow as pa
schema = pa.schema(
[
pa.field(
"id",
pa.int64(),
nullable=False,
metadata={"description": "Customer ID"},
),
pa.field(
"name",
pa.string(),
metadata={"description": "Customer preferred name"},
),
pa.field(
"email",
pa.string(),
metadata={"description": "Customer email address"},
),
pa.field(
"created_at",
pa.timestamp("ns", tz="UTC"),
metadata={"description": "Customer creation timestamp"},
),
]
)
spec = yads.from_pyarrow(schema, name="catalog.crm.customers", version=1)
print(spec)
spec catalog.crm.customers(version=1)(
columns=[
id: integer(bits=64)(
description='Customer ID',
constraints=[NotNullConstraint()]
)
name: string(
description='Customer preferred name'
)
email: string(
description='Customer email address'
)
created_at: timestamptz(unit=ns, tz=UTC)(
description='Customer creation timestamp'
)
]
)
And transpiled into other formats.
duckdb_ddl = yads.to_sql(spec, dialect="duckdb", pretty=True)
print(duckdb_ddl)
CREATE TABLE catalog.crm.customers (
id BIGINT NOT NULL,
name TEXT,
email TEXT,
created_at TIMESTAMPTZ
)