↔️ Formats
Formats are Metabolic's IO capabilities: they make reading and writing data trivial. Currently, the following physical formats are supported:
Iceberg (default Table format)
Delta Lake
Parquet Files
JSON Files
CSV Files
Kafka Topics
Additionally, Metabolic provides a virtual format named catalog.
Batch vs Streaming formats
Metabolic doesn't tie a specific format to running your entities in Batch or Streaming, unlike other processing engines such as Flink or KSQL. That said, Iceberg, Delta and Kafka are the preferred options if you plan on switching frequently between the two modes (for example, following a Kappa Architecture).
Iceberg
Iceberg is the default Metabolic Table format, as it provides atomic operations over a data lake, allowing file-based storages to behave like databases.
Iceberg operates by using a catalog for both reading and writing data, rather than relying directly on the underlying filesystem. This approach provides a more flexible and robust way to manage large-scale datasets, as the catalog stores metadata about the tables, including their schema, partitions, and versions.
How to read an Iceberg Source
sources: [
  {
    catalog: ${dp.database}."my_table"
    name: data_lake_my_iceberg_table
    format: TABLE
    ops: [ ... ]
  }
  ...
]
How to write an Iceberg Table
sink: {
  catalog: ${dp.database_silver}."my_table_silver"
}
Iceberg supports append, overwrite, upsert and delete write modes. Upsert needs an idColumn param to identify matching rows. The delete write mode only allows full table deletions, so both schema and data are fully removed.
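For example, a minimal upsert sink might look like the sketch below. This assumes the writeMode and idColumn keys follow the same pattern as the Delta examples further down; event_id is a hypothetical id column.

sink: {
  catalog: ${dp.database_silver}."my_table_silver"
  # writeMode and idColumn assumed to mirror the Delta syntax below
  writeMode: upsert
  idColumn: event_id
}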
Iceberg also supports Schema Evolution.
Schema Evolution support by Write Mode:
Write Mode | Schema Evolution
Append | ✅ Yes
Overwrite (REPLACE) | ✅ Yes
Upsert (MERGE INTO) | ❌ No
Delete | N/A
Delta Lake
Delta Lake is another powerful table format supported by Metabolic, with automatic optimizations built in.
How to read a Delta Source
sources: [
  {
    inputPath: ${dp.dl_clean_bucket}/my_input_table/
    name: data_lake_clean_my_delta_table
    format: DELTA
    ops: [ ... ]
  }
  ...
]
How to write a Delta Table
sink: {
  outputPath: ${dp.dl_gold_bucket}/my_output_table
  format: DELTA
}
Delta supports append, overwrite, upsert and delete write modes. Upsert and delete need an idColumn param to identify matching rows. Upsert optionally supports an eventDtColumn param to also match on the event timestamp, so identical updates are deduplicated while history is preserved.
For example, to upsert on event_id while maintaining historical evolution (an event source without duplicates):
sink: {
  outputPath: ${dp.dl_gold_bucket}/stream_events/
  writeMode: upsert
  idColumn: event_id
  eventDtColumn: event_created_at
  format: DELTA
  ops: [
    ...
  ]
}
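Along the same lines, a delete sink might look like the sketch below; this assumes writeMode: delete pairs with idColumn as described above, with event_id purely illustrative.

sink: {
  outputPath: ${dp.dl_gold_bucket}/stream_events/
  # delete rows whose idColumn matches incoming records (sketch, syntax assumed)
  writeMode: delete
  idColumn: event_id
  format: DELTA
}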
Parquet
Parquet is a very popular storage format ideal for data lakes, as it efficiently compresses columnar data for later analysis in Hadoop-compatible ecosystems.
How to read a Parquet Source
sources: [
  {
    inputPath: ${dp.dl_clean_bucket}/my_input_table/
    name: data_lake_clean_my_parquet_table
    format: PARQUET
    ops: [ ... ]
  }
  ...
]
How to write a Parquet Table
sink: {
  outputPath: ${dp.dl_gold_bucket}/my_output_table
  format: PARQUET
}
JSON
JSON is another storage format popular in the analytics community, as it safely serializes records while remaining very human-readable. Metabolic specifically uses the JSON Lines format.
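For example, a JSON Lines file stores one self-contained JSON object per line (an illustrative sample, mirroring the CSV example further down):

{"Year": 1997, "Make": "Ford", "Model": "E350"}
{"Year": 2000, "Make": "Mercury", "Model": "Cougar"}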
How to read a JSON Source
sources: [
  {
    inputPath: ${dp.dl_clean_bucket}/my_input_table/
    name: data_lake_clean_my_json_table
    format: JSON
    ops: [ ... ]
  }
  ...
]
Since JSON doesn't enforce validation when written, some types can be malformed. In this case you can use the useStringPrimitives option to force all columns to string type.
sources: [
  {
    inputPath: ${dp.dl_clean_bucket}/my_input_table/
    name: data_lake_clean_my_json_string_table
    useStringPrimitives: true
    format: JSON
    ops: [ ... ]
  }
  ...
]
How to write a JSON Table
sink: {
  outputPath: ${dp.dl_gold_bucket}/my_output_table
  format: JSON
}
CSV
CSV is another storage format popular in the analytics community. It stores records in a tabular way, making it very accessible to the broader audience that uses Microsoft Excel and similar tools. Metabolic requires CSV files to provide a header in the first line:
Year,Make,Model
1997,Ford,E350
2000,Mercury,Cougar
How to read a CSV Source
sources: [
  {
    inputPath: ${dp.dl_clean_bucket}/my_input_table/
    name: data_lake_clean_my_csv_table
    format: CSV
    ops: [ ... ]
  }
  ...
]
How to write a CSV Table
sink: {
  outputPath: ${dp.dl_gold_bucket}/my_output_table
  format: CSV
}
Kafka
Kafka topics are streaming storage for events.
How to read a Kafka Source
sources: [
  {
    topic: production.pub.raw.my_events
    name: data_lake_raw_kafka_events
    format: KAFKA
    ops: [ ... ]
  }
  ...
]
If your Kafka cluster is protected by authentication, you can add the kafkaSecret variable to pass a resolver. Currently Metabolic only supports AWS Secrets Manager as a resolver; in general, the secret is a JSON document containing the server, API key and secret.
sources: [
  {
    format: KAFKA
    topic: "production.pub.raw.my_events"
    kafkaSecret: "production/kafka"
    name: "data_lake_raw_kafka_events"
    ops: [ ... ]
  }
  ...
]
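For reference, the secret stored in AWS Secrets Manager might look like the sketch below; the exact key names are an assumption based on the description above, not field names confirmed by this page.

{
  "server": "broker-1.example.com:9092",
  "key": "my-api-key",
  "secret": "my-api-secret"
}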
How to write a Kafka Topic
sink: {
  format: KAFKA
  topic: production.pub.gold.my_events
  name: data_lake_gold_kafka_events
}
The name param is required in streaming jobs in order to create a checkpoint.