Skip to content

Module

Module for managing koza runs through the CLI

get_koza_app

Return a KozaApp object for a given source

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `source_name` | `str` | Name of source | required |
Source code in `src/koza/cli_utils.py` (lines 27–36)
def get_koza_app(source_name) -> KozaApp:
    """Return the KozaApp object stored for a given source

    Args:
        source_name (str): Name of source

    Returns:
        KozaApp: The app stored under ``source_name`` in ``koza_apps``.

    Raises:
        KeyError: If no KozaApp is stored under ``source_name``.
    """
    try:
        return koza_apps[source_name]
    except KeyError:
        # Re-raise with a descriptive message; the bare lookup error adds no
        # information, so its context is suppressed to keep tracebacks short.
        raise KeyError(f"{source_name} was not found in KozaApp dictionary") from None

get_translation_table

Create a translation table object from two file paths

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `global_table` | `str` | Path to global translation table. Defaults to None. | `None` |
| `local_table` | `str` | Path to local translation table. Defaults to None. | `None` |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `TranslationTable` | `TranslationTable` | TranslationTable object |

Source code in `src/koza/cli_utils.py` (lines 231–271)
def _load_translation_table(table) -> Dict:
    """Return ``table`` as a dict, reading it from a YAML file when given a path."""
    import os

    if isinstance(table, (str, os.PathLike)):
        with open(table, "r", encoding="utf-8") as table_fh:
            # safe_load returns None for an empty document; treat that as an empty table.
            return yaml.safe_load(table_fh) or {}
    if isinstance(table, dict):
        return table
    # Any other type is ignored and yields an empty table.
    return {}


def get_translation_table(
    global_table: Union[str, Dict] = None,
    local_table: Union[str, Dict] = None,
    logger=None,
) -> TranslationTable:
    """Create a translation table object from two file paths or mappings

    Each table may be given as a path (``str`` or ``os.PathLike``) to a YAML
    file, or as an already-loaded dict. A local table is only accepted
    together with a global table.

    Args:
        global_table (str | os.PathLike | dict, optional): Path to global translation table,
            or the table itself. Defaults to None.
        local_table (str | os.PathLike | dict, optional): Path to local translation table,
            or the table itself. Defaults to None.
        logger (optional): Logger used for debug messages. When None, the
            standard-library logger for this module is used. Defaults to None.

    Returns:
        TranslationTable: TranslationTable object

    Raises:
        ValueError: If a local table is given without a global table.
    """
    if logger is None:
        # The previous code called logger.debug unconditionally, which crashed
        # when the default of None was used.
        import logging

        logger = logging.getLogger(__name__)

    if not global_table:
        if local_table:
            raise ValueError("Local table without a global table not allowed")
        logger.debug("No global table used for transform")
        return TranslationTable({}, {})

    global_tt = _load_translation_table(global_table)

    if local_table:
        local_tt = _load_translation_table(local_table)
    else:
        local_tt = {}
        logger.debug("No local table used for transform")

    return TranslationTable(global_tt, local_tt)

transform_source

Create a KozaApp object, process maps, and run the transform

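Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `source` | `str` | Path to source metadata file | required |
| `output_dir` | `str` | Path to output directory | required |
| `output_format` | `OutputFormat` | Output format. Defaults to `OutputFormat('tsv')`. | `OutputFormat('tsv')` |
| `global_table` | `str` | Path to global translation table. Defaults to None. | `None` |
| `local_table` | `str` | Path to local translation table. Defaults to None. | `None` |
| `schema` | `str` | Path to schema file. Defaults to None. | `None` |
| `node_type` | `str` | Node type passed to the KozaApp. Defaults to None. | `None` |
| `edge_type` | `str` | Edge type passed to the KozaApp. Defaults to None. | `None` |
| `row_limit` | `int` | Number of rows to process. Defaults to None. | `None` |
| `verbose` | `bool` | Verbose logging. Defaults to None. | `None` |
| `log` | `bool` | Log to file. Defaults to False. | `False` |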
Source code in `src/koza/cli_utils.py` (lines 39–127)
def transform_source(
    source: str,
    output_dir: str,
    output_format: OutputFormat = OutputFormat("tsv"),
    global_table: str = None,
    local_table: str = None,
    schema: str = None,
    node_type: str = None,
    edge_type: str = None,
    row_limit: int = None,
    verbose: bool = None,
    log: bool = False,
):
    """Create a KozaApp object, process maps, and run the transform

    After the transform runs, the number of rows in the node and edge output
    files is compared with the source config's ``min_node_count`` /
    ``min_edge_count`` when those are set.

    Args:
        source (str): Path to source metadata file
        output_dir (str): Path to output directory
        output_format (OutputFormat, optional): Output format. Defaults to OutputFormat('tsv').
        global_table (str, optional): Path to global translation table. Takes precedence
            over the source config's ``global_table``. Defaults to None.
        local_table (str, optional): Path to local translation table. Takes precedence
            over the source config's ``local_table``. Defaults to None.
        schema (str, optional): Path to schema file. Defaults to None.
        node_type (str, optional): Node type passed to the KozaApp. Defaults to None.
        edge_type (str, optional): Edge type passed to the KozaApp. Defaults to None.
        row_limit (int, optional): Number of rows to process. Defaults to None.
        verbose (bool, optional): Verbose logging. Defaults to None.
        log (bool, optional): Log to file. Defaults to False.

    Raises:
        FileNotFoundError: If the config names no transform code and neither
            ``<source stem>.py`` nor ``transform.py`` exists next to ``source``.
        ValueError: If an output file has fewer than 70% of the expected rows.
    """
    logger = get_logger(name=Path(source).name if log else None, verbose=verbose)
    source_path = Path(source)

    with open(source, "r") as source_fh:
        # NOTE(review): yaml.load with a custom loader is not restricted like
        # yaml.safe_load; source configs are assumed to be trusted input.
        source_config = PrimaryFileConfig(**yaml.load(source_fh, Loader=UniqueIncludeLoader))

    # Set name and transform code if not provided
    if not source_config.name:
        source_config.name = source_path.stem

    if not source_config.transform_code:
        # Prefer <stem>.py next to the config, then fall back to transform.py.
        candidates = (
            source_path.parent / f"{source_path.stem}.py",
            source_path.parent / "transform.py",
        )
        transform_file = next((candidate for candidate in candidates if candidate.exists()), None)
        if transform_file is None:
            raise FileNotFoundError(f"Could not find transform file for {source}")
        # Store a str regardless of which candidate matched, so the field's type is consistent.
        source_config.transform_code = str(transform_file)

    koza_source = Source(source_config, row_limit)
    logger.debug(f"Source created: {koza_source.config.name}")
    # Explicitly passed tables override those named in the source config.
    translation_table = get_translation_table(
        global_table or source_config.global_table,
        local_table or source_config.local_table,
        logger,
    )

    koza_app = _set_koza_app(
        koza_source, translation_table, output_dir, output_format, schema, node_type, edge_type, logger
    )
    koza_app.process_maps()
    koza_app.process_sources()

    ### QC checks

    # Fraction of the expected count below which the output is treated as a failure.
    min_fraction = 0.7

    def _check_row_count(kind: Literal["node", "edge"]):
        """Compare the row count of the node or edge output file with the configured minimum.

        Logs a warning or error in borderline / row-limited cases.

        Raises:
            ValueError: If fewer than 70% of the expected rows were written.
        """
        if kind == "node":
            outfile = koza_app.node_file
            min_count = source_config.min_node_count
        else:
            outfile = koza_app.edge_file
            min_count = source_config.min_edge_count

        # Double any single quotes so the path remains a valid SQL string literal.
        quoted_path = str(outfile).replace("'", "''")
        count = duckdb.sql(f"SELECT count(*) FROM '{quoted_path}'").fetchone()[0]

        if row_limit and row_limit < min_count:
            logger.warning(f"Row limit '{row_limit}' is less than expected count of {min_count} {kind}s")
        elif row_limit and row_limit < count:
            logger.error(f"Actual {kind} count {count} exceeds row limit {row_limit}")
        elif count < min_count * min_fraction:
            raise ValueError(f"Actual {kind} count {count} is less than 70% of expected {min_count} {kind}s")
        elif count < min_count:
            logger.warning(
                f"Actual {kind} count {count} is less than expected {min_count}, but more than 70% of expected"
            )

    # Confirm min number of rows in output
    if hasattr(koza_app, "node_file") and source_config.min_node_count is not None:
        _check_row_count("node")

    if hasattr(koza_app, "edge_file") and source_config.min_edge_count is not None:
        _check_row_count("edge")

validate_file

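Runs a file through one of the Koza readers to check that it can be parsed. CSV files are read with the CSV reader, using the given delimiters; JSON and JSONL files are read with their respective readers.

The function returns nothing; any error raised by the reader propagates to the caller.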

Source code in `src/koza/cli_utils.py` (lines 198–228)
def validate_file(
    file: str,
    format: FormatType = FormatType.csv,
    delimiter: str = ",",
    header_delimiter: str = None,
    skip_blank_lines: bool = True,
):
    """Check that a file can be read by one of the Koza readers

    The whole file is iterated with the reader matching ``format``. No rows
    are kept and nothing is returned; any error raised by the reader
    propagates to the caller.

    Args:
        file (str): Location of the file, passed to ``open_resource``.
        format (FormatType, optional): Reader to use: csv, jsonl, or json. Defaults to FormatType.csv.
        delimiter (str, optional): Field delimiter, used for csv only. Defaults to ",".
        header_delimiter (str, optional): Header delimiter passed to CSVReader. Defaults to None.
        skip_blank_lines (bool, optional): Passed to CSVReader. Defaults to True.

    Raises:
        ValueError: If ``format`` is not csv, jsonl, or json.
    """
    with open_resource(file) as resource_io:
        if format == FormatType.csv:
            reader = CSVReader(
                resource_io,
                delimiter=delimiter,
                header_delimiter=header_delimiter,
                skip_blank_lines=skip_blank_lines,
            )
        elif format == FormatType.jsonl:
            reader = JSONLReader(resource_io)
        elif format == FormatType.json:
            reader = JSONReader(resource_io)
        else:
            raise ValueError(f"Unsupported format for validation: {format}")

        # Exhaust the reader; reading every record is what performs the validation.
        for _ in reader:
            pass