
File System Registry

This registry uses fsspec to support local paths and cloud object stores. Install yads with the extra for the backend you need:

Backend                 Install with uv           Install with pip
Local paths             uv add 'yads[fs]'         pip install "yads[fs]"
S3                      uv add 'yads[s3]'         pip install "yads[s3]"
Azure Blob Storage      uv add 'yads[abfs]'       pip install "yads[abfs]"
Google Cloud Storage    uv add 'yads[gcs]'        pip install "yads[gcs]"

FileSystemRegistry

Bases: BaseRegistry

Filesystem-based registry using fsspec for multi-cloud support.

Stores specs in a simple directory structure:

{base_path}/
└── {url_encoded_spec_name}/
    └── versions/
        ├── 1.yaml
        ├── 2.yaml
        └── 3.yaml
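The path of each version file can be rebuilt with the standard library. This is a minimal sketch that assumes the layout above and the `urllib.parse.quote(name, safe="")` encoding used in the source below. `version_file_path` is a hypothetical helper and is not part of yads. `base_path` stands for the path as resolved by fsspec.

```python
import urllib.parse


def version_file_path(base_path: str, spec_name: str, version: int) -> str:
    """Hypothetical helper: path of one version file in the registry layout."""
    # safe="" percent-encodes every reserved character, so the encoded name
    # is always a single path segment
    encoded_name = urllib.parse.quote(spec_name, safe="")
    return f"{base_path}/{encoded_name}/versions/{version}.yaml"


print(version_file_path("/data/specs", "catalog.db.orders", 3))
# /data/specs/catalog.db.orders/versions/3.yaml
print(version_file_path("/data/specs", "sales orders", 1))
# /data/specs/sales%20orders/versions/1.yaml
```

Dots are left as-is because they are unreserved URL characters. Spaces and other special characters are percent-encoded.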

The registry assigns monotonically increasing version numbers automatically. If a spec being registered has the same content (ignoring its version field) as the latest registered version, no new file is written and the latest version number is returned.
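The sketch below models these versioning rules in memory, which makes it easy to see that only the *latest* version is used for the duplicate check. `InMemoryVersionStore` is a hypothetical stand-in and not part of yads. Plain dicts play the role of spec content. The real registry compares serialized specs with the version field normalized.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryVersionStore:
    """Hypothetical in-memory stand-in that follows the registry's versioning rules."""

    history: dict[str, list[dict]] = field(default_factory=dict)

    def register(self, name: str, content: dict) -> int:
        versions = self.history.setdefault(name, [])
        # Only the latest version is compared, as in FileSystemRegistry.register
        if versions and versions[-1] == content:
            return len(versions)
        versions.append(content)
        # Versions are numbered 1, 2, 3, ..., so the newest number equals the count
        return len(versions)


store = InMemoryVersionStore()
v1 = {"columns": ["id"]}
v2 = {"columns": ["id", "name"]}
print(store.register("orders", v1))  # 1
print(store.register("orders", v1))  # 1 (identical to latest, nothing stored)
print(store.register("orders", v2))  # 2
print(store.register("orders", v1))  # 3 (matches version 1, but not the latest)
```

Re-registering content that matches an older version therefore still creates a new version.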

Thread Safety

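This implementation is not thread-safe. Each registration reads the latest version number and then writes latest + 1. Concurrent registrations of the same spec can therefore pick the same number, and the later write can overwrite the earlier file. For production use, ensure only one process (e.g., a CI/CD pipeline) has write access to the registry.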
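Within a single Python process, you can avoid this race by serializing `register` calls behind a lock. The following is a sketch under stated assumptions, not part of yads. `SerializedRegistry` is a hypothetical wrapper. `FakeRegistry` stands in for `FileSystemRegistry` by reproducing its read-latest-then-write pattern. A lock does not coordinate separate processes or machines, so the single-writer advice above still applies.

```python
import threading
import time
from typing import Any


class FakeRegistry:
    """Stand-in that reproduces the read-latest-then-write pattern of register()."""

    def __init__(self) -> None:
        self.written: list[int] = []

    def register(self, spec: Any) -> int:
        latest = max(self.written, default=0)
        time.sleep(0.001)  # widen the gap between reading and writing
        new_version = latest + 1
        self.written.append(new_version)
        return new_version


class SerializedRegistry:
    """Hypothetical wrapper: at most one register() call at a time in this process."""

    def __init__(self, inner: Any) -> None:
        self._inner = inner
        self._lock = threading.Lock()

    def register(self, spec: Any) -> int:
        with self._lock:
            return self._inner.register(spec)


registry = SerializedRegistry(FakeRegistry())
results: list[int] = []
threads = [
    threading.Thread(target=lambda: results.append(registry.register(None)))
    for _ in range(10)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(sorted(results))  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Without the wrapper, the same threads can read the same latest version and produce duplicate numbers.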

Parameters:

base_path (str, required): Base path for the registry. Can be a local path or a cloud URL:
  - Local: "/path/to/registry"
  - S3: "s3://bucket/registry/"
  - GCS: "gs://bucket/registry/"
  - Azure: "az://container/registry/"

logger (Logger | None, default None): Optional logger for registry operations. If None, the logger named "yads.registries.filesystem" is used.

serializer (SpecSerializer | None, default None): Optional spec serializer override, used both for YAML exports and for the duplicate-content comparison. If None, a default SpecSerializer is created.

**fsspec_kwargs (Any, default {}): Additional arguments passed to fsspec for authentication and configuration (e.g., profile="production" for S3).
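The backend is selected from the URL scheme of `base_path`, so the scheme also tells you which extra from the install table you need. This is a small sketch that assumes the install table above. `required_extra` and `EXTRA_BY_SCHEME` are hypothetical and not part of yads. The scheme aliases (`s3a`, `gcs`, `abfs`) follow fsspec's built-in protocol registry.

```python
from urllib.parse import urlsplit

# Hypothetical mapping from URL scheme to the yads extra in the install table
EXTRA_BY_SCHEME = {
    "": "fs",  # plain local path such as /path/to/registry
    "file": "fs",
    "s3": "s3",
    "s3a": "s3",
    "gs": "gcs",
    "gcs": "gcs",
    "az": "abfs",
    "abfs": "abfs",
}


def required_extra(base_path: str) -> str:
    """Return the yads extra to install for base_path (hypothetical helper)."""
    scheme = urlsplit(base_path).scheme.lower()
    try:
        return EXTRA_BY_SCHEME[scheme]
    except KeyError:
        raise ValueError(f"No known yads extra for scheme {scheme!r}") from None


print(required_extra("s3://bucket/registry/"))  # s3
```

Windows drive paths such as `C:\registry` are not handled by this sketch, because `urlsplit` reads the drive letter as a scheme.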

Raises:

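RegistryConnectionError: If fsspec cannot resolve a filesystem for the base path, or probing the base path raises an error (for example, because of an unknown protocol, a missing backend package, or invalid credentials). A base path that does not exist yet is not an error; its directories are created on first registration.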

Example

```python
import logging

from yads.registries.filesystem_registry import FileSystemRegistry

# Local registry
registry = FileSystemRegistry("/data/specs")

# S3 with a specific AWS profile (passed through to fsspec)
registry = FileSystemRegistry(
    "s3://my-bucket/schemas/",
    profile="production",
)

# With a custom logger
logger = logging.getLogger("my_app.registry")
registry = FileSystemRegistry("/data/specs", logger=logger)
```
Source code in src/yads/registries/filesystem_registry.py
class FileSystemRegistry(BaseRegistry):
    """Filesystem-based registry using fsspec for multi-cloud support.

    Stores specs in a simple directory structure:

        {base_path}/
        └── {url_encoded_spec_name}/
            └── versions/
                ├── 1.yaml
                ├── 2.yaml
                └── 3.yaml

    The registry assigns monotonically increasing version numbers automatically.
    If a spec with identical content (excluding version) is registered, the
    existing version number is returned.

    Thread Safety:
        This implementation is not thread-safe. Concurrent registrations may
        result in race conditions. For production use, ensure only one process
        (e.g., a CI/CD pipeline) has write access to the registry.

    Args:
        base_path: Base path for the registry. Can be local path or cloud URL:
            - Local: "/path/to/registry"
            - S3: "s3://bucket/registry/"
            - GCS: "gs://bucket/registry/"
            - Azure: "az://container/registry/"
        logger: Optional logger for registry operations. If None, creates
            a default logger at "yads.registries.filesystem".
        **fsspec_kwargs: Additional arguments passed to fsspec for authentication
            and configuration (e.g., profile="production" for S3).

    Raises:
        RegistryConnectionError: If the base path is invalid or inaccessible.

    Example:
        ```python
        # Local registry
        registry = FileSystemRegistry("/data/specs")

        # S3 with specific profile
        registry = FileSystemRegistry(
            "s3://my-bucket/schemas/",
            profile="production"
        )

        # With custom logger
        import logging
        logger = logging.getLogger("my_app.registry")
        registry = FileSystemRegistry("/data/specs", logger=logger)
        ```
    """

    # Characters not allowed in spec names (filesystem-unsafe)
    INVALID_NAME_CHARS = frozenset({"/", "\\", ":", "*", "?", "<", ">", "|", "\0"})

    def __init__(
        self,
        base_path: str,
        logger: logging.Logger | None = None,
        serializer: SpecSerializer | None = None,
        **fsspec_kwargs: Any,
    ):
        """Initialize the FileSystemRegistry.

        Args:
            base_path: Base path for the registry storage.
            logger: Optional logger instance.
            serializer: Optional spec serializer override used for YAML exports.
            **fsspec_kwargs: Additional fsspec configuration.
        """
        # Initialize logger
        self.logger = logger or logging.getLogger("yads.registries.filesystem")

        # Initialize filesystem
        try:
            fs_result = cast(
                tuple[Any, Any],
                fsspec.core.url_to_fs(  # pyright: ignore[reportUnknownMemberType]
                    base_path, **fsspec_kwargs
                ),
            )
            fs_obj_any, resolved_base_path_any = fs_result
            fs_obj = cast(FileSystemProtocol, fs_obj_any)
            resolved_base_path = str(resolved_base_path_any)
            # Probe the base path so connection/auth errors surface early;
            # the result is ignored, so a missing path is not an error
            fs_obj.exists(resolved_base_path)
        except Exception as e:
            raise RegistryConnectionError(
                f"Failed to connect to registry at '{base_path}': {e}"
            ) from e

        self.fs: FileSystemProtocol = fs_obj
        self.base_path: str = resolved_base_path
        self._serializer = serializer or SpecSerializer()
        self.logger.info(f"Initialized FileSystemRegistry at: {self.base_path}")

    def register(self, spec: YadsSpec) -> int:
        """Register a spec and assign it a version number.

        If the spec content matches the latest version (excluding the version
        field), returns the existing version number without creating a new entry.

        Args:
            spec: The YadsSpec to register.

        Returns:
            The assigned or existing version number.

        Raises:
            InvalidSpecNameError: If `spec.name` contains invalid characters.
            RegistryError: If registration fails.
        """
        # Validate spec name
        self._validate_spec_name(spec.name)

        # URL-encode the spec name for filesystem safety
        encoded_name = urllib.parse.quote(spec.name, safe="")

        # Get latest version
        latest_version = self._get_latest_version(encoded_name)

        # Check if content is identical to latest
        if latest_version is not None:
            try:
                latest_spec = self._read_spec(encoded_name, latest_version)
                if self._specs_equal(spec, latest_spec):
                    warnings.warn(
                        f"Spec '{spec.name}' content is identical to version "
                        f"{latest_version}. Skipping registration.",
                        DuplicateSpecWarning,
                        stacklevel=2,
                    )
                    self.logger.warning(
                        f"Duplicate content for '{spec.name}'. "
                        f"Returning existing version {latest_version}."
                    )
                    return latest_version
            except Exception as e:
                self.logger.debug(f"Could not read latest version for comparison: {e}")

        # Assign new version
        new_version = (latest_version or 0) + 1

        # Write the new version
        try:
            self._write_spec(encoded_name, new_version, spec)
            self.logger.info(f"Registered '{spec.name}' as version {new_version}")
            return new_version
        except Exception as e:
            raise RegistryError(f"Failed to register spec '{spec.name}': {e}") from e

    def get(self, name: str, version: int | None = None) -> YadsSpec:
        """Retrieve a spec by name and optional version.

        Args:
            name: The fully qualified spec name.
            version: Optional version number. If None, retrieves latest.

        Returns:
            The requested YadsSpec with version field set.

        Raises:
            SpecNotFoundError: If the spec or version doesn't exist.
            RegistryError: If retrieval fails.
        """
        encoded_name = urllib.parse.quote(name, safe="")

        # Determine version to retrieve
        if version is None:
            version = self._get_latest_version(encoded_name)
            if version is None:
                raise SpecNotFoundError(f"Spec '{name}' not found in registry")
            self.logger.debug(f"Retrieving latest version {version} of '{name}'")
        else:
            self.logger.debug(f"Retrieving version {version} of '{name}'")

        # Read and return the spec
        try:
            spec = self._read_spec(encoded_name, version)
            self.logger.info(f"Retrieved '{name}' version {version}")
            return spec
        except FileNotFoundError:
            raise SpecNotFoundError(
                f"Spec '{name}' version {version} not found in registry"
            )
        except Exception as e:
            raise RegistryError(
                f"Failed to retrieve spec '{name}' version {version}: {e}"
            ) from e

    def list_versions(self, name: str) -> list[int]:
        """List all available versions for a spec.

        Args:
            name: The fully qualified spec name.

        Returns:
            Sorted list of version numbers, or empty list if not found.

        Raises:
            RegistryError: If listing fails.
        """
        encoded_name = urllib.parse.quote(name, safe="")
        versions_dir = f"{self.base_path}/{encoded_name}/versions"

        try:
            if not self.fs.exists(versions_dir):
                self.logger.debug(f"No versions found for '{name}'")
                return []

            # List all files in versions directory
            files = self.fs.ls(versions_dir, detail=False)

            # Extract version numbers from filenames
            versions: list[int] = []
            for file_path in files:
                filename = file_path.split("/")[-1]
                if filename.endswith(".yaml"):
                    try:
                        version_num = int(filename[:-5])  # Remove .yaml extension
                        versions.append(version_num)
                    except ValueError:
                        self.logger.warning(f"Skipping non-version file: {filename}")

            versions.sort()
            self.logger.debug(f"Found {len(versions)} versions for '{name}'")
            return versions

        except Exception as e:
            raise RegistryError(f"Failed to list versions for '{name}': {e}") from e

    def exists(self, name: str) -> bool:
        """Check if a spec exists in the registry.

        Args:
            name: The fully qualified spec name.

        Returns:
            True if the spec exists, False otherwise.
        """
        encoded_name = urllib.parse.quote(name, safe="")
        spec_dir = f"{self.base_path}/{encoded_name}"

        try:
            result = self.fs.exists(spec_dir)
            self.logger.debug(f"Spec '{name}' exists: {result}")
            return result
        except Exception as e:
            self.logger.error(f"Failed to check existence of '{name}': {e}")
            return False

    # Private helper methods
    def _validate_spec_name(self, name: str) -> None:
        """Validate that spec name doesn't contain filesystem-unsafe characters.

        Args:
            name: The spec name to validate.

        Raises:
            InvalidSpecNameError: If name contains invalid characters.
        """
        if not name:
            raise InvalidSpecNameError("Spec name cannot be empty")

        invalid_found = set(name) & self.INVALID_NAME_CHARS
        if invalid_found:
            chars_str = ", ".join(repr(c) for c in sorted(invalid_found))
            raise InvalidSpecNameError(
                f"Spec name '{name}' contains invalid characters: {chars_str}"
            )

    def _get_latest_version(self, encoded_name: str) -> int | None:
        """Get the latest version number for a spec.

        Args:
            encoded_name: URL-encoded spec name.

        Returns:
            Latest version number, or None if no versions exist.
        """
        versions = self.list_versions(urllib.parse.unquote(encoded_name))
        return max(versions) if versions else None

    def _specs_equal(self, spec1: YadsSpec, spec2: YadsSpec) -> bool:
        """Compare two specs for equality, excluding the version field.

        Args:
            spec1: First spec to compare.
            spec2: Second spec to compare.

        Returns:
            True if specs are equal (excluding version), False otherwise.
        """
        return self._normalized_spec_dict(spec1) == self._normalized_spec_dict(spec2)

    def _normalized_spec_dict(self, spec: YadsSpec) -> dict[str, Any]:
        """Serialize a spec into a dict suitable for equality checks."""
        normalized = self._serializer.serialize(spec)
        normalized["version"] = 0
        return normalized

    def _write_spec(self, encoded_name: str, version: int, spec: YadsSpec) -> None:
        """Write a spec to the registry.

        Args:
            encoded_name: URL-encoded spec name.
            version: Version number to assign.
            spec: The spec to write.
        """
        yaml_content = self._serialize_spec(spec, version)
        versions_dir = f"{self.base_path}/{encoded_name}/versions"
        file_path = f"{versions_dir}/{version}.yaml"

        # Ensure directory exists
        self.fs.makedirs(versions_dir, exist_ok=True)

        # Write file
        with self.fs.open(file_path, "w") as f:
            f.write(yaml_content)

    def _read_spec(self, encoded_name: str, version: int) -> YadsSpec:
        """Read a spec from the registry.

        Args:
            encoded_name: URL-encoded spec name.
            version: Version number to read.

        Returns:
            The loaded YadsSpec.

        Raises:
            FileNotFoundError: If the version file doesn't exist.
        """
        file_path = f"{self.base_path}/{encoded_name}/versions/{version}.yaml"

        with self.fs.open(file_path, "r") as f:
            yaml_content = f.read()

        # Load spec from YAML
        return from_yaml_string(yaml_content)

    def _serialize_spec(self, spec: YadsSpec, version: int) -> str:
        """Serialize a spec to YAML string with specified version.

        Args:
            spec: The spec to serialize.
            version: Version number to set in the YAML.

        Returns:
            YAML string representation.
        """
        serialized_spec = self._serializer.serialize(replace(spec, version=version))
        return yaml.safe_dump(serialized_spec, sort_keys=False, default_flow_style=False)

__init__(base_path, logger=None, serializer=None, **fsspec_kwargs)

Initialize the FileSystemRegistry.

Parameters:

base_path (str, required): Base path for the registry storage.
logger (Logger | None, default None): Optional logger instance.
serializer (SpecSerializer | None, default None): Optional spec serializer override used for YAML exports.
**fsspec_kwargs (Any, default {}): Additional fsspec configuration.

exists(name)

Check if a spec exists in the registry.

Parameters:

name (str, required): The fully qualified spec name.

Returns:

bool: True if the spec's directory exists, False otherwise. If the check itself fails, the error is logged and False is returned instead of raising.
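The check looks only for the spec's directory and swallows errors. The following local-only sketch mirrors that logic with `pathlib`. `spec_exists` is hypothetical and not part of yads, and it ignores fsspec and remote backends.

```python
import tempfile
import urllib.parse
from pathlib import Path


def spec_exists(base_dir: str, name: str) -> bool:
    """Hypothetical local-only mirror of FileSystemRegistry.exists."""
    spec_dir = Path(base_dir) / urllib.parse.quote(name, safe="")
    try:
        return spec_dir.exists()
    except OSError:
        # The registry logs the error and answers False instead of raising
        return False


with tempfile.TemporaryDirectory() as base:
    (Path(base) / "sales%20orders" / "versions").mkdir(parents=True)
    found = spec_exists(base, "sales orders")
    missing = spec_exists(base, "inventory")

print(found, missing)  # True False
```

Because only the spec directory is checked, a spec whose `versions` directory is empty still counts as existing, even though `list_versions` returns an empty list for it.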


get(name, version=None)

Retrieve a spec by name and optional version.

Parameters:

name (str, required): The fully qualified spec name.
version (int | None, default None): Optional version number. If None, the latest (highest-numbered) version is retrieved.

Returns:

YadsSpec: The requested YadsSpec with its version field set.

Raises:

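SpecNotFoundError: If the spec has no versions, or the requested version does not exist.

RegistryError: If retrieval fails for any other reason (for example, the versions directory cannot be listed or the YAML file cannot be parsed).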


list_versions(name)

List all available versions for a spec.

Parameters:

name (str, required): The fully qualified spec name.

Returns:

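list[int]: Version numbers in ascending numeric order, or an empty list if the spec has no versions directory. Only files named <integer>.yaml count as versions. Other .yaml files are skipped with a logged warning, and files with other extensions are ignored.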

Raises:

RegistryError: If listing fails.
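The filename parsing can be reproduced on its own. The following is a minimal sketch that mirrors the loop in the source. `parse_versions` is a hypothetical helper and not part of yads, and it takes plain path strings instead of an fsspec listing.

```python
import logging

logger = logging.getLogger("yads_example")


def parse_versions(paths: list[str]) -> list[int]:
    """Hypothetical mirror of list_versions' filename parsing."""
    versions: list[int] = []
    for path in paths:
        filename = path.split("/")[-1]
        if not filename.endswith(".yaml"):
            continue  # non-YAML files are ignored silently
        try:
            versions.append(int(filename[: -len(".yaml")]))
        except ValueError:
            logger.warning("Skipping non-version file: %s", filename)
    # Sort as integers, so 10 comes after 2 (a string sort would not do this)
    return sorted(versions)


listing = [
    "registry/orders/versions/10.yaml",
    "registry/orders/versions/2.yaml",
    "registry/orders/versions/draft.yaml",
    "registry/orders/versions/README.md",
]
print(parse_versions(listing))  # [2, 10]
```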


register(spec)

Register a spec and assign it a version number.

If the spec content matches the latest version (excluding the version field), no new file is written: a DuplicateSpecWarning is emitted through the warnings module and the existing version number is returned.

Parameters:

spec (YadsSpec, required): The YadsSpec to register.

Returns:

int: The assigned or existing version number.

Raises:

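InvalidSpecNameError: If spec.name is empty or contains a filesystem-unsafe character (/, \, :, *, ?, <, >, |, or NUL).

RegistryError: If registration fails (for example, the versions directory cannot be listed or the new version file cannot be written).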
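`register` returns the same integer type whether or not it wrote a file, so a caller that needs to know which case happened can watch for the duplicate warning. The following sketch uses only the standard `warnings` module. `DuplicateSpecWarning` below is a local stand-in (assumed to be a `Warning` subclass), so replace it with yads' own class, whose import path is not shown on this page. `FakeRegistry` and `register_and_report` are hypothetical.

```python
import warnings
from typing import Any


class DuplicateSpecWarning(UserWarning):
    """Local stand-in for yads' DuplicateSpecWarning (assumed Warning subclass)."""


class FakeRegistry:
    """Stand-in that warns like FileSystemRegistry.register on duplicate content."""

    def __init__(self) -> None:
        self.latest_content: Any = None
        self.latest_version = 0

    def register(self, spec: Any) -> int:
        if self.latest_version and spec == self.latest_content:
            warnings.warn("identical to latest version", DuplicateSpecWarning, stacklevel=2)
            return self.latest_version
        self.latest_content = spec
        self.latest_version += 1
        return self.latest_version


def register_and_report(registry: Any, spec: Any) -> tuple[int, bool]:
    """Return (version, created), where created is False for duplicate content."""
    with warnings.catch_warnings(record=True) as caught:
        # "always" records the warning even if the same one was already shown
        warnings.simplefilter("always")
        version = registry.register(spec)
    created = not any(issubclass(w.category, DuplicateSpecWarning) for w in caught)
    return version, created


registry = FakeRegistry()
print(register_and_report(registry, {"columns": ["id"]}))  # (1, True)
print(register_and_report(registry, {"columns": ["id"]}))  # (1, False)
```

Recording the warning inside `catch_warnings` also keeps it from being printed, which can be convenient in CI logs.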
