
kamihi.datasources

Data sources module for Kamihi.

License

MIT

Modules:

Name Description
datasource

Base class for data sources in Kamihi.

postgres

PostgreSQL datasource module.

sqlite

SQLite datasource module.

Classes:

Name Description
DataSource

Base class for data sources in Kamihi.

DataSourceConfig

Configuration model for data sources.

PostgresDataSource

PostgreSQL data source implementation.

PostgresDataSourceConfig

Configuration model for PostgreSQL data sources.

SQLiteDataSource

SQLite data source implementation.

SQLiteDataSourceConfig

Configuration model for SQLite data sources.

DataSource

DataSource(settings: DataSourceConfig)

Base class for data sources in Kamihi.

This class serves as a template for creating specific data source implementations. It defines the basic structure and methods that all data sources should implement.

Initialize the DataSource with its configuration.

Parameters:

Name Type Description Default

settings

DataSourceConfig

The configuration for the data source.

required

Methods:

Name Description
connect

Connect to the data source.

disconnect

Disconnect from the data source.

fetch

Asynchronously fetch data from the data source.

get_datasource_class

Get the data source class by its type name.

Source code in src/kamihi/datasources/datasource.py
def __init__(self, settings: DataSourceConfig) -> None:
    """
    Initialize the DataSource with its configuration.

    Args:
        settings (DataSourceConfig): The configuration for the data source.

    """
    self.settings = settings

connect async

connect() -> None

Connect to the data source.

Override this method to establish a connection to the data source, if one is needed. The base implementation raises NotImplementedError.

Source code in src/kamihi/datasources/datasource.py
async def connect(self) -> None:  # noqa: ANN002, ANN003, ARG002
    """
    Connect to the data source.

    Override this method to establish a connection to the data source,
    if one is needed. The base implementation raises NotImplementedError.

    """
    raise NotImplementedError("Subclasses must implement this method.")

disconnect async

disconnect() -> None

Disconnect from the data source.

Override this method to close the connection to the data source, if one is needed. The base implementation raises NotImplementedError.

Source code in src/kamihi/datasources/datasource.py
async def disconnect(self) -> None:
    """
    Disconnect from the data source.

    Override this method to close the connection to the data source,
    if one is needed. The base implementation raises NotImplementedError.

    """
    raise NotImplementedError("Subclasses must implement this method.")

fetch async

fetch(request: Path | str) -> Any

Asynchronously fetch data from the data source.

This method should be implemented by subclasses to define how data is retrieved from the specific data source in an asynchronous manner.

Parameters:

Name Type Description Default

request

Path | str

The request to fetch data from the data source. This could be a file path, URL, or any identifier for the data.

required

Returns:

Type Description
Any

The fetched data.

Source code in src/kamihi/datasources/datasource.py
async def fetch(self, request: Path | str) -> Any:  # noqa: ANN002, ANN003, ANN401
    """
    Asynchronously fetch data from the data source.

    This method should be implemented by subclasses to define how data is retrieved
    from the specific data source in an asynchronous manner.

    Args:
        request (Path | str): The request to fetch data from the data source.
            This could be a file path, URL, or any identifier for the data.

    Returns:
        The fetched data.

    """
    raise NotImplementedError("Subclasses must implement this method.")

get_datasource_class classmethod

get_datasource_class(
    type_name: str,
) -> type[DataSource] | None

Get the data source class by its type name.

Parameters:

Name Type Description Default

type_name

str

The type name of the data source.

required

Returns:

Type Description
type[DataSource] | None

The data source class if found, otherwise None.

Source code in src/kamihi/datasources/datasource.py
@classmethod
def get_datasource_class(cls, type_name: str) -> type["DataSource"] | None:
    """
    Get the data source class by its type name.

    Args:
        type_name (str): The type name of the data source.

    Returns:
        type[DataSource] | None: The data source class if found, otherwise None.

    """
    if not cls._registry:
        cls._build_registry()
    return cls._registry.get(type_name)

DataSourceConfig

Bases: BaseModel

Configuration model for data sources.

This model defines the configuration schema for data sources in Kamihi. It includes the name of the data source and any additional parameters required for its initialization.

Attributes:

Name Type Description
name str

The name of the data source. Must be unique across all data sources.

Methods:

Name Description
union_type

Get a union type of all registered data source configuration classes.

union_type classmethod

union_type() -> type[Annotated] | NoneType

Get a union type of all registered data source configuration classes.

Returns:

Type Description
type[Annotated] | NoneType

An Annotated union of all registered data source configuration classes, discriminated by their type field, or NoneType if no configuration class is registered.

Source code in src/kamihi/datasources/datasource.py
@classmethod
def union_type(cls) -> type[Annotated] | NoneType:
    """
    Get a union type of all registered data source configuration classes.

    Returns:
        Union[type[DataSourceConfig], ...]: A union type of all registered data source configuration classes.

    """
    if not cls._registry.values():
        cls._build_registry()
    return (
        Annotated[
            Union[tuple(cls._registry.values())],  # noqa: UP007
            Field(discriminator="type"),
        ]
        if cls._registry
        else NoneType
    )
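`union_type` relies on two typing behaviours: `Union[...]` accepts a tuple of types, so the members can come from a registry, and pydantic reads the `Field(discriminator="type")` metadata inside `Annotated` to pick a config class by its `type` value. The stdlib-only sketch below reproduces the type construction with a placeholder `Discriminator` marker (a hypothetical stand-in for pydantic's `Field`) and a hand-written `parse_config` that imitates what the discriminator does. The config classes are hypothetical.

```python
from dataclasses import dataclass
from typing import Annotated, Any, Literal, Union, get_args, get_origin


@dataclass(frozen=True)
class Discriminator:
    """Placeholder for pydantic's Field(discriminator=...)."""

    field: str


@dataclass
class SQLiteConfig:
    name: str
    path: str
    type: Literal["sqlite"] = "sqlite"


@dataclass
class MemoryConfig:
    name: str
    type: Literal["memory"] = "memory"


registry: dict[str, type] = {"sqlite": SQLiteConfig, "memory": MemoryConfig}

# Union[...] accepts a tuple, so the members are taken straight from the registry.
ConfigUnion = Annotated[Union[tuple(registry.values())], Discriminator("type")]


def parse_config(raw: dict[str, Any]) -> Any:
    # Imitate discriminated-union validation: choose the class by the "type" key.
    config_cls = registry.get(raw.get("type"))
    if config_cls is None:
        raise ValueError(f"Unknown data source type: {raw.get('type')!r}")
    return config_cls(**raw)


if __name__ == "__main__":
    print(get_args(ConfigUnion))
    print(parse_config({"name": "local", "type": "sqlite", "path": "app.db"}))
```

The `if cls._registry else NoneType` branch in `union_type` matters because `Union[()]` raises `TypeError`; with a single registered class, `Union[(cls,)]` collapses to that class itself.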

PostgresDataSource

PostgresDataSource(settings: PostgresDataSourceConfig)

Bases: DataSource

PostgreSQL data source implementation.

This class implements the DataSource interface for connecting to and interacting with a PostgreSQL database.

Initialize the PostgresDataSource with the provided configuration.

Parameters:

Name Type Description Default

settings

PostgresDataSourceConfig

The configuration for the PostgreSQL data source.

required

Methods:

Name Description
connect

Initialize the connection pool for the PostgreSQL database.

disconnect

Disconnect from the PostgreSQL database asynchronously.

fetch

Fetch data asynchronously from the PostgreSQL database.

Attributes:

Name Type Description
NamedRecord type

Create a named record class for asyncpg records.

Source code in src/kamihi/datasources/postgres.py
@requires("postgresql")
def __init__(self, settings: PostgresDataSourceConfig) -> None:
    """
    Initialize the PostgresDataSource with the provided configuration.

    Args:
        settings (PostgresDataSourceConfig): The configuration for the PostgreSQL data source.

    """
    super().__init__(settings)
    self.settings = settings
    self._logger = logger.bind(datasource=settings.name, type=settings.type)

NamedRecord cached property

NamedRecord: type

Create a named record class for asyncpg records.

connect async

connect(*args, **kwargs) -> None

Initialize the connection pool for the PostgreSQL database.

This method is called to set up the connection pool for the PostgreSQL database. It uses asyncpg to create a connection pool with the provided settings.

Raises:

Type Description
RuntimeError

If asyncpg is not installed.

ConnectionError

If the connection pool initialization fails.

Source code in src/kamihi/datasources/postgres.py
async def connect(self, *args, **kwargs) -> None:  # noqa: ANN002, ANN003, ARG002
    """
    Initialize the connection pool for the PostgreSQL database.

    This method is called to set up the connection pool for the PostgreSQL database.
    It uses asyncpg to create a connection pool with the provided settings.

    Raises:
        RuntimeError: If asyncpg is not installed.
        ConnectionError: If the connection pool initialization fails.

    """
    import asyncpg

    if self._pool is not None:
        self._logger.warning("Connection pool already initialized, skipping re-initialization")
        return

    try:
        self._pool = await asyncpg.create_pool(
            host=self.settings.host,
            port=self.settings.port,
            database=self.settings.database,
            user=self.settings.user,
            password=self.settings.password,
            min_size=self.settings.min_pool_size,
            max_size=self.settings.max_pool_size,
            timeout=self.settings.timeout,
            record_class=self.NamedRecord,
        )
        self._logger.info("Connected to {datasource}", datasource=self.settings.name)
    except asyncpg.PostgresError as e:
        raise ConnectionError("Failed to initialize connection pool") from e

disconnect async

disconnect() -> None

Disconnect from the PostgreSQL database asynchronously.

This method closes the connection pool if it is initialized.

Source code in src/kamihi/datasources/postgres.py
async def disconnect(self) -> None:
    """
    Disconnect from the PostgreSQL database asynchronously.

    This method closes the connection pool if it is initialized.

    """
    if self._pool is not None:
        self._logger.trace("Closing connection pool...")
        await self._pool.close()
        self._logger.info("Disconnected")
        self._pool = None

fetch async

fetch(request: Path | str) -> list[NamedRecord]

Fetch data asynchronously from the PostgreSQL database.

This method executes a SQL query, given either as a string or as a path to a SQL file, and returns the results.

Parameters:

Name Type Description Default

request

Path | str

The path to the SQL request file or the SQL query as a string.

required

Returns:

Type Description
list[NamedRecord]

The rows returned by the query.

Source code in src/kamihi/datasources/postgres.py
async def fetch(self, request: Path | str) -> list[NamedRecord]:
    """
    Fetch data asynchronously from the PostgreSQL database.

    This method executes a SQL query, given either as a string or as a path to a
    SQL file, and returns the results.

    Args:
        request (Path | str): The path to the SQL request file or the SQL query as a string.

    Returns:
        list[NamedRecord]: The results obtained from the SQL request.

    """
    if not self._pool:
        raise RuntimeError("Connection pool is not initialized. Call connect() first.")

    with self._logger.contextualize(request=str(request)), timer(self._logger, "Executed command"):
        async with self._pool.acquire() as conn:
            self._logger.trace("Acquired connection from pool")
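            # A Path is read from disk; a string is used as the SQL text itself.
            query = await anyio.Path(request).read_text() if isinstance(request, Path) else request
            results = await conn.fetch(query)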
            self._logger.trace("Fetched {results} results from datasource", results=len(results))
    return results

PostgresDataSourceConfig

Bases: DataSourceConfig

Configuration model for PostgreSQL data sources.

This model extends the base DataSourceConfig to include specific parameters required for connecting to a PostgreSQL database.

Attributes:

Name Type Description
host str

The hostname of the PostgreSQL server.

port int

The port number of the PostgreSQL server.

database str

The name of the PostgreSQL database to connect to.

user str

The username for connecting to the PostgreSQL database.

password str

The password for the specified user.

min_pool_size int

The minimum number of connections in the pool.

max_pool_size int

The maximum number of connections in the pool.

timeout int

The timeout duration for establishing connections, in seconds.

SQLiteDataSource

SQLiteDataSource(settings: SQLiteDataSourceConfig)

Bases: DataSource

SQLite data source implementation.

This class implements the DataSource interface for connecting to and interacting with an SQLite database.

Attributes:

Name Type Description
type Literal['sqlite']

The type of the data source, which is "sqlite".

settings SQLiteDataSourceConfig

The configuration for the SQLite data source.

Initialize the SQLiteDataSource with the provided configuration.

Parameters:

Name Type Description Default

settings

SQLiteDataSourceConfig

The configuration for the SQLite data source.

required

Methods:

Name Description
connect

Connect to the SQLite database.

disconnect

Disconnect from the SQLite database.

fetch

Fetch data from the SQLite database.

Source code in src/kamihi/datasources/sqlite.py
@requires("sqlite")
def __init__(self, settings: SQLiteDataSourceConfig) -> None:
    """
    Initialize the SQLiteDataSource with the provided configuration.

    Args:
        settings (SQLiteDataSourceConfig): The configuration for the SQLite data source.

    """
    super().__init__(settings)
    self.settings = settings
    self._logger = logger.bind(datasource=settings.name, type=settings.type)

NamedRecord cached property

NamedRecord: type

Create a named record class for aiosqlite rows.
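The page does not show how `NamedRecord` is built. With aiosqlite, `row_factory` is passed through to the underlying `sqlite3` connection, so it follows the stdlib protocol: a callable that receives the cursor and the raw row tuple. The sketch below shows one way to get attribute access to columns with `collections.namedtuple`, using stdlib `sqlite3` directly; `named_row_factory` is hypothetical and may differ from Kamihi's implementation.

```python
import sqlite3
from collections import namedtuple
from typing import Any


def named_row_factory(cursor: sqlite3.Cursor, row: tuple[Any, ...]) -> Any:
    # cursor.description holds one 7-tuple per column; item 0 is the column name.
    fields = [column[0] for column in cursor.description]
    # rename=True replaces names that are not valid identifiers (e.g. "count(*)")
    # with positional names such as _0.
    Record = namedtuple("Record", fields, rename=True)
    return Record(*row)


conn = sqlite3.connect(":memory:")
conn.row_factory = named_row_factory
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Ada')")
rows = conn.execute("SELECT id, name FROM users").fetchall()
conn.close()

if __name__ == "__main__":
    print(rows[0].id, rows[0].name)
```

This rebuilds the namedtuple class for every row; caching it per `cursor.description` would avoid that cost. The stdlib's `sqlite3.Row` is a built-in alternative that offers access by column name (`row["name"]`) rather than by attribute.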

connect async

connect() -> None

Connect to the SQLite database.

This method establishes a connection to the SQLite database specified in the settings. It uses the aiosqlite library for asynchronous database operations.

Source code in src/kamihi/datasources/sqlite.py
async def connect(self) -> None:
    """
    Connect to the SQLite database.

    This method establishes a connection to the SQLite database specified in the
    settings. It uses the aiosqlite library for asynchronous database operations.

    """
    import aiosqlite

    if self._db is not None:
        self._logger.warning("Already connected, skipping re-initialization")
        return

    try:
        self._db = await aiosqlite.connect(self.settings.path)
        self._db.row_factory = self.NamedRecord
        self._logger.info("Connected to {datasource}", datasource=self.settings.name)
    except aiosqlite.Error as e:
        raise RuntimeError("Failed to connect") from e

disconnect async

disconnect() -> None

Disconnect from the SQLite database.

This method closes the connection to the SQLite database.

Source code in src/kamihi/datasources/sqlite.py
async def disconnect(self) -> None:
    """
    Disconnect from the SQLite database.

    This method closes the connection to the SQLite database.

    """
    if self._db:
        self._logger.trace("Closing database connection...")
        await self._db.close()
        self._logger.info("Disconnected")
        self._db = None

fetch async

fetch(request: Path | str) -> list[NamedRecord]

Fetch data from the SQLite database.

This method executes a query against the SQLite database and returns the results.

Parameters:

Name Type Description Default

request

Path | str

The SQL query to execute. This can be a path to a SQL file or a raw SQL string.

required

Returns:

Type Description
list[NamedRecord]

The results of the query.

Source code in src/kamihi/datasources/sqlite.py
async def fetch(self, request: Path | str) -> list[NamedRecord]:
    """
    Fetch data from the SQLite database.

    This method executes a query against the SQLite database and returns the results.

    Args:
        request (Path | str): The SQL query to execute. This can be a path to a SQL file
                              or a raw SQL string.

    Returns:
        list[NamedRecord]: The results of the query.

    """
    if not self._db:
        raise RuntimeError("Database connection is not established. Call connect() first.")

    with self._logger.contextualize(request=str(request)), timer(self._logger, "Executed command"):
        async with self._db.execute(
            request if isinstance(request, str) else await anyio.Path(request).read_text()
        ) as cursor:
            self._logger.trace("Created cursor and executed query")
            results = await cursor.fetchall()
            self._logger.trace("Fetched {results} results from datasource", results=len(results))
    return results
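aiosqlite gives `sqlite3` an asyncio interface by running its blocking calls on a dedicated background thread per connection. To illustrate that model without the dependency, the sketch below re-creates the connect / fetch / disconnect flow of `SQLiteDataSource` with the stdlib only. `ThreadedSQLiteSource` is hypothetical: it accepts SQL strings only and omits logging and the named row factory.

```python
import asyncio
import sqlite3
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import Any


class ThreadedSQLiteSource:
    """Hypothetical stand-in: every sqlite3 call runs on a single worker thread."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._executor: ThreadPoolExecutor | None = None
        self._db: sqlite3.Connection | None = None

    async def _run(self, func: Callable[..., Any], *args: Any) -> Any:
        # Hand the blocking call to the worker thread and await its result.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, func, *args)

    async def connect(self) -> None:
        if self._db is not None:
            return  # Already connected: skip re-initialization.
        # max_workers=1 keeps the connection on the thread that created it,
        # which sqlite3 requires by default (check_same_thread=True).
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._db = await self._run(sqlite3.connect, self.path)

    async def fetch(self, query: str) -> list[Any]:
        if self._db is None:
            raise RuntimeError("Database connection is not established. Call connect() first.")
        db = self._db

        def run_query() -> list[Any]:
            cursor = db.execute(query)
            try:
                return cursor.fetchall()
            finally:
                cursor.close()

        return await self._run(run_query)

    async def disconnect(self) -> None:
        if self._db is not None:
            await self._run(self._db.close)
            self._db = None
        if self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None


async def demo() -> list[Any]:
    source = ThreadedSQLiteSource(":memory:")
    await source.connect()
    try:
        # DDL and INSERT statements return no rows, so fetch() yields [] for them.
        await source.fetch("CREATE TABLE t (x INTEGER)")
        await source.fetch("INSERT INTO t VALUES (1), (2)")
        return await source.fetch("SELECT x FROM t ORDER BY x")
    finally:
        await source.disconnect()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Pinning the connection to one worker thread is what lets `sqlite3` keep its default same-thread check while the event loop stays responsive.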

SQLiteDataSourceConfig

Bases: DataSourceConfig

Configuration model for SQLite data sources.

This model extends the base DataSourceConfig to include specific parameters required for connecting to an SQLite database.

Attributes:

Name Type Description
type Literal['sqlite']

The type of the data source, which is "sqlite".

path str | Path

The path to the SQLite database file.