Ready-to-go sample data pipelines with Dataflow | by Netflix Technology Blog | Dec, 2022

Dataflow

$ dataflow --help
Usage: dataflow [OPTIONS] COMMAND [ARGS]...

Options:
  --docker-image TEXT  Url of the docker image to run in.
  --run-in-docker      Run dataflow in a docker container.
  -v, --verbose        Enables verbose mode.
  --version            Show the version and exit.
  --help               Show this message and exit.

Commands:
  migration  Manage schema migration.
  mock       Generate or validate mock datasets.
  project    Manage a Dataflow project.
  sample     Generate fully functional sample workflows.

Business Logic

WITH STEP_1 AS (
    SELECT
        title_id
        , country_code
        , SUM(view_hours) AS view_hours
    FROM some_db.source_table
    WHERE playback_date = CURRENT_DATE
    GROUP BY
        title_id
        , country_code
),

STEP_2 AS (
    SELECT
        title_id
        , country_code
        , view_hours
        , RANK() OVER (
            PARTITION BY country_code
            ORDER BY view_hours DESC
        ) AS title_rank
    FROM STEP_1
),

STEP_3 AS (
    SELECT
        title_id
        , country_code
        , view_hours
        , title_rank
    FROM STEP_2
    WHERE title_rank <= 100
)

SELECT * FROM STEP_3
CREATE TABLE IF NOT EXISTS $TARGET_DB.dataflow_sample_results (
    title_id INT COMMENT "Title ID of the movie or show."
    , country_code STRING COMMENT "Country code of the playback session."
    , title_rank INT COMMENT "Rank of a given title in a given country."
    , view_hours DOUBLE COMMENT "Total viewing hours of a given title in a given country."
)
COMMENT
    "Example dataset brought to you by Dataflow. For more information on this
    and other examples please visit the Dataflow documentation page."
PARTITIONED BY (
    date DATE COMMENT "Playback date."
)
STORED AS ICEBERG;
sql> SELECT * FROM foo.dataflow_sample_results
     WHERE date = 20220101 and country_code = 'US'
     ORDER BY title_rank LIMIT 5;

 title_id | country_code | title_rank | view_hours |   date
----------+--------------+------------+------------+----------
 11111111 | US           |          1 |        123 | 20220101
 44444444 | US           |          2 |        111 | 20220101
 33333333 | US           |          3 |         98 | 20220101
 55555555 | US           |          4 |         55 | 20220101
 22222222 | US           |          5 |         11 | 20220101
(5 rows)

Components

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
  - job:
      id: ddl
      type: Spark
      spark:
        script: $S3./ddl/dataflow_sparksql_sample.sql
        parameters:
          TARGET_DB: $TARGET_DB
  - job:
      id: metadata
      type: Metadata
      metacat:
        tables:
          - $CATALOG/$TARGET_DB/$TARGET_TABLE
        owner: $username
        tags:
          - dataflow
          - sample
        lifetime: 123
        column_types:
          date: pk
          country_code: pk
          rank: pk
.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
  - template:
      id: wap
      type: wap
      tables:
        - $CATALOG/$DATABASE/$TABLE
      write_jobs:
        - job:
            id: write
            type: Spark
            spark:
              script: $S3./src/sparksql_write.sql
      data_auditor:
        audits:
          - function: columns_should_not_have_nulls
            blocking: true
            params:
              table: $TARGET_TABLE
              columns:
                - title_id
  - job:
      id: hwm
      type: HWM
      metacat:
        table: $CATALOG/$TARGET_DB/$TARGET_TABLE
        hwm_datetime: $EXECUTION_DATE
        hwm_timezone: $EXECUTION_TIMEZONE
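The wap template above expresses a write-audit-publish flow: the write job stages the new data, the data_auditor runs its blocking checks (here, that title_id is never NULL), and only audited data is published to the consumer-facing table; the HWM job then signals downstream consumers that the new data has landed. As a rough illustration of the idea only, not of Dataflow's actual implementation, the flow could be sketched in plain PySpark like this (the staging table name is hypothetical):

from pyspark.sql import DataFrame, SparkSession


def write_audit_publish(spark: SparkSession, df: DataFrame, target_table: str) -> None:
    """Illustrative write-audit-publish sketch; the wap template orchestrates
    this for you, and the staging table name here is made up."""
    staging_table = f"{target_table}_staging"

    # WRITE: land the new data in a staging table, not in the target.
    df.writeTo(staging_table).createOrReplace()

    # AUDIT: run blocking checks against the staged data.
    null_titles = spark.table(staging_table).filter("title_id IS NULL").count()
    if null_titles > 0:
        raise ValueError(f"audit failed: {null_titles} rows with NULL title_id")

    # PUBLISH: only audited data reaches the consumer-facing table.
    spark.table(staging_table).writeTo(target_table).overwritePartitions()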
.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py
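The test_sparksql_write.py file, together with the YAML files under src/mocks, is where the workflow's transformation logic is unit tested against mocked source data. The exact Dataflow test scaffolding and mock loaders are not shown here, but a plain pytest + PySpark test of the same top-titles-per-country ranking logic could look roughly like this (fixture and table names are illustrative):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # Local SparkSession; Dataflow's own test harness would normally provide this.
    return SparkSession.builder.master("local[1]").appName("unit-test").getOrCreate()


def test_titles_are_ranked_per_country(spark):
    # Mocked stand-in for some_db.source_table (in Dataflow this would come
    # from src/mocks/some_db.source_table.yaml).
    source = spark.createDataFrame(
        [("tt1", "US", 10.0), ("tt2", "US", 20.0), ("tt3", "PL", 5.0)],
        ["title_id", "country_code", "view_hours"],
    )
    source.createOrReplaceTempView("source_table")

    # Same ranking logic as the workflow's business logic.
    result = spark.sql("""
        SELECT title_id, country_code,
               RANK() OVER (PARTITION BY country_code
                            ORDER BY view_hours DESC) AS title_rank
        FROM source_table
    """).collect()

    ranks = {(r.title_id, r.country_code): r.title_rank for r in result}
    assert ranks[("tt2", "US")] == 1
    assert ranks[("tt1", "US")] == 2
    assert ranks[("tt3", "PL")] == 1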

Languages

$ dataflow sample workflow --help
Dataflow (0.6.16)

Usage: dataflow sample workflow [OPTIONS] RECIPE [TARGET_PATH]

  Create a sample workflow based on selected RECIPE and land it in the
  specified TARGET_PATH.

  Currently supported workflow RECIPEs are: spark-sql, pyspark,
  scala and sparklyr.

  If TARGET_PATH:
    - if not specified, current directory is assumed
    - points to a directory, it will be used as the target location

Options:
  --source-path TEXT         Source path of the sample workflows.
  --workflow-shortname TEXT  Workflow short name.
  --workflow-id TEXT         Workflow ID.
  --skip-info                Skip the info about the workflow sample.
  --help                     Show this message and exit.

$ cd stranger-data
$ dataflow sample workflow spark-sql ./sparksql-workflow
.
├── pyspark-workflow
│   ├── main.sch.yaml
│   ├── daily.sch.yaml
│   ├── backfill.sch.yaml
│   ├── ddl
│   │   └── ...
│   ├── src
│   │   └── ...
│   └── tox.ini
├── scala-workflow
│   ├── build.gradle
│   └── ...
├── sparklyR-workflow
│   └── ...
└── sparksql-workflow
    └── ...
# Imports assumed by this snippet.
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, rank


def main(args, spark):
    # `date` and `target_table` are supplied by the surrounding workflow code,
    # which is not shown in this snippet.
    source_table_df = spark.table(f"some_db.source_table")

    viewing_by_title_country = (
        source_table_df.select("title_id", "country_code",
                               "view_hours")
        .filter(col("date") == date)
        .filter("title_id IS NOT NULL AND view_hours > 0")
        .groupBy("title_id", "country_code")
        .agg(F.sum("view_hours").alias("view_hours"))
    )

    window = Window.partitionBy(
        "country_code"
    ).orderBy(col("view_hours").desc())

    ranked_viewing_by_title_country = viewing_by_title_country.withColumn(
        "title_rank", rank().over(window)
    )

    ranked_viewing_by_title_country.filter(
        col("title_rank") <= 100
    ).withColumn(
        "date", lit(int(date))
    ).select(
        "title_id",
        "country_code",
        "title_rank",
        "view_hours",
        "date",
    ).repartition(1).write.byName().insertInto(
        target_table, overwrite=True
    )
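The main(args, spark) entry point above is invoked by the workflow runtime, which supplies the SparkSession along with parameters such as date and target_table (their unpacking from args is elided in the snippet). For quick local experimentation one might drive it roughly like this; the argument object and table name are purely illustrative:

from types import SimpleNamespace
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Local SparkSession instead of the one injected by the workflow scheduler.
    spark = SparkSession.builder.master("local[*]").appName("dataflow-sample").getOrCreate()
    # Illustrative stand-in for the parameters the scheduler would pass in.
    args = SimpleNamespace(date="20220101", target_table="foo.dataflow_sample_results")
    main(args, spark)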

package com.netflix.spark

// Spark SQL imports assumed by this snippet; `spark` (a SparkSession)
// is assumed to be provided by the surrounding application.
import org.apache.spark.sql.{DataFrame, functions => F}
import org.apache.spark.sql.expressions.Window

object ExampleApp {
  import spark.implicits._

  def readSourceTable(sourceDb: String, dataDate: String): DataFrame =
    spark
      .table(s"$sourceDb.source_table")
      .filter($"playback_start_date" === dataDate)

  def viewingByTitleCountry(sourceTableDF: DataFrame): DataFrame =
    sourceTableDF
      .select($"title_id", $"country_code", $"view_hours")
      .filter($"title_id".isNotNull)
      .filter($"view_hours" > 0)
      .groupBy($"title_id", $"country_code")
      .agg(F.sum($"view_hours").as("view_hours"))

  def addTitleRank(viewingDF: DataFrame): DataFrame =
    viewingDF.withColumn(
      "title_rank", F.rank().over(
        Window.partitionBy($"country_code").orderBy($"view_hours".desc)
      )
    )

  def writeViewing(viewingDF: DataFrame, targetTable: String, dataDate: String): Unit =
    viewingDF
      .select($"title_id", $"country_code", $"title_rank", $"view_hours")
      .filter($"title_rank" <= 100)
      .repartition(1)
      .withColumn("date", F.lit(dataDate.toInt))
      .writeTo(targetTable)
      .overwritePartitions()

  def main(args: Array[String]): Unit = {
    val dataDate = "20200101"
    val targetTable = args(0)  // target table name supplied by the workflow
    val sourceTableDF = readSourceTable("some_db", dataDate)
    val viewingDF = viewingByTitleCountry(sourceTableDF)
    val titleRankedDF = addTitleRank(viewingDF)
    writeViewing(titleRankedDF, targetTable, dataDate)
  }
}

suppressPackageStartupMessages({
  library(sparklyr)
  library(dplyr)
})

...

main <- function(args, spark) {
  ...
    ungroup()
}

main(args = args, spark = spark)