Skip to content

Module

Module for managing koza runs through the CLI

get_koza_app

Return a KozaApp object for a given source

Parameters:

Name Type Description Default
source_name str

Name of source

required
Source code in src/koza/cli_utils.py
26
27
28
29
30
31
32
33
34
35
def get_koza_app(source_name: str) -> Optional[KozaApp]:
    """Return a KozaApp object for a given source

    Args:
        source_name (str): Name of source

    Returns:
        KozaApp: The entry stored under ``source_name`` in ``koza_apps``.

    Raises:
        KeyError: If ``source_name`` is not a key of ``koza_apps``.
    """
    try:
        return koza_apps[source_name]
    except KeyError as err:
        # Chain explicitly so the traceback shows the failed lookup as the
        # cause rather than as an unrelated error raised while handling it.
        raise KeyError(f"{source_name} was not found in KozaApp dictionary") from err

get_translation_table

Create a translation table object from two file paths

Parameters:

Name Type Description Default
global_table str

Path to global translation table. Defaults to None.

None
local_table str

Path to local translation table. Defaults to None.

None

Returns:

Name Type Description
TranslationTable TranslationTable

TranslationTable object

Source code in src/koza/cli_utils.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def get_translation_table(
    global_table: Union[str, Dict] = None,
    local_table: Union[str, Dict] = None,
    logger=None,
) -> TranslationTable:
    """Create a translation table object from two file paths or dictionaries

    Args:
        global_table (str | dict, optional): Path to a YAML global translation table,
            or an already-loaded dictionary. Defaults to None.
        local_table (str | dict, optional): Path to a YAML local translation table,
            or an already-loaded dictionary. Defaults to None.
        logger (optional): Logger used for debug messages. When omitted, a
            module-level logger is used. Defaults to None.

    Returns:
        TranslationTable: TranslationTable object

    Raises:
        ValueError: If a local table is given without a global table.
    """
    if logger is None:
        # ``logger`` is optional in the signature, so the debug calls below
        # must not dereference None.
        import logging

        logger = logging.getLogger(__name__)

    global_tt = {}
    local_tt = {}

    if not global_table:
        if local_table:
            raise ValueError("Local table without a global table not allowed")
        logger.debug("No global table used for transform")
    else:
        if isinstance(global_table, str):
            with open(global_table, "r") as global_tt_fh:
                global_tt = yaml.safe_load(global_tt_fh)
        elif isinstance(global_table, dict):
            global_tt = global_table

        if local_table:
            if isinstance(local_table, str):
                with open(local_table, "r") as local_tt_fh:
                    local_tt = yaml.safe_load(local_tt_fh)
            elif isinstance(local_table, dict):
                local_tt = local_table
        else:
            logger.debug("No local table used for transform")

    return TranslationTable(global_tt, local_tt)

transform_source

Create a KozaApp object, process maps, and run the transform

Parameters:

Name Type Description Default
source str

Path to source metadata file

required
output_dir str

Path to output directory

required
output_format OutputFormat

Output format. Defaults to OutputFormat('tsv').

OutputFormat('tsv')
global_table str

Path to global translation table. Defaults to None.

None
local_table str

Path to local translation table. Defaults to None.

None
schema str

Path to schema file. Defaults to None.

None
row_limit int

Number of rows to process. Defaults to None.

None
verbose bool

Verbose logging. Defaults to None.

None
log bool

Log to file. Defaults to False.

False
Source code in src/koza/cli_utils.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def transform_source(
    source: str,
    output_dir: str,
    output_format: OutputFormat = OutputFormat("tsv"),
    global_table: str = None,
    local_table: str = None,
    schema: str = None,
    node_type: str = None,
    edge_type: str = None,
    row_limit: int = None,
    verbose: bool = None,
    log: bool = False,
):
    """Build a KozaApp from a source config, process maps and sources, then QC the output

    Args:
        source (str): Path to source metadata file
        output_dir (str): Path to output directory
        output_format (OutputFormat, optional): Output format. Defaults to OutputFormat('tsv').
        global_table (str, optional): Path to global translation table; the value from the
            source config is used when this is not given. Defaults to None.
        local_table (str, optional): Path to local translation table; the value from the
            source config is used when this is not given. Defaults to None.
        schema (str, optional): Path to schema file. Defaults to None.
        node_type (str, optional): Forwarded unchanged to the KozaApp setup. Defaults to None.
        edge_type (str, optional): Forwarded unchanged to the KozaApp setup. Defaults to None.
        row_limit (int, optional): Number of rows to process. Defaults to None.
        verbose (bool, optional): Verbose logging. Defaults to None.
        log (bool, optional): Log to file. Defaults to False.

    Raises:
        FileNotFoundError: If the config names no transform code and no candidate file exists.
        ValueError: If an output row count is below 70% of the configured minimum.
    """
    logger_name = Path(source).name if log else None
    logger = get_logger(name=logger_name, verbose=verbose)

    with open(source, "r") as source_fh:
        raw_config = yaml.load(source_fh, Loader=UniqueIncludeLoader)
        source_config = PrimaryFileConfig(**raw_config)

    source_path = Path(source)

    # Fall back to the config file's stem when the config carries no name
    if not source_config.name:
        source_config.name = source_path.stem

    # Look for "<stem>.py" next to the config first, then a generic "transform.py"
    if not source_config.transform_code:
        candidates = (
            f"{Path(source).parent / Path(source).stem}.py",
            source_path.parent / "transform.py",
        )
        for candidate in candidates:
            if Path(candidate).exists():
                source_config.transform_code = candidate
                break
        else:
            raise FileNotFoundError(f"Could not find transform file for {source}")

    koza_source = Source(source_config, row_limit)
    logger.debug(f"Source created: {koza_source.config.name}")

    # Explicit arguments take precedence over the tables named in the config
    translation_table = get_translation_table(
        global_table or source_config.global_table,
        local_table or source_config.local_table,
        logger,
    )

    koza_app = _set_koza_app(
        koza_source,
        translation_table,
        output_dir,
        output_format,
        schema,
        node_type,
        edge_type,
        logger,
    )
    koza_app.process_maps()
    koza_app.process_sources()

    # --- QC: compare output row counts with the configured minimums ---

    def _check_row_count(type: Literal["node", "edge"]):
        """Compare the node or edge output row count with its configured minimum."""
        if type == "node":
            outfile, min_count = koza_app.node_file, source_config.min_node_count
        elif type == "edge":
            outfile, min_count = koza_app.edge_file, source_config.min_edge_count

        result = duckdb.sql(f"SELECT count(*) from '{outfile}' as count")
        count = result.fetchone()[0]

        # A row limit below the expected minimum makes the comparison meaningless
        if row_limit and row_limit < min_count:
            logger.warning(f"Row limit '{row_limit}' is less than expected count of {min_count} {type}s")
            return

        if row_limit and row_limit < count:
            logger.error(f"Actual {type} count {count} exceeds row limit {row_limit}")
            return

        if count < min_count * 0.7:
            raise ValueError(f"Actual {type} count {count} is less than 70% of expected {min_count} {type}s")

        # From here on count is at least 70% of the minimum
        if count < min_count:
            logger.warning(
                f"Actual {type} count {count} is less than expected {min_count}, but more than 70% of expected"
            )

    # Only check outputs that exist and have a configured minimum
    if source_config.min_node_count is not None if hasattr(koza_app, "node_file") else False:
        _check_row_count("node")

    if source_config.min_edge_count is not None if hasattr(koza_app, "edge_file") else False:
        _check_row_count("edge")

validate_file

Runs a file through one of the Koza readers. For csv files will return header and row length.

For json and jsonl just validates them

Source code in src/koza/cli_utils.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def validate_file(
    file: str,
    format: FormatType = FormatType.csv,
    delimiter: str = ",",
    header_delimiter: str = None,
    skip_blank_lines: bool = True,
):
    """
    Runs a file through one of the Koza readers

    The file is opened with ``open_resource`` and every record is read
    through the reader matching ``format``. Nothing is returned; any error
    raised by the reader while iterating propagates to the caller.

    Args:
        file (str): Resource to validate, passed to ``open_resource``.
        format (FormatType, optional): Reader to use: csv, jsonl or json.
            Defaults to FormatType.csv.
        delimiter (str, optional): Passed to the CSV reader. Defaults to ",".
        header_delimiter (str, optional): Passed to the CSV reader. Defaults to None.
        skip_blank_lines (bool, optional): Passed to the CSV reader. Defaults to True.

    Raises:
        ValueError: If ``format`` is not csv, jsonl or json.
    """

    with open_resource(file) as resource_io:
        if format == FormatType.csv:
            reader = CSVReader(
                resource_io,
                delimiter=delimiter,
                header_delimiter=header_delimiter,
                skip_blank_lines=skip_blank_lines,
            )
        elif format == FormatType.jsonl:
            reader = JSONLReader(resource_io)
        elif format == FormatType.json:
            reader = JSONReader(resource_io)
        else:
            # Name the offending value so the caller can see what was rejected
            raise ValueError(f"Unsupported file format: {format!r}")

        # Exhaust the reader; the records themselves are not needed
        for _ in reader:
            pass