Skip to content

SQLGlot Converter

SQLGlotConverter

Bases: BaseConverter[Any], AstConverter

Core converter that transforms yads specs into sqlglot AST expressions.

SQLGlotConverter is the foundational converter that handles the transformation from yads' high-level canonical spec to sqlglot's Abstract Syntax Tree representation. This AST serves as a dialect-agnostic intermediate representation that can then be serialized into SQL for specific database systems.

The converter uses single dispatch methods to handle different yads types, constraints, and spec elements, providing extensible type mapping and constraint conversion. It maintains the full expressiveness of the yads specification while producing valid sqlglot AST nodes.

The converter supports all yads type system features including primitive types, complex nested types, constraints, generated columns, partitioning transforms, and storage properties. It serves as the core engine for all SQL DDL generation in yads.

Source code in src/yads/converters/sql/ast_converter.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
class SQLGlotConverter(BaseConverter[Any], AstConverter):
    """Core converter that transforms yads specs into sqlglot AST expressions.

    SQLGlotConverter is the foundational converter that handles the transformation
    from yads' high-level canonical spec to sqlglot's Abstract Syntax Tree
    representation. This AST serves as a dialect-agnostic intermediate representation
    that can then be serialized into SQL for specific database systems.

    The converter uses single dispatch methods to handle different yads types,
    constraints, and spec elements, providing extensible type mapping and
    constraint conversion. It maintains the full expressiveness of the yads
    specification while producing valid sqlglot AST nodes.

    The converter supports all yads type system features including primitive types,
    complex nested types, constraints, generated columns, partitioning transforms,
    and storage properties. It serves as the core engine for all SQL DDL generation
    in yads.
    """

    def __init__(self, config: SQLGlotConverterConfig | None = None) -> None:
        """Initialize the SQLGlotConverter.

        Args:
            config: Configuration object. If None, uses default SQLGlotConverterConfig.
        """
        self.config: SQLGlotConverterConfig = config or SQLGlotConverterConfig()
        super().__init__(self.config)

    def _format_type_for_display(self, type_obj: Any) -> str:
        """Format sqlglot DataType for display in warnings.

        Extracts the type name from sqlglot's exp.DataType for cleaner
        display in warning messages.
        """
        from sqlglot import exp

        if isinstance(type_obj, exp.DataType) and hasattr(type_obj.this, "name"):
            return type_obj.this.name
        return str(type_obj)

    _TRANSFORM_HANDLERS: dict[str, str] = {
        "bucket": "_handle_bucket_transform",
        "truncate": "_handle_truncate_transform",
        "cast": "_handle_cast_transform",
        "date_trunc": "_handle_date_trunc_transform",
        "trunc": "_handle_date_trunc_transform",
    }

    @requires_dependency("sqlglot", import_name="sqlglot")
    def convert(
        self,
        spec: yspec.YadsSpec,
        *,
        mode: Literal["raise", "coerce"] | None = None,
    ) -> exp.Create:
        """Convert a yads `YadsSpec` into a sqlglot `exp.Create` AST expression.

        The resulting AST is dialect-agnostic and can be serialized to SQL for
        any database system supported by sqlglot. The conversion preserves all
        spec information and applies appropriate sqlglot expression types.

        Args:
            spec: The yads spec as a `YadsSpec` object.
            mode: Optional conversion mode override for this call. When not
                provided, the converter's configured mode is used. If provided:
                - "raise": Raise on any unsupported features.
                - "coerce": Apply adjustments to produce a valid AST and emit warnings.

        Returns:
            sqlglot `exp.Create` expression representing a CREATE TABLE statement.
            The AST includes table schema, constraints, properties, and metadata
            from the yads spec.
        """
        from sqlglot import exp

        # Set mode for this conversion call
        with self.conversion_context(mode=mode):
            self._validate_column_filters(spec)
            table = self._parse_full_table_name(
                spec.name,
                ignore_catalog=self.config.ignore_catalog,
                ignore_database=self.config.ignore_database,
            )
            properties = self._collect_properties(spec)
            expressions = self._collect_expressions(spec)

        return exp.Create(
            this=exp.Schema(this=table, expressions=expressions),
            kind="TABLE",
            exists=self.config.if_not_exists or None,
            replace=self.config.or_replace or None,
            properties=(exp.Properties(expressions=properties) if properties else None),
        )

    # %% ---- Type conversion ---------------------------------------------------------
    @singledispatchmethod
    def _convert_type(self, yads_type: ytypes.YadsType) -> exp.DataType:
        from sqlglot import exp
        from sqlglot.errors import ParseError

        # Fallback to default sqlglot DataType.build method.
        # The following non-parametrized yads types are handled via the fallback:
        # - Boolean
        # - JSON
        # - UUID
        # - Variant
        # https://sqlglot.com/sqlglot/expressions.html#DataType.build
        try:
            return exp.DataType.build(str(yads_type))
        except ParseError:
            # Currently unsupported in sqlglot:
            # - Duration
            # - Tensor
            return self.raise_or_coerce(
                yads_type,
                coerce_type=exp.DataType(this=self.config.fallback_type),
            )

    @_convert_type.register(ytypes.String)
    def _(self, yads_type: ytypes.String) -> exp.DataType:
        from sqlglot import exp
        from sqlglot.expressions import convert

        expressions = []
        if yads_type.length:
            expressions.append(exp.DataTypeParam(this=convert(yads_type.length)))
        return exp.DataType(
            this=exp.DataType.Type.TEXT,
            expressions=expressions if expressions else None,
        )

    @_convert_type.register(ytypes.Integer)
    def _(self, yads_type: ytypes.Integer) -> exp.DataType:
        from sqlglot import exp

        bits = yads_type.bits or 32
        signed_map = {
            8: exp.DataType.Type.TINYINT,
            16: exp.DataType.Type.SMALLINT,
            32: exp.DataType.Type.INT,
            64: exp.DataType.Type.BIGINT,
        }
        unsigned_map = {
            8: exp.DataType.Type.UTINYINT,
            16: exp.DataType.Type.USMALLINT,
            32: exp.DataType.Type.UINT,
            64: exp.DataType.Type.UBIGINT,
        }
        mapping = signed_map if yads_type.signed else unsigned_map
        try:
            return exp.DataType(this=mapping[bits])
        except KeyError as e:
            raise UnsupportedFeatureError(
                f"Unsupported Integer bits: {bits}. Expected 8/16/32/64"
                f" for '{self._field_context}'."
            ) from e

    @_convert_type.register(ytypes.Float)
    def _(self, yads_type: ytypes.Float) -> exp.DataType:
        from sqlglot import exp

        bits = yads_type.bits or 32
        if bits == 16:
            return self.raise_or_coerce(
                coerce_type=exp.DataType(this=exp.DataType.Type.FLOAT),
                error_msg=(
                    f"SQLGlotConverter does not support half-precision Float (bits={bits})."
                ),
            )
        elif bits == 32:
            return exp.DataType(this=exp.DataType.Type.FLOAT)
        elif bits == 64:
            return exp.DataType(this=exp.DataType.Type.DOUBLE)
        raise UnsupportedFeatureError(
            f"Unsupported Float bits: {bits}. Expected 16/32/64"
            f" for '{self._field_context}'."
        )

    @_convert_type.register(ytypes.Decimal)
    def _(self, yads_type: ytypes.Decimal) -> exp.DataType:
        from sqlglot import exp
        from sqlglot.expressions import convert

        expressions = []
        if yads_type.precision is not None:
            expressions.append(exp.DataTypeParam(this=convert(yads_type.precision)))
            expressions.append(exp.DataTypeParam(this=convert(yads_type.scale)))
        # Ignore bit-width parameter
        return exp.DataType(
            this=exp.DataType.Type.DECIMAL,
            expressions=expressions if expressions else None,
        )

    # Explicit mappings for parametrized temporal types
    @_convert_type.register(ytypes.Timestamp)
    def _(self, yads_type: ytypes.Timestamp) -> exp.DataType:
        from sqlglot import exp

        # Ignore unit parameter
        return exp.DataType(this=exp.DataType.Type.TIMESTAMP)

    @_convert_type.register(ytypes.TimestampTZ)
    def _(self, yads_type: ytypes.TimestampTZ) -> exp.DataType:
        from sqlglot import exp

        # Ignore unit parameter
        # Ignore tz parameter
        return exp.DataType(this=exp.DataType.Type.TIMESTAMPTZ)

    @_convert_type.register(ytypes.TimestampLTZ)
    def _(self, yads_type: ytypes.TimestampLTZ) -> exp.DataType:
        from sqlglot import exp

        # Ignore unit parameter
        return exp.DataType(this=exp.DataType.Type.TIMESTAMPLTZ)

    @_convert_type.register(ytypes.TimestampNTZ)
    def _(self, yads_type: ytypes.TimestampNTZ) -> exp.DataType:
        from sqlglot import exp

        # Ignore unit parameter
        return exp.DataType(this=exp.DataType.Type.TIMESTAMPNTZ)

    @_convert_type.register(ytypes.Time)
    def _(self, yads_type: ytypes.Time) -> exp.DataType:
        from sqlglot import exp

        # Ignore bit-width parameter
        # Ignore unit parameter
        return exp.DataType(this=exp.DataType.Type.TIME)

    @_convert_type.register(ytypes.Date)
    def _(self, yads_type: ytypes.Date) -> exp.DataType:
        from sqlglot import exp

        # Ignore bit-width parameter
        return exp.DataType(this=exp.DataType.Type.DATE)

    @_convert_type.register(ytypes.Binary)
    def _(self, yads_type: ytypes.Binary) -> exp.DataType:
        from sqlglot import exp
        from sqlglot.expressions import convert

        expressions = []
        if yads_type.length is not None:
            expressions.append(exp.DataTypeParam(this=convert(yads_type.length)))
        return exp.DataType(
            this=exp.DataType.Type.BINARY, expressions=expressions or None
        )

    @_convert_type.register(ytypes.Void)
    def _(self, yads_type: ytypes.Void) -> exp.DataType:
        from sqlglot import exp

        # VOID is not a valid sqlglot type, but can be defined as a Spark type.
        # https://docs.databricks.com/aws/en/sql/language-manual/data-types/null-type
        return exp.DataType(
            this=exp.DataType.Type.USERDEFINED,
            kind="VOID",
        )

    @_convert_type.register(ytypes.Interval)
    def _(self, yads_type: ytypes.Interval) -> exp.DataType:
        from sqlglot import exp

        if yads_type.interval_end and yads_type.interval_start != yads_type.interval_end:
            return exp.DataType(
                this=exp.Interval(
                    unit=exp.IntervalSpan(
                        this=exp.Var(this=yads_type.interval_start.value),
                        expression=exp.Var(this=yads_type.interval_end.value),
                    )
                )
            )
        return exp.DataType(
            this=exp.Interval(unit=exp.Var(this=yads_type.interval_start.value))
        )

    @_convert_type.register(ytypes.Array)
    def _(self, yads_type: ytypes.Array) -> exp.DataType:
        from sqlglot import exp

        element_type = self._convert_type(yads_type.element)
        # Ignore size parameter
        return exp.DataType(
            this=exp.DataType.Type.ARRAY,
            expressions=[element_type],
            nested=exp.DataType.Type.ARRAY in exp.DataType.NESTED_TYPES,
        )

    @_convert_type.register(ytypes.Struct)
    def _(self, yads_type: ytypes.Struct) -> exp.DataType:
        from sqlglot import exp

        return exp.DataType(
            this=exp.DataType.Type.STRUCT,
            expressions=[self._convert_field(field) for field in yads_type.fields],
            nested=exp.DataType.Type.STRUCT in exp.DataType.NESTED_TYPES,
        )

    @_convert_type.register(ytypes.Map)
    def _(self, yads_type: ytypes.Map) -> exp.DataType:
        from sqlglot import exp

        key_type = self._convert_type(yads_type.key)
        value_type = self._convert_type(yads_type.value)
        # Ignore keys_sorted parameter
        return exp.DataType(
            this=exp.DataType.Type.MAP,
            expressions=[key_type, value_type],
            nested=exp.DataType.Type.MAP in exp.DataType.NESTED_TYPES,
        )

    @_convert_type.register(ytypes.Geometry)
    def _(self, yads_type: ytypes.Geometry) -> exp.DataType:
        from sqlglot import exp
        from sqlglot.expressions import convert

        expressions = (
            [exp.DataTypeParam(this=convert(yads_type.srid))]
            if yads_type.srid is not None
            else None
        )
        return exp.DataType(this=exp.DataType.Type.GEOMETRY, expressions=expressions)

    @_convert_type.register(ytypes.Geography)
    def _(self, yads_type: ytypes.Geography) -> exp.DataType:
        from sqlglot import exp
        from sqlglot.expressions import convert

        expressions = (
            [exp.DataTypeParam(this=convert(yads_type.srid))]
            if yads_type.srid is not None
            else None
        )
        return exp.DataType(this=exp.DataType.Type.GEOGRAPHY, expressions=expressions)

    # %% ---- Column constraints ------------------------------------------------------
    @singledispatchmethod
    def _convert_column_constraint(self, constraint: Any) -> exp.ColumnConstraint | None:
        error_msg = (
            f"SQLGlotConverter does not support constraint: {type(constraint)}"
            f" for '{self._field_context}'."
        )

        if self.config.mode == "coerce":
            validation_warning(
                message=f"{error_msg} The constraint will be omitted.",
                filename="yads.converters.sql.ast_converter",
                module=__name__,
            )
            return None
        else:
            raise UnsupportedFeatureError(error_msg)

    @_convert_column_constraint.register(NotNullConstraint)
    def _(self, constraint: NotNullConstraint) -> exp.ColumnConstraint:
        from sqlglot import exp

        return exp.ColumnConstraint(kind=exp.NotNullColumnConstraint())

    @_convert_column_constraint.register(PrimaryKeyConstraint)
    def _(self, constraint: PrimaryKeyConstraint) -> exp.ColumnConstraint:
        from sqlglot import exp

        return exp.ColumnConstraint(kind=exp.PrimaryKeyColumnConstraint())

    @_convert_column_constraint.register(DefaultConstraint)
    def _(self, constraint: DefaultConstraint) -> exp.ColumnConstraint:
        from sqlglot import exp
        from sqlglot.expressions import convert

        return exp.ColumnConstraint(
            kind=exp.DefaultColumnConstraint(this=convert(constraint.value))
        )

    @_convert_column_constraint.register(IdentityConstraint)
    def _(self, constraint: IdentityConstraint) -> exp.ColumnConstraint:
        from sqlglot import exp
        from sqlglot.expressions import convert

        start_expr: exp.Expression | None = None
        if constraint.start is not None:
            start_expr = (
                exp.Neg(this=convert(abs(constraint.start)))
                if constraint.start < 0
                else convert(constraint.start)
            )

        increment_expr: exp.Expression | None = None
        if constraint.increment is not None:
            increment_expr = (
                exp.Neg(this=convert(abs(constraint.increment)))
                if constraint.increment < 0
                else convert(constraint.increment)
            )

        return exp.ColumnConstraint(
            kind=exp.GeneratedAsIdentityColumnConstraint(
                this=constraint.always,
                start=start_expr,
                increment=increment_expr,
            )
        )

    @_convert_column_constraint.register(ForeignKeyConstraint)
    def _(self, constraint: ForeignKeyConstraint) -> exp.ColumnConstraint:
        from sqlglot import exp

        reference_expression = exp.Reference(
            this=self._parse_full_table_name(
                constraint.references.table, constraint.references.columns
            ),
        )
        if constraint.name:
            return exp.ColumnConstraint(
                this=exp.Identifier(this=constraint.name), kind=reference_expression
            )
        return exp.ColumnConstraint(kind=reference_expression)

    # %% ---- Table constraints -------------------------------------------------------
    @singledispatchmethod
    def _convert_table_constraint(self, constraint: Any) -> exp.Expression | None:
        error_msg = (
            f"SQLGlotConverter does not support table constraint: {type(constraint)}"
        )

        if self.config.mode == "coerce":
            validation_warning(
                message=f"{error_msg} The constraint will be omitted.",
                filename="yads.converters.sql.ast_converter",
                module=__name__,
            )
            return None
        else:
            raise UnsupportedFeatureError(error_msg)

    @_convert_table_constraint.register(PrimaryKeyTableConstraint)
    def _(self, constraint: PrimaryKeyTableConstraint) -> exp.Expression:
        from sqlglot import exp

        pk_expression = exp.PrimaryKey(
            expressions=[
                exp.Ordered(
                    this=exp.Column(this=exp.Identifier(this=c)), nulls_first=True
                )
                for c in constraint.columns
            ],
            include=exp.IndexParameters(),
        )
        if constraint.name:
            return exp.Constraint(
                this=exp.Identifier(this=constraint.name), expressions=[pk_expression]
            )
        raise ConversionError("Primary key constraint must have a name.")

    @_convert_table_constraint.register(ForeignKeyTableConstraint)
    def _(self, constraint: ForeignKeyTableConstraint) -> exp.Expression:
        from sqlglot import exp

        reference_expression = exp.Reference(
            this=self._parse_full_table_name(
                constraint.references.table, constraint.references.columns
            ),
        )
        fk_expression = exp.ForeignKey(
            expressions=[exp.Identifier(this=c) for c in constraint.columns],
            reference=reference_expression,
        )
        if constraint.name:
            return exp.Constraint(
                this=exp.Identifier(this=constraint.name), expressions=[fk_expression]
            )
        raise ConversionError("Foreign key constraint must have a name.")

    # %% ---- Properties --------------------------------------------------------------
    def _handle_storage_properties(
        self, storage: yspec.Storage | None
    ) -> list[exp.Property]:
        if not storage:
            return []
        properties: list[exp.Property] = []
        if storage.format:
            properties.append(self._handle_file_format_property(storage.format))
        if storage.location:
            properties.append(self._handle_location_property(storage.location))
        if storage.tbl_properties:
            for key, value in storage.tbl_properties.items():
                properties.append(self._handle_generic_property(key, value))
        return properties

    def _handle_partitioned_by_property(
        self, value: list[yspec.TransformedColumnReference]
    ) -> exp.PartitionedByProperty:
        from sqlglot import exp

        schema_expressions = []
        for col in value:
            with self.conversion_context(field=col.column):
                if col.transform:
                    expression = self._handle_transformation(
                        col.column, col.transform, col.transform_args
                    )
                else:
                    expression = exp.Identifier(this=col.column)
                schema_expressions.append(expression)
        return exp.PartitionedByProperty(this=exp.Schema(expressions=schema_expressions))

    def _handle_location_property(self, value: str) -> exp.LocationProperty:
        from sqlglot import exp
        from sqlglot.expressions import convert

        return exp.LocationProperty(this=convert(value))

    def _handle_file_format_property(self, value: str) -> exp.FileFormatProperty:
        from sqlglot import exp

        return exp.FileFormatProperty(this=exp.Var(this=value))

    def _handle_external_property(self) -> exp.ExternalProperty:
        from sqlglot import exp

        return exp.ExternalProperty()

    def _handle_generic_property(self, key: str, value: Any) -> exp.Property:
        from sqlglot import exp
        from sqlglot.expressions import convert

        return exp.Property(this=convert(key), value=convert(value))

    def _collect_properties(self, spec: yspec.YadsSpec) -> list[exp.Property]:
        properties: list[exp.Property] = []
        if spec.external:
            properties.append(self._handle_external_property())
        properties.extend(self._handle_storage_properties(spec.storage))
        if spec.partitioned_by:
            properties.append(self._handle_partitioned_by_property(spec.partitioned_by))
        return properties

    # %% ---- Transform handlers ------------------------------------------------------
    def _handle_transformation(
        self, column: str, transform: str, transform_args: list[Any]
    ) -> exp.Expression:
        from sqlglot import exp

        if handler_method_name := self._TRANSFORM_HANDLERS.get(transform):
            handler_method = getattr(self, handler_method_name)
            return handler_method(column, transform_args)

        # Fallback to a generic function expression for all other transforms.
        # Most direct or parametrized transformation functions are supported
        # via the fallback. I.e.
        # - `day(original_col)`
        # - `month(original_col)`
        # - `year(original_col)`
        # - `date_format(original_col, 'yyyy-MM-dd')`
        # https://sqlglot.com/sqlglot/expressions.html#func
        return exp.func(
            transform, exp.column(column), *(exp.convert(arg) for arg in transform_args)
        )

    def _handle_cast_transform(
        self, column: str, transform_args: list[Any]
    ) -> exp.Expression:
        from sqlglot import exp

        self._validate_transform_args("cast", len(transform_args), 1)
        cast_to_type = transform_args[0].upper()
        try:
            target_type = exp.DataType.Type[cast_to_type]
        except KeyError:
            return self.raise_or_coerce(
                coerce_type=exp.Cast(
                    this=exp.column(column),
                    to=exp.DataType(this=self.config.fallback_type),
                ),
                error_msg=(
                    f"Transform type '{cast_to_type}' is not a valid sqlglot Type"
                    f" for '{self._field_context}'."
                ),
            )
        return exp.Cast(
            this=exp.column(column),
            to=exp.DataType(this=target_type),
        )

    def _handle_bucket_transform(
        self, column: str, transform_args: list[Any]
    ) -> exp.Expression:
        from sqlglot import exp

        self._validate_transform_args("bucket", len(transform_args), 1)
        return exp.PartitionedByBucket(
            this=exp.column(column), expression=exp.convert(transform_args[0])
        )

    def _handle_truncate_transform(
        self, column: str, transform_args: list[Any]
    ) -> exp.Expression:
        from sqlglot import exp

        self._validate_transform_args("truncate", len(transform_args), 1)
        return exp.PartitionByTruncate(
            this=exp.column(column), expression=exp.convert(transform_args[0])
        )

    def _handle_date_trunc_transform(
        self, column: str, transform_args: list[Any]
    ) -> exp.Expression:
        from sqlglot import exp

        self._validate_transform_args("date_trunc", len(transform_args), 1)
        return exp.DateTrunc(unit=exp.convert(transform_args[0]), this=exp.column(column))

    def _validate_transform_args(
        self, transform: str, received_args_len: int, required_args_len: int
    ) -> None:
        if received_args_len != required_args_len:
            raise ConversionError(
                f"The '{transform}' transform requires exactly {required_args_len} argument(s)."
                f" Got {received_args_len}."
            )

    # %% ---- Helpers -----------------------------------------------------------------
    def _convert_field(self, field: yspec.Field) -> exp.ColumnDef:
        from sqlglot import exp

        return exp.ColumnDef(
            this=exp.Identifier(this=field.name),
            kind=self._convert_type(field.type),
            constraints=None,
        )

    def _convert_column(self, column: yspec.Column) -> exp.ColumnDef:
        from sqlglot import exp

        constraints = []
        with self.conversion_context(field=column.name):
            if column.generated_as and column.generated_as.transform:
                expression = self._handle_transformation(
                    column.generated_as.column,
                    column.generated_as.transform,
                    column.generated_as.transform_args,
                )
                constraints.append(
                    exp.ColumnConstraint(
                        kind=exp.GeneratedAsIdentityColumnConstraint(
                            this=True, expression=expression
                        )
                    )
                )
            for constraint in column.constraints:
                converted = self._convert_column_constraint(constraint)
                if converted is not None:
                    constraints.append(converted)
            return exp.ColumnDef(
                this=exp.Identifier(this=column.name),
                kind=self._convert_type(column.type),
                constraints=constraints if constraints else None,
            )

    def _convert_field_default(self, field: yspec.Field) -> exp.ColumnDef:
        if not isinstance(field, yspec.Column):  # Overrides happen on column level
            raise TypeError(f"Expected Column, got {type(field)}")
        return self._convert_column(field)

    def _collect_expressions(self, spec: yspec.YadsSpec) -> list[exp.Expression]:
        expressions: list[exp.Expression] = []
        for col in self._filter_columns(spec):
            with self.conversion_context(field=col.name):
                column_expr = self._convert_field_with_overrides(col)
                expressions.append(column_expr)

        for tbl_constraint in spec.table_constraints:
            converted_constraint = self._convert_table_constraint(tbl_constraint)
            if converted_constraint is not None:
                expressions.append(converted_constraint)
        return expressions

    def _parse_full_table_name(
        self,
        full_name: str,
        columns: list[str] | None = None,
        ignore_catalog: bool = False,
        ignore_database: bool = False,
    ) -> exp.Table | exp.Schema:
        from sqlglot import exp

        parts = full_name.split(".")
        table_name = parts[-1]
        db_name = None
        catalog_name = None
        if not ignore_database and len(parts) > 1:
            db_name = parts[-2]
        if not ignore_catalog and len(parts) > 2:
            catalog_name = parts[-3]

        table_expression = exp.Table(
            this=exp.Identifier(this=table_name),
            db=exp.Identifier(this=db_name) if db_name else None,
            catalog=exp.Identifier(this=catalog_name) if catalog_name else None,
        )
        if columns:
            return exp.Schema(
                this=table_expression,
                expressions=[exp.Identifier(this=c) for c in columns],
            )
        return table_expression

__init__(config=None)

Initialize the SQLGlotConverter.

Parameters:

Name Type Description Default
config SQLGlotConverterConfig | None

Configuration object. If None, uses default SQLGlotConverterConfig.

None
Source code in src/yads/converters/sql/ast_converter.py
138
139
140
141
142
143
144
145
def __init__(self, config: SQLGlotConverterConfig | None = None) -> None:
    """Initialize the SQLGlotConverter.

    Args:
        config: Configuration object. If None, uses default SQLGlotConverterConfig.
    """
    self.config: SQLGlotConverterConfig = config or SQLGlotConverterConfig()
    super().__init__(self.config)

conversion_context(*, mode=None, field=None)

Temporarily set conversion mode and field context.

This context manager centralizes handling of converter state used for warnings and coercions, ensuring that values are restored afterwards.

Parameters:

Name Type Description Default
mode Literal['raise', 'coerce'] | None

Optional override for the current conversion mode.

None
field str | None

Optional field name for contextual warnings.

None
Source code in src/yads/converters/base.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
@contextmanager
def conversion_context(
    self,
    *,
    mode: Literal["raise", "coerce"] | None = None,
    field: str | None = None,
) -> Generator[None, None, None]:
    """Temporarily set conversion mode and field context.

    This context manager centralizes handling of converter state used for
    warnings and coercions, ensuring that values are restored afterwards.

    Args:
        mode: Optional override for the current conversion mode.
        field: Optional field name for contextual warnings.
    """
    # Snapshot current state
    previous_config = self.config
    previous_field = self._current_field_name

    try:
        if mode is not None:
            if mode not in ("raise", "coerce"):
                raise ConverterConfigError("mode must be one of 'raise' or 'coerce'.")
            self.config = replace(self.config, mode=mode)
        if field is not None:
            self._current_field_name = field
        yield
    finally:
        # Restore prior state
        self.config = previous_config
        self._current_field_name = previous_field

convert(spec, *, mode=None)

Convert a yads YadsSpec into a sqlglot exp.Create AST expression.

The resulting AST is dialect-agnostic and can be serialized to SQL for any database system supported by sqlglot. The conversion preserves all spec information and applies appropriate sqlglot expression types.

Parameters:

Name Type Description Default
spec YadsSpec

The yads spec as a YadsSpec object.

required
mode Literal['raise', 'coerce'] | None

Optional conversion mode override for this call. When not provided, the converter's configured mode is used. If provided: - "raise": Raise on any unsupported features. - "coerce": Apply adjustments to produce a valid AST and emit warnings.

None

Returns:

Type Description
Create

sqlglot exp.Create expression representing a CREATE TABLE statement.

Create

The AST includes table schema, constraints, properties, and metadata

Create

from the yads spec.

Source code in src/yads/converters/sql/ast_converter.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@requires_dependency("sqlglot", import_name="sqlglot")
def convert(
    self,
    spec: yspec.YadsSpec,
    *,
    mode: Literal["raise", "coerce"] | None = None,
) -> exp.Create:
    """Convert a yads `YadsSpec` into a sqlglot `exp.Create` AST expression.

    The resulting AST is dialect-agnostic and can be serialized to SQL for
    any database system supported by sqlglot. The conversion preserves all
    spec information and applies appropriate sqlglot expression types.

    Args:
        spec: The yads spec as a `YadsSpec` object.
        mode: Optional conversion mode override for this call. When not
            provided, the converter's configured mode is used. If provided:
            - "raise": Raise on any unsupported features.
            - "coerce": Apply adjustments to produce a valid AST and emit warnings.

    Returns:
        sqlglot `exp.Create` expression representing a CREATE TABLE statement.
        The AST includes table schema, constraints, properties, and metadata
        from the yads spec.
    """
    from sqlglot import exp

    # Set mode for this conversion call
    with self.conversion_context(mode=mode):
        self._validate_column_filters(spec)
        table = self._parse_full_table_name(
            spec.name,
            ignore_catalog=self.config.ignore_catalog,
            ignore_database=self.config.ignore_database,
        )
        properties = self._collect_properties(spec)
        expressions = self._collect_expressions(spec)

    return exp.Create(
        this=exp.Schema(this=table, expressions=expressions),
        kind="TABLE",
        exists=self.config.if_not_exists or None,
        replace=self.config.or_replace or None,
        properties=(exp.Properties(expressions=properties) if properties else None),
    )

raise_or_coerce(yads_type=None, *, coerce_type=None, error_msg=None)

Handle raise or coerce mode for unsupported type features.

This public method provides a consistent way to handle unsupported types based on the converter's mode. It can be used within converters and in custom column override functions.

The method uses the template method pattern with several hook methods that subclasses can override to customize behavior.

Hook that subclasses can override
  • _format_type_for_display: Customize how types appear in warnings
  • _emit_warning: Customize warning emission
  • _get_fallback_type: Customize fallback type resolution
  • _generate_error_message: Customize error message generation

Parameters:

Name Type Description Default
yads_type Any | None

The yads type that is not supported. Can be None if error_msg is explicitly provided.

None
coerce_type Any | None

The type to coerce to in coerce mode. If None, uses the converter's configured fallback type.

None
error_msg str | None

Custom error message. If None, uses a default message based on the converter class name and yads_type. When providing a custom error_msg, yads_type can be None.

None

Returns:

Type Description
T

The coerced type in coerce mode.

Raises:

Type Description
UnsupportedFeatureError

In raise mode when the feature is not supported, or in coerce mode when fallback_type is None.

ValueError

If both yads_type and error_msg are None.

Source code in src/yads/converters/base.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def raise_or_coerce(
    self,
    yads_type: Any | None = None,
    *,
    coerce_type: Any | None = None,
    error_msg: str | None = None,
) -> T:
    """Handle raise or coerce mode for unsupported type features.

    This public method provides a consistent way to handle unsupported types
    based on the converter's mode. It can be used within converters and in
    custom column override functions.

    The method uses the template method pattern with several hook methods
    that subclasses can override to customize behavior.

    Hook that subclasses can override:
        - `_format_type_for_display`: Customize how types appear in warnings
        - `_emit_warning`: Customize warning emission
        - `_get_fallback_type`: Customize fallback type resolution
        - `_generate_error_message`: Customize error message generation

    Args:
        yads_type: The yads type that is not supported. Can be None if
            error_msg is explicitly provided.
        coerce_type: The type to coerce to in coerce mode. If None, uses
            the converter's configured fallback type.
        error_msg: Custom error message. If None, uses a default message
            based on the converter class name and yads_type. When providing
            a custom error_msg, yads_type can be None.

    Returns:
        The coerced type in coerce mode.

    Raises:
        UnsupportedFeatureError: In raise mode when the feature is not supported,
            or in coerce mode when fallback_type is None.
        ValueError: If both yads_type and error_msg are None.
    """
    # Resolve error message once
    if error_msg is None:
        if yads_type is None:
            raise ValueError(
                "Either yads_type or error_msg must be provided to raise_or_coerce"
            )
        error_msg = self._generate_error_message(yads_type)

    # Resolve coerce_type (fallback to config if not provided)
    if coerce_type is None:
        try:
            coerce_type = self._get_fallback_type()
        except ValueError:
            # fallback_type is None - must raise even in coerce mode
            if self.config.mode == "coerce":
                error_msg = f"{error_msg} Specify a fallback_type to enable coercion of unsupported types."
            raise UnsupportedFeatureError(error_msg)

    # Handle based on mode
    if self.config.mode == "coerce":
        display_type = self._format_type_for_display(coerce_type)
        warning_msg = f"{error_msg} The data type will be coerced to {display_type}."
        self._emit_warning(warning_msg)
        return coerce_type
    else:
        raise UnsupportedFeatureError(error_msg)

SQLGlotConverterConfig dataclass

Bases: BaseConverterConfig[Any]

Configuration for SQLGlotConverter.

Parameters:

Name Type Description Default
mode Literal['raise', 'coerce']

Conversion mode. One of "raise" or "coerce". Inherited from BaseConverterConfig. Defaults to "coerce".

'coerce'
ignore_columns frozenset[str]

Column names to exclude from conversion. Inherited from BaseConverterConfig. Defaults to empty.

_empty_frozenset_str()
include_columns frozenset[str] | None

If provided, only these columns are included. Inherited from BaseConverterConfig. Defaults to None.

None
column_overrides Mapping[str, Callable[[Field, SQLGlotConverter], ColumnDef]]

Mapping of column name to a callable that returns a custom sqlglot exp.ColumnDef. Inherited from BaseConverterConfig. Defaults to empty mapping.

(lambda: MappingProxyType({}))()
if_not_exists bool

If True, sets the exists property of the exp.Create node to True. Defaults to False.

False
or_replace bool

If True, sets the replace property of the exp.Create node to True. Defaults to False.

False
ignore_catalog bool

If True, omits the catalog from the table name. Defaults to False.

False
ignore_database bool

If True, omits the database from the table name. Defaults to False.

False
fallback_type Type | None

SQL data type to use for unsupported types in coerce mode. Must be one of: exp.DataType.Type.TEXT, exp.DataType.Type.BINARY, exp.DataType.Type.BLOB, or None. Defaults to None.

None
Source code in src/yads/converters/sql/ast_converter.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@dataclass(frozen=True)
# %% ---- Configuration --------------------------------------------------------------
class SQLGlotConverterConfig(BaseConverterConfig[Any]):
    """Configuration for SQLGlotConverter.

    Args:
        mode: Conversion mode. One of "raise" or "coerce". Inherited from
            BaseConverterConfig. Defaults to "coerce".
        ignore_columns: Column names to exclude from conversion. Inherited from
            BaseConverterConfig. Defaults to empty.
        include_columns: If provided, only these columns are included. Inherited
            from BaseConverterConfig. Defaults to None.
        column_overrides: Mapping of column name to a callable that returns a
            custom sqlglot `exp.ColumnDef`. Inherited from BaseConverterConfig.
            Defaults to empty mapping.
        if_not_exists: If True, sets the `exists` property of the `exp.Create`
            node to `True`. Defaults to False.
        or_replace: If True, sets the `replace` property of the `exp.Create`
            node to `True`. Defaults to False.
        ignore_catalog: If True, omits the catalog from the table name. Defaults to False.
        ignore_database: If True, omits the database from the table name. Defaults to False.
        fallback_type: SQL data type to use for unsupported types in coerce mode.
            Must be one of: exp.DataType.Type.TEXT, exp.DataType.Type.BINARY, exp.DataType.Type.BLOB, or None.
            Defaults to None.
    """

    if_not_exists: bool = False
    or_replace: bool = False
    ignore_catalog: bool = False
    ignore_database: bool = False
    fallback_type: exp.DataType.Type | None = None
    column_overrides: Mapping[
        str, Callable[[yspec.Field, SQLGlotConverter], exp.ColumnDef]
    ] = field(default_factory=lambda: MappingProxyType({}))

    def __post_init__(self) -> None:
        """Validate configuration parameters."""
        super().__post_init__()

        # Validate fallback_type if provided
        if self.fallback_type is not None:
            from sqlglot import exp

            valid_fallback_types = {
                exp.DataType.Type.TEXT,
                exp.DataType.Type.BINARY,
                exp.DataType.Type.BLOB,
            }
            if self.fallback_type not in valid_fallback_types:
                raise UnsupportedFeatureError(
                    f"fallback_type must be one of: exp.DataType.Type.TEXT, "
                    f"exp.DataType.Type.BINARY, exp.DataType.Type.BLOB, or None. Got: {self.fallback_type}"
                )

__post_init__()

Validate configuration parameters.

Source code in src/yads/converters/sql/ast_converter.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def __post_init__(self) -> None:
    """Validate configuration parameters."""
    super().__post_init__()

    # Validate fallback_type if provided
    if self.fallback_type is not None:
        from sqlglot import exp

        valid_fallback_types = {
            exp.DataType.Type.TEXT,
            exp.DataType.Type.BINARY,
            exp.DataType.Type.BLOB,
        }
        if self.fallback_type not in valid_fallback_types:
            raise UnsupportedFeatureError(
                f"fallback_type must be one of: exp.DataType.Type.TEXT, "
                f"exp.DataType.Type.BINARY, exp.DataType.Type.BLOB, or None. Got: {self.fallback_type}"
            )