An Operation (Op for short) is a reusable recipe created by Data Specialists (such as Data Engineers or Data Analysts) and made available to other technical profiles.
Source Ops
Filter Op
Restricts the input to a dynamic temporal window based on a date time column. Works very well with placeholders.
op.filter = {
onColumn = "date_time"
from = ${df.start_of_yesterday}
to = ${df.now}
}
Filter Op Use Case: Incremental Loads
Most modern sources provide a way to rapidly identify new or updated values, for example an updated_at column in your table, or immutable events stored with an auto-incrementing event_id column.
This unlocks business cases where you want to store those deltas to keep track of the table's history, or to update a resulting insight with only the new information.
Filter Op can transform the input source into a CTE that only contains new data from the period specified between from and to. For example, you can filter between 00:00:00.000 of yesterday and 00:00:00.000 of today to calculate cumulative KPI values for the last 24 hours. And since it is deterministic, you can run it multiple times and it will produce the same result as long as the input data does not change.
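As a sketch of this incremental pattern (the updated_at column name is hypothetical, and the snippet simply reuses the placeholders shown above), the filter could look like:
op.filter = {
# keep only rows whose updated_at falls inside the window
onColumn = "updated_at"
# placeholders resolve to the start of yesterday and the current run time
from = ${df.start_of_yesterday}
to = ${df.now}
}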
Prune Op
Restricts the input to a dynamic temporal window based on its temporal partitions. Works very well with placeholders.
op.prune = {
from = ${df.start_of_yesterday}
to = ${df.now}
}
You can use op.prune and op.filter together to increase the efficiency of filtering on very large sources, as in the sketch below.
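For instance, pruning partitions first and then filtering on the date time column can be combined on the same source; this sketch just reuses the two examples above:
op.prune = {
# skip whole partitions outside the window before reading the data
from = ${df.start_of_yesterday}
to = ${df.now}
}
op.filter = {
# then filter the remaining rows on the date time column
onColumn = "date_time"
from = ${df.start_of_yesterday}
to = ${df.now}
}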
Prune Op Use Case: Filtering Json Files
In the previous use case we saw how filtering is a great technique to reduce the scope of some sources. But many times you don't have such columns available, or you have them but would need to scan the full table to filter on them.
Prune Op can transform the input source into a CTE that only contains data with matching partitions from the period specified between from and to.
Dedupe Op
Removes duplicates based on a specified unique identifier (Id) criteria, keeping the most recent record according to an Order criteria. This always returns one record per id group, so no duplicates remain in the output.
This recipe is known as deduplication and allows you to obtain the last unique state of a source.
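No configuration example is shown for this op here; a minimal sketch, assuming parameter names analogous to the idColumns and orderColumns documented for Demulti Op below, could look like:
op.dedupe = {
# assumed keys: group duplicates by id and keep the most recent row by extracted_at
idColumns = ["id"]
orderColumns = ["extracted_at"]
}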
Dedupe Op Use Case: Gold Layers
Most Data Lakes are a historical repository of data, meaning the inner layers of a data lake contain multiple versions of a curated entity. Sometimes, though, it's interesting to expose only the last version of that entity, especially in gold layers open to everyone, to avoid confusion.
Using dedupe on a curated source and just applying a select * from deduped_source mapping with a replace mode sink will recreate the latest version of that source every time you run the job.
Expression Op (since 0.10.0+)
Adds a new column based on a select expression: a mini mapping that runs before the main mapping or any other ops.
Single expression is also supported, but not recommended (and might be deprecated in future versions)
op.expr = {
expression = "date(updated_at)"
}
Expression Op Use Case: Filtering Unformatted Data
For most transformations you don't want the whole dataset, but in order to use filter ops you need a properly ISO-formatted date time column, which might not be the case when processing raw data.
Even though you could create a two-step mapping, Expression Op lets you apply this fast pre-transformation of the data in order to use it in another op, like filter, dedupe or demulti, so you end up with just one curated entity.
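As an illustration, the sketch below derives a properly formatted date from a raw updated_at value and then filters on it; the alias used to name the new column is an assumption:
op.expr = {
# derive a date column from the raw timestamp (the alias is assumed to name the new column)
expression = "date(updated_at) as updated_date"
}
op.filter = {
# filter on the freshly derived column
onColumn = "updated_date"
from = ${df.start_of_yesterday}
to = ${df.now}
}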
Drop Op (since 0.10.0+)
Removes existing columns: a mini mapping that runs before the main mapping or any other ops.
op.drop = {
columns = ["email", "phone"]
}
Single drop is also supported, but not recommended (and might be deprecated in future versions)
drop = {
column = "id"
}
Drop Op Use Case: Sensitive Data
Certain entities might store sensitive information. Fields like emails, ids, addresses, IPs and many more are considered Personally Identifiable Information (PII), which is regulated in many countries.
Several strategies are well known in the industry, like tagging the metadata to quickly identify this information, or aggregating to reduce the granularity and make it harder to trace data back to certain individuals.
But as developers, most of the time we have one more option: simply remove those fields if they are not going to be used downstream. In metabolic you have two ways to remove a column: either through a drop op or with a specific select statement that selects only the desired columns.
Since most of us use a select *, the drop op is a very safe way to minimize exposing sensitive fields.
Demulti Op
Generates duplicates off a new specified time dimension (Day, Month or Year) named period.
op.demulti = {
dateColumn = "updated_at"
from = "${df.previous_first_day_of_year}"
to = ${df.now}
format = "Month"
idColumns = ["id"]
orderColumns = ["extracted_at"]
endOfMonth = true
}
| Parameter | Description | Required / Default |
| --- | --- | --- |
| dateColumn | Name of a timestamp column to generate the cohorts from | Required (String) |
| from | Timestamp of the first cohort | Required (ISO) |
| to | Timestamp of the last cohort | Required (ISO) |
| format | Cohort period | "Day", "Month" or "Year" |
| idColumns | Column names to deduplicate your rows on | Array with one value |
| orderColumns | Column names to order matching duplicates | Empty array |
Demulti Op Use Case: Generating Monthly Status
WIP
Flatten Op
Flattens structures into columns.
op.flatten {
column = "_airbyte_data"
}
If column is not specified, it will flatten all the struct columns into parent_child column names with the right format. If specified, it will flatten only that parent column, naming the new columns directly with the child names.
Flatten Use Case: Working with Airbyte Data
When using Airbyte with Data Lakes (file format), Airbyte uses structures to manage source data schema evolution, usually creating one _airbyte_data field that requires querying _airbyte_data.property_1 in all subsequent queries. This op will flatten that structure to recover the original schema, and queries will run natively.
Watermark Op
Constrains how far in time a source will be considered.
Generally used to limit the scope of the source to a certain time window. This is useful for sources that are not append-only, like a table that is updated with new data but also holds old data that is no longer relevant. This is a must in streaming mode, especially when joining with other sources, so that the join window does not grow too large and the job does not run out of memory waiting for a possibly delayed source.
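No configuration example for this op is shown on this page; purely as an illustrative sketch, assuming it follows the same op.* pattern with an event-time column and a tolerated delay (both key names are assumptions, not confirmed), it could look something like:
op.watermark = {
# assumed keys, for illustration only
onColumn = "updated_at"
delay = "2 hours"
}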
Mapping Operations
Mapping Operations have all the context of the sources and can be executed before (preOps section) or after (postOps section) the SQL Mapping.
Intervals Op (Beta)
Generates intervalic states off two sources with unrelated time dimensions.
This Operation produces a CTE with the two tables joined, yielding a row for each state that happened in the past.
Intervals Op Use Case: Joining Updates (Events)
Classic joins assume that either there are no temporal implications or that there exists a temporal window in which to match the records.
This makes it very hard to answer questions like: what was the plan's price back when this customer was subscribed to it? Especially because both plans and customers are mutable in time, but they do not share a specific window between timelines. In summary: both plan and customer updates are events.
You can use Intervals Op to generate a single source of customer subscriptions to plans that contains all the states of the records that existed in the past. From there it becomes much more trivial to use your analytical SQL to obtain your desired insights.
Sink Operations (0.15.0+)
There are two groups of sink operations, transformative and operational:
Transformative Ops modify the contents, similarly to Source Ops:
Flatten Use Case: Creating a Bronze Layer of Debezium Data
When ingesting data into the data lake using Debezium, you will notice that it adds extra information that is usually not very relevant for final users.
Applying Flatten Op at sink level allows you to generate a queryable layer of relevant fields (like payload.after) in a generic way, which helps build bronze layers (or the first actionable layer after the landing/raw layer).
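As an illustrative sketch (the payload column name comes from the Debezium/Kafka Connect envelope; declaring the op at sink level exactly like this is an assumption), it could look like:
op.flatten {
# flatten the Debezium envelope so fields such as payload.after become directly queryable
column = "payload"
}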
Operational Ops do not modify the contents but the behaviour of how sinks are applied:
Columns Partition Op
Defines which existing columns should be used for partitioning.
op.cols_partition {
cols = ["a","b","c"]
}
Columns Partition Use Case: Optimizing Dashboards Filters
Modern BI / dashboarding tools that do not preprocess data, like Looker, Superset or Metabase, rely on the speed of the query engine to filter down the results. Having a partition on commonly filtered columns can heavily increase performance.
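For example, partitioning on a couple of commonly filtered columns (the column names below are hypothetical) reuses the syntax shown above:
op.cols_partition {
# hypothetical commonly filtered columns
cols = ["country", "event_date"]
}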
Date Partition Op
Defines which date columns should be used for partitioning into date time components automatically.
op.date_partition {
col = "updated_at"
}
Date Partition Use Case:
Schema Management Op
If you want metabolic to make all changes backwards compatible, you can use the manage_schema op to automatically version your sinks.
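No example is shown for this op on this page; assuming it follows the same op.* declaration pattern as the other sink ops (an assumption, not confirmed here), enabling it could look something like:
op.manage_schema {
# no parameters are documented here; this only illustrates where the op would be declared
}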