Apache Ni use cases: Dataow to Ingeson
sample data
2
Table of Contents
1 Disclaimer
2 Overview of Apache NiFi Interface
3 Pipeline organization
3.1 Example Structure
3.2 Use case: Hierarchical Process Groups
4 Use case implementation: Ingest Presences table from CDR Database
4.1 Short description
4.2 Controller services
4.3 Data Pipeline Flow
4.3.1 Flow Overview
4.3.2 Flow explanation
1 Disclaimer
NSO may intend to process Call Detail Records (CDRs) obtained from the telecommunications
operator.
To demonstrate the use case for mobile data (which may originate from an operator), sample mobile
data has been loaded into a PostgreSQL database. These sample datasets are available as
open-source resources and can be accessed at the following repository:
https://github.com/alisdair/nodobo-release/tree/master.
2 Overview of Apache NiFi Interface
The NiFi User Interface (UI) facilitates the creation, visualization, modification, monitoring, and
management of automated dataflows. It comprises multiple segments, each serving a distinct
function within the application. This section presents visual representations of the interface and
outlines its key components.
When a user navigates to the UI for the first time, a blank canvas is provided on which a dataflow can
be built:
The Components Toolbar, located in the upper left portion of the interface, provides a selection of
elements that can be dragged onto the canvas to construct dataflows. A detailed explanation of each
component is provided in the Building a DataFlow section.
Positioned below the Components Toolbar, the Status Bar displays real-time system metrics, including
active processing threads, data volume within the flow, and the state of various process groups and
processors (e.g., transmitting, running, disabled). In clustered deployments, it also indicates the
number of nodes and their connectivity status. The Status Bar is periodically refreshed, with the
latest update timestamp provided.
On the left side of the screen, the Operate Palette offers controls for users to oversee workflows and
for administrators to manage user access and configure system properties, including resource
allocation for the application.
To the right of the canvas, the Search Function enables efficient component discovery by name, type,
identifier, or configuration properties.
Adjacent to Search, the Global Menu provides options (see the screenshot below) for modifying
existing components within the dataflow environment.
The user interface includes navigation features to facilitate efficient movement across the canvas. The
Navigate Palette enables users to pan and zoom, while the Bird’s Eye View provides an overview of
the dataflow, allowing navigation across extensive sections. Additionally, breadcrumbs at the bottom
of the screen display the navigation path within Process Groups, indicating the hierarchy and depth
of the flow. Each breadcrumb serves as a link, enabling users to seamlessly return to previous levels
within the dataflow structure.
Certain walkthrough resources [1] provide a more comprehensive overview of the NiFi interface.
3 Pipeline organization
Upon logging in, the user is directed to the main canvas, commonly referred to as the NiFi Flow. The
NiFi Flow can be conceptually compared to a home directory on a computer, where organizing data
pipelines is similar to structuring work within folders and subfolders.
In Apache NiFi, the Process Group component functions similarly to a directory, serving as a logical
container for organizing workflows. Effective pipeline management in Apache NiFi relies primarily on
the structured arrangement of Process Groups and their nested sub-Process Groups.
To ensure an optimal organization, it is recommended to structure pipelines into logically defined
Process Groups, categorizing them by department, dataset, and data segments (e.g., tables). This
approach enhances clarity, maintainability, and operational efficiency in dataflow management.
3.1 Example Structure
Nif main low
Department 1 (process group):
Sub-ow (process group) for Dataset A.
Sub-ow (process group) for each table if needed
Sub-ow (process group) for Dataset B.
Sub-ow (process group) for each table if needed
Department 2 (process group):
Sub-ow (process group) for Dataset C.
Sub-ow (process group) for each table if needed
Sub-ow (process group) for Dataset D.
Sub-ow (process group) for each table if needed
3.2 Use case: Hierarchical Process Groups
In our case, we will ingest the "Presences" table from the "CDR" dataset for the Department of
Demographic and Social Statistics.
Below is the hierarchical structure of the project:
[1] https://www.youtube.com/watch?v=nisWXIRXyq0
4 Use case implementation: Ingest Presences table from CDR Database
4.1 Short description
This case outlines the process of ingesting data from a PostgreSQL database and implementing
partitioning mechanisms within the data flow. Specifically, it details the procedure for incorporating
partition fields into the content of the FlowFile, dynamically generating partitions based on FlowFile
content attributes, and efficiently storing the structured data in a MinIO bucket using Apache NiFi.
4.2 Controller services
Within the framework of Apache NiFi, a Controller Service constitutes a shared service designed to
facilitate the reuse of configurations and resources across multiple processors, reporting tasks, or
other services within a NiFi data flow. This mechanism enables the centralized management and
governance of configurations, which is essential to the interoperability of various system
components.
For the present use case, a single Controller Service is required, namely the DBCPConnectionPool,
which shall be designated as PG_CDRDB_ConnectionPool. The DBCPConnectionPool serves as a
Controller Service in Apache NiFi, managing a pool of database connections utilizing Apache
Commons DBCP (Database Connection Pooling). This configuration optimizes database interactions
by enabling NiFi processors to establish efficient and sustainable communication with relational
databases, thereby mitigating the need for repeated connection establishment and termination.
Consequently, this approach enhances system performance and ensures optimal resource utilization.
To create a DBCPConnectionPool, follow these steps:
a. Open Controller Services Panel
Click on the Gear Icon at the top-right corner of the “Table: Presences” process group
Configuration.
Navigate to Controller Services.
b. Add a New Controller Service
Click the "+" button to add a new Controller Service.
Search for DBCPConnectionPool and select it.
Click "Add".
c. Configure the DBCPConnectionPool
Click on the View Configuration button to configure the connection pool.
Change the default “Name” on the “SETTINGS” tab.
Enter the following details on the “PROPERTIES” tab:
Property                      Value
Database Connection URL       For MySQL: jdbc:mysql://<HOST>:<PORT>/<DATABASE>
                              For PostgreSQL: jdbc:postgresql://<HOST>:<PORT>/<DATABASE>
                              For SQL Server: jdbc:sqlserver://<HOST>:<PORT>;databaseName=<DATABASE>
Database Driver Class Name    MySQL: com.mysql.cj.jdbc.Driver
                              PostgreSQL: org.postgresql.Driver
                              SQL Server: com.microsoft.sqlserver.jdbc.SQLServerDriver
Database Driver Location(s)   /opt/nifi/drivers
Database User                 your_username
Password                      your_password
Max Total Connections         10
Validation Query              SELECT 1
d. Enable the Connection Pool
Click Apply to save the settings.
Click on the Enable button to activate the service.
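The Database Connection URL and Database Driver Class Name follow a fixed pattern for each database type, so a small helper can assemble them before they are pasted into the PROPERTIES tab. The following is a minimal sketch, not part of NiFi itself; the function name and the host, port, and database values (localhost, 5432, cdrdb) are assumptions chosen for illustration.

```python
# Illustrative helper (not a NiFi API): fills in the JDBC URL patterns
# listed for the DBCPConnectionPool properties.
JDBC_TEMPLATES = {
    "mysql": "jdbc:mysql://{host}:{port}/{database}",
    "postgresql": "jdbc:postgresql://{host}:{port}/{database}",
    "sqlserver": "jdbc:sqlserver://{host}:{port};databaseName={database}",
}

DRIVER_CLASSES = {
    "mysql": "com.mysql.cj.jdbc.Driver",
    "postgresql": "org.postgresql.Driver",
    "sqlserver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}


def connection_pool_properties(db_type, host, port, database):
    """Return the URL and driver class for the given database type."""
    if db_type not in JDBC_TEMPLATES:
        raise ValueError(f"unsupported database type: {db_type}")
    return {
        "Database Connection URL": JDBC_TEMPLATES[db_type].format(
            host=host, port=port, database=database
        ),
        "Database Driver Class Name": DRIVER_CLASSES[db_type],
    }


# Hypothetical connection details for the sample PostgreSQL database.
props = connection_pool_properties("postgresql", "localhost", 5432, "cdrdb")
for name, value in props.items():
    print(f"{name}: {value}")
```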
4.3 Data Pipeline Flow
4.3.1 Flow Overview
When ingesting a database for the first time, it is common practice to account for the historical data
already present before scheduling the ingestion of new data at a well-defined frequency. NSO may
process mobile data on a monthly basis.
The data ingestion workflow effectively demonstrates this approach by retrieving both historical and
newly generated data from the "Presences" table each month. The data from the "Presences" table
will be stored as partitioned Parquet files, with each partition representing the volume of data
collected for a given month.
Outlined below are the steps and processors utilized in constructing the data ingestion workflow for
the "Presences" table:
1- GenerateFlowFile: Generates two FlowFiles containing the attributes startDate and endDate,
representing the initial and current data load parameters.
2- UpdateAttribute: Computes and assigns the attribute monthDuration to the FlowFile using a
predefined constant milliSeconds, established in the preceding GenerateFlowFile processor.
3- RouteOnAttribute: Evaluates whether the computed startDate does not exceed endDate and
routes the FlowFile accordingly.
4- UpdateAttribute: Updates the startDate attribute in the FlowFile to the subsequent month.
5- UpdateAttribute: Incorporates dataset partitioning attributes into the FlowFile to enable
structured data management.
6- GenerateTableFetch: Constructs an SQL query using the datasetYear and datasetMonth attributes
(derived from startDate) as filtering criteria.
7- ExecuteSQL: Executes the dynamically generated SQL query to retrieve the required dataset.
8- ConvertAvroToParquet: Converts the extracted Avro file format into Parquet to enhance
storage efficiency and processing performance.
9- PutS3Object: Transfers and stores the Parquet file in MinIO to ensure accessibility and
long-term data management.
4.3.2 Flow explanation
4.3.2.1 GenerateFlowFile (s):
The pipeline begins with two GenerateFlowFile processors, each configured to handle different
aspects of data ingestion.
1. Configuration for Historical Data
Three new properties are added to the processor:
startDate: The starting date for retrieving historical data, formatted as YYYY-MM, compatible
with the loaded database.
endDate: The ending date for the historical data, formatted as YYYY-MM.
milliSeconds: A constant that will be used later to calculate the duration of one month.
2. Configuration for New Data
Similarly, three new properties are added to the processor:
startDate: The starting date is automatically determined using Apache NiFi’s expression
language [2].
startDate = ${now():format('yyyy-MM')}
endDate: The ending date is automatically determined using Apache NiFi’s expression
language.
endDate = ${now():format('yyyy-MM')}
milliSeconds: A constant that will be used later to calculate the duration of one month.
These two processors are linked to the next stage of the pipeline through the Funnel component,
ensuring a streamlined flow of data.
A Funnel is a component used to merge multiple data flows into a single stream or distribute
data from one source to multiple destinations.
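For readers more familiar with general-purpose languages, the expression ${now():format('yyyy-MM')} can be approximated in Python as shown here. This is only an analogy to illustrate what the attribute contains; the function name current_month is an assumption and nothing in NiFi runs this code.

```python
from datetime import datetime


def current_month(now=None):
    """Approximate ${now():format('yyyy-MM')}: year and month as text."""
    if now is None:
        now = datetime.now()
    return now.strftime("%Y-%m")


# For the "new data" processor, startDate and endDate hold the same month,
# so the monthly loop downstream handles exactly one month per run.
month = current_month()
start_date, end_date = month, month
print(start_date, end_date)
```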
4.3.2.2 UpdateAttribute
[2] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
This processor is supplied with input from the Funnel component. In the event of a
successful execution, its output is directed to the RouteOnAttribute processor.
Configure the processor as shown below
Add a new property monthDuration. This property will later be used as an incremental value in the
loop that iterates through the months from startDate to endDate.
Using Apache NiFi’s expression language, the monthDuration is calculated in milliseconds based on
the following formula (which implies that the milliSeconds constant is set to 1000):
monthDuration = 31 days × 24 hours × 60 minutes × 60 seconds × 1000 milliseconds
monthDuration = ${milliSeconds:toNumber():multiply(31):multiply(24):multiply(60):multiply(60)}
This calculation ensures that the monthly duration is consistently represented in milliseconds for
further processing within the data pipeline.
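The arithmetic behind monthDuration can be checked with a few lines of Python. This sketch assumes the milliSeconds attribute holds 1000, as the formula suggests; the constant and function names are illustrative only.

```python
MILLI_SECONDS = 1000  # assumed value of the milliSeconds attribute


def month_duration(milli_seconds):
    """Same chain as the expression: multiply by 31, 24, 60 and 60."""
    return milli_seconds * 31 * 24 * 60 * 60


print(month_duration(MILLI_SECONDS))  # 2678400000
```

Using 31 days, the longest possible month, means that adding this value to the first day of any month always lands in the following month.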
4.3.2.3 RouteOnAttribute
Configure the processor as below
Within the RouteOnAttribute processor in Apache NiFi, a new property notLast has been introduced
using Apache NiFi expression language, defined as follows:
notLast = ${startDate:toDate("yyyy-MM"):le(${endDate:toDate("yyyy-MM")})}
This property serves the purpose of evaluating whether the startDate is less than or equal to the
endDate. If startDate is not greater than endDate, the following actions are performed:
Using the UpdateAttribute (1) processor, dataset attributes are updated to facilitate the
storage of data as partitioned Parquet files in MinIO.
Using the UpdateAttribute (2) processor, the startDate is incremented to proceed with the
next data ingestion cycle.
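The notLast condition is a plain date comparison. A minimal Python equivalent, written only to illustrate the routing logic (the function name and the sample dates are assumptions), could look like this:

```python
from datetime import datetime


def not_last(start_date, end_date):
    """True while startDate <= endDate, both given as 'yyyy-MM' strings."""
    start = datetime.strptime(start_date, "%Y-%m")
    end = datetime.strptime(end_date, "%Y-%m")
    return start <= end


print(not_last("2010-09", "2011-02"))  # True: more months remain
print(not_last("2011-02", "2011-02"))  # True: the last month is still processed
print(not_last("2011-03", "2011-02"))  # False: the loop ends
```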
4.3.2.4 UpdateAttribute:
Configure the processor as shown below
startDate = ${startDate:toDate("yyyy-MM"):toNumber():plus(${monthDuration}):format("yyyy-MM")}
This property serves the function of dynamically updating the startDate attribute by incrementing it
based on a defined duration (monthDuration).
This processor, along with the preceding RouteOnAttribute processor, forms a loop that iterates
through each month from startDate to endDate, ensuring the sequential processing of data over
time.
Aer incremenng the startDate property, this processor forwards its output (FlowFile) as a new
input to the previously idened Funnel component.
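The loop formed by RouteOnAttribute and this UpdateAttribute processor can be simulated outside NiFi to see which months it visits. The sketch below is an approximation in Python, assuming the monthDuration value from section 4.3.2.2 and evaluating dates in UTC; the function names and sample dates are illustrative.

```python
from datetime import datetime, timedelta, timezone

# monthDuration as defined in section 4.3.2.2 (31 days in milliseconds)
MONTH_DURATION_MS = 31 * 24 * 60 * 60 * 1000


def next_month(start_date):
    """Parse 'yyyy-MM', add monthDuration, and format back to 'yyyy-MM'."""
    first_day = datetime.strptime(start_date, "%Y-%m").replace(tzinfo=timezone.utc)
    shifted = first_day + timedelta(milliseconds=MONTH_DURATION_MS)
    return shifted.strftime("%Y-%m")


def months_to_ingest(start_date, end_date):
    """Walk the route/update loop and collect every month it processes."""
    months = []
    while start_date <= end_date:  # 'yyyy-MM' strings sort chronologically
        months.append(start_date)
        start_date = next_month(start_date)
    return months


print(months_to_ingest("2010-09", "2011-02"))
```

Because the date is truncated back to yyyy-MM on every pass, the extra days added in shorter months do not accumulate. The sketch works in UTC; in a time zone where clocks are set back during a 31-day month, adding exactly 744 hours to midnight on the 1st may land at 23:00 on the last day of the same month, so it may be worth checking which time zone the NiFi expression is evaluated in.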
4.3.2.5 UpdateAttribute:
This processor receives as input the FlowFile from the RouteOnAttribute processor and updates the
dataset information required to query the database and store the data in MinIO.
Configure the processor as shown below
The following attributes must be incorporated to facilitate data partitioning:
datasetMonth: The month relevant to the query. This value is automatically extracted from
the startDate attribute using Apache NiFi's expression language.
datasetMonth = ${startDate:toDate("yyyy-MM"):format("MM")}
datasetName: The name of the dataset.
datasetYear: The year corresponding to the dataset month, dynamically derived from the
startDate attribute.
datasetYear = ${startDate:toDate("yyyy-MM"):format("yyyy")}
department: The department responsible for the dataset.
tableName: The name of the database table.
These attributes ensure proper organization and retrieval of data within the pipeline.
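To make the derivation of the partitioning attributes concrete, the following Python sketch builds the same five attributes from a startDate value. It is an illustration only: the function name is hypothetical, and the department value "dss" is a placeholder rather than a value taken from the flow.

```python
from datetime import datetime


def partition_attributes(start_date, department, dataset_name, table_name):
    """Derive datasetYear and datasetMonth from a 'yyyy-MM' startDate."""
    parsed = datetime.strptime(start_date, "%Y-%m")
    return {
        "department": department,
        "datasetName": dataset_name,
        "tableName": table_name,
        "datasetYear": parsed.strftime("%Y"),   # like format("yyyy")
        "datasetMonth": parsed.strftime("%m"),  # like format("MM"), zero-padded
    }


# "dss" is a placeholder department name used for illustration.
attrs = partition_attributes("2010-09", "dss", "CDR", "Presences")
print(attrs["datasetYear"], attrs["datasetMonth"])  # 2010 09
```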
4.3.2.6 GenerateTableFetch
This processor utilizes the attributes defined by the preceding UpdateAttribute processor to
construct the database query.
Configure this processor as shown below
The following core properties are modified:
Database Connection Pooling Service = PG_CDRDB_ConnectionPool: The
DBCPConnectionPool service, previously created as a prerequisite, which provides a
connection to the database.
Database Type = PostgreSQL: The database type, which can be PostgreSQL, MySQL, MS SQL
2012+, etc.
Table Name = Presences: The name of the target database table.
Addional WHERE Clause: A lter applied to the dataset to retrieve records for a specic
month = date_part('year', created_at) = ${datasetYear} AND
date_part('month', created_at) = ${datasetMonth}
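The effect of the attribute substitution in the Additional WHERE Clause can be previewed with a short Python sketch. The helper name is hypothetical, and the SELECT statement printed at the end is only a simplified stand-in for the queries that GenerateTableFetch actually produces.

```python
WHERE_TEMPLATE = (
    "date_part('year', created_at) = {datasetYear} "
    "AND date_part('month', created_at) = {datasetMonth}"
)


def additional_where_clause(attributes):
    """Substitute the FlowFile attributes into the WHERE clause text."""
    year = attributes["datasetYear"]
    month = attributes["datasetMonth"]
    # Guard: only plain digits are allowed into the SQL text.
    if not (year.isdigit() and month.isdigit()):
        raise ValueError("datasetYear and datasetMonth must be numeric")
    return WHERE_TEMPLATE.format(datasetYear=year, datasetMonth=month)


clause = additional_where_clause({"datasetYear": "2010", "datasetMonth": "09"})
print(f"SELECT * FROM Presences WHERE {clause}")
```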
4.3.2.7 ExecuteSQL:
Configure this processor as shown below
The database connection service must be specified by modifying the Database Connection Pooling
Service property.
The ExecuteSQL processor in Apache NiFi produces data in Avro format.
4.3.2.8 ConvertAvroToParquet
Since the objective is to store the data in Parquet format, this processor is responsible for converting
the data from Avro format to Parquet format, ensuring compatibility with the targeted storage and
processing requirements.
Configure the processor as shown below
The selected compression type is SNAPPY to prioritize query performance [3].
4.3.2.9 PutS3Object:
Once the data for the month has been collected and transformed into Parquet format, this processor
will store it in MinIO for further processing and analysis.
Congure the processor as shown below
[3] https://dev.to/alexmercedcoder/all-about-parquet-part-05-compression-techniques-in-parquet-4bcb
The list of core properties to be modified is as follows:
Object Key: The absolute path of the file.
Object Key = ${department}/${datasetName}/${tableName}/${datasetYear}/${datasetMonth}/data.parquet
Bucket: The name of the bucket.
Access Key ID: The access key generated by MinIO.
Secret Access Key: The secret key generated by MinIO.
Endpoint Override URL: The API URL of MinIO (port 9000).
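The Object Key pattern determines the partition layout inside the bucket. As a rough illustration, Python's string.Template happens to use the same ${name} placeholder syntax, so the pattern can be expanded as follows; the attribute values are placeholders chosen for the example, not values taken from the flow.

```python
from string import Template

# Same pattern as the Object Key property, split across two literals.
OBJECT_KEY = Template(
    "${department}/${datasetName}/${tableName}/"
    "${datasetYear}/${datasetMonth}/data.parquet"
)

# Placeholder attribute values for illustration.
attributes = {
    "department": "dss",
    "datasetName": "CDR",
    "tableName": "Presences",
    "datasetYear": "2010",
    "datasetMonth": "09",
}

object_key = OBJECT_KEY.substitute(attributes)
print(object_key)  # dss/CDR/Presences/2010/09/data.parquet
```

Each month therefore lands under its own year/month prefix, which is what allows downstream tools to read a single partition without scanning the whole table.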