This is part 3 of a series on data engineering in a big data environment. It reflects my personal journey and the lessons I learnt along the way, and it culminates in Flowman, the open source tool I created after finding myself reimplementing the same boilerplate code over and over again in a couple of projects.
Part 1: Big Data Engineering — Best Practices
Part 2: Big Data Engineering — Apache Spark
Part 3: Big Data Engineering — Declarative Data Flows
Part 4: Big Data Engineering — Flowman up and running
What to expect
This series is about building data pipelines with Apache Spark for batch processing. But some aspects are also valid for other frameworks or for stream processing. Eventually I will introduce Flowman, an Apache Spark based application that simplifies the implementation of data pipelines for batch processing.
Functional Requirements
Part 1 of this series already pointed out the existence of two types of requirements, which apply to almost all kinds of applications: functional and non-functional requirements. Let us first focus on the former.
Functional requirements describe the real problem that a solution should solve in the first place. They describe the core functionality and what should be implemented. That could be a data processing application which is required to integrate multiple data sources and perform an aggregation to provide a simplified data model for some data scientists. But the idea of functional requirements also applies to an Android application for booking a trip to a different city.
Functional requirements are always written by business experts acting as proxies of the final end users. They should focus on the problem to be solved, so that the developer or architect can still decide what a viable solution should look like. Of course the chosen approach should be validated together with the business experts before implementation.
Note that I specifically wrote that functional requirements should focus on the problem (or task) and not on the solution. This small difference is very important, since it leaves a bigger design space open for the developer to find the best possible solution, as opposed to a specific solution prescribed by the business expert, which may be difficult to implement with the given technology.
Non-Functional Requirements
In addition to functional requirements which describe the primary task a piece of software has to implement, there are also non-functional requirements. They describe everything that has to be implemented but is not directly part of the core functionality.
Typical non-functional requirements of a data processing pipeline include the following:
- A log containing meaningful information, warnings and error messages is pushed to a central log aggregator.
- Metrics about the number of records processed are published and pushed to a central metric collector.
- The processing of input data generated by upstream systems within a single hour does not take longer than a single hour.
- The application has to run distributed in the existing cluster infrastructure (which could be Kubernetes, YARN, or maybe even Mesos)
- The application should be able to easily scale its processing throughput as the data volume is expected to grow.
All these requirements are very technical so far. Specifically in the case of data processing pipelines, I would add the following requirement:
- The business expert should be able to read and (at least roughly) understand the implemented transformation logic.
This last requirement probably is debatable, but I have found it very helpful when fulfilled. Being able to discuss a specific solution with the business expert or stakeholder is invaluable as an additional layer of validation and trust. Fortunately, in the domain of data processing, business experts often know at least SQL and understand the concepts of typical transformations like filtering, joining and grouped aggregation quite well. So what we need is a representation of the logic that is simple enough to read.
Separation of Concerns
We have just seen that almost all applications have to implement two different classes of requirements, the functional and the non-functional ones. A simple but important observation is that within a project, or even within a company, the functional requirements vary from application to application (since each application solves a different problem), while the non-functional requirements are often the same, at least within a specific application type (like “data processing pipeline” or “web application”).
Every developer should now have a term like “reusable code”, and maybe even “separation of concerns”, in mind. Given that most non-functional requirements are the same, this obviously is the way to go. If we are able to build a solution where functional and non-functional requirements are implemented in different layers, and if we also succeed in sharing the code responsible for all those non-functional requirements, then we have gained a lot:
- Thanks to the shared implementation of non-functional requirements, all applications benefit from any improvements.
- All applications can be integrated into the whole IT landscape in very similar ways, thanks to uniform metrics and logging.
- Operations is also streamlined, since knowledge and problem-solving strategies for one application can be applied to all other applications that use the same shared base layer for non-functional requirements.
- The core business logic implemented as part of the functional requirements is not intermixed with technical details and thus is easier to understand and validate by business experts.
The next section will describe a possible approach to build a solution based on these ideas.
Declarative Data Pipelines
By using an appropriate high-level description language for the business logic, which is then parsed and executed by a lower-level data flow execution layer, we can gain all the advantages described above. This is the route I chose to follow for implementing a generic data processing application on top of Apache Spark.
Basically the idea is to use simple YAML files which contain a purely declarative description of all data transformations, similar to deployment descriptions in Kubernetes. Before I give some small examples, let me explain the benefits of this approach:
- By building on top of Apache Spark, we retain all of Spark’s benefits such as extensibility, scalability, rich connectors and so on.
- A description language with a higher level of abstraction allows logic to be specified at a level similar to the original business requirements (like building a history or extracting subtrees from complex JSON documents), operations which are not directly available at the level of the Spark DataFrame API.
- Using a purely declarative language without any flow control helps to focus on the logical flow and thereby prevents mixing functional and non-functional requirements within a single source file.
- The declarative language also makes it cheap to extract and provide additional information such as data lineage or output schemas. This information can be used for automating workflows, for schema management and for generating metrics.
Example Data Flow
A simple example data flow is presented in the following sections. The example might look a bit complicated at first sight, but you will see that it also contains a lot of valuable detail, which is required in one way or another and which enables some of the additional benefits mentioned above, such as automatic schema management.
For this example I will build a small data processing pipeline for working with the “Integrated Surface Dataset” from the NOAA, which contains weather measurements. It is a very interesting and big dataset containing weather information from around the globe, going back as far as 1901. But it uses a rather complex custom format (not CSV), in which some basic measurements can be extracted from fixed positions (which is what we will do). In this example we will not access the NOAA servers directly; instead I assume that the data has already been downloaded to some private location (like a local filesystem, S3 or maybe HDFS).
1. Source Location
First we need to declare the physical source location, where the raw measurements are stored, after they have been downloaded from the NOAA servers:
relations:
  measurements-raw:
    kind: file
    format: text
    location: "s3a://dimajix-training/data/weather/"
    pattern: "${year}"
    schema:
      kind: embedded
      fields:
        - name: raw_data
          type: string
          description: "Raw measurement data"
    partitions:
      - name: year
        type: integer
        granularity: 1
This definition already contains plenty of information about a source relation called “measurements-raw”:
- kind: file — The data is stored as files
- format: text — The data is stored as text files (each line represents one record)
- location: … — The data is stored in S3 at the specified location
- pattern: ${year} — The location contains subdirectories whose names are simply the value of the year (more on partitions below)
- schema: … — The data has a specific schema with a single column called “raw_data”
- partitions: … — The relation is partitioned by “year”, i.e. its data is split up into smaller chunks along the axis “year”.
As you can see, this already encodes a lot of information, all of which is required (or at least valuable) for working with the relation.
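To make the combination of “location”, “pattern” and “partitions” more tangible, the directory layout on S3 would look roughly like this (an illustrative sketch with made-up years, one subdirectory per partition value):

s3a://dimajix-training/data/weather/2011
s3a://dimajix-training/data/weather/2012
s3a://dimajix-training/data/weather/2013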
2. Read Data
Now the next step is to read in data from the relation declared above. This can be done with the following mapping specification:
mappings:
  measurements-raw:
    kind: read
    relation: measurements-raw
    partitions:
      year: $year
I won’t go over all lines like I did in the first subsection — the code should be rather self-explanatory: this specification instructs the execution layer to read a single partition $year from the previously defined relation “measurements-raw”. The dollar sign at the beginning of “$year” indicates that this is a variable, which has to be defined somewhere or explicitly set before execution.
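For readers familiar with the DataFrame API: conceptually, reading this single partition for the year 2013 boils down to something like the following Spark code (a much simplified sketch for intuition, not Flowman’s actual implementation):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("weather-example").getOrCreate()

// Read the single partition year=2013 of the "measurements-raw" relation.
// The location plus the pattern "${year}" resolve to one directory per year.
val measurementsRaw = spark.read
  .text("s3a://dimajix-training/data/weather/2013")
  .withColumnRenamed("value", "raw_data")   // match the declared schema column "raw_data"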
3. Extract Measurements
As I explained at the beginning of the example, some measurements can be extracted from fixed positions within each record (other measurements are stored at optional and dynamic positions — we do not care about those here). This can be performed with the following select mapping:
mappings:
  measurements:
    kind: select
    input: measurements-raw
    columns:
      usaf: "SUBSTR(raw_data,5,6)"
      wban: "SUBSTR(raw_data,11,5)"
      date: "SUBSTR(raw_data,16,8)"
      time: "SUBSTR(raw_data,24,4)"
      report_type: "SUBSTR(raw_data,42,5)"
      wind_direction: "SUBSTR(raw_data,61,3)"
      wind_direction_qual: "SUBSTR(raw_data,64,1)"
      wind_observation: "SUBSTR(raw_data,65,1)"
      wind_speed: "CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10"
      wind_speed_qual: "SUBSTR(raw_data,70,1)"
      air_temperature: "CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10"
      air_temperature_qual: "SUBSTR(raw_data,93,1)"
Now you can see how this mapping refers to the first one via its “input” field and then specifies a list of columns to be generated together with an expression for each of the columns.
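Again purely for illustration, such a select mapping corresponds roughly to Spark’s selectExpr (a sketch of the idea, not Flowman’s internals):

// Extract fixed-position fields from the raw text lines (a subset of the columns above):
val measurements = measurementsRaw.selectExpr(
  "SUBSTR(raw_data,5,6) AS usaf",
  "SUBSTR(raw_data,11,5) AS wban",
  "SUBSTR(raw_data,16,8) AS date",
  "SUBSTR(raw_data,24,4) AS time",
  "CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10 AS wind_speed",
  "SUBSTR(raw_data,70,1) AS wind_speed_qual",
  "CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10 AS air_temperature",
  "SUBSTR(raw_data,93,1) AS air_temperature_qual"
  // the remaining columns follow the same pattern
)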
4. Target Location
Now we want to write the extracted data into an appropriate Hive table, again partitioned by year. Before specifying the write operation itself, we first need to declare a new target relation to which the data should be written. This is specified as follows:
relations:
  measurements:
    kind: hiveTable
    database: "weather"
    table: "measurements"
    format: parquet
    partitions:
      - name: year
        type: int
    schema:
      kind: mapping
      mapping: measurements
This relation describes a Hive table called “measurements” within a database called “weather”. Data should be stored as Parquet files, and the schema should be implicitly inferred from the mapping “measurements”, which we have defined in step 3.
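To give a feeling for what inferring the schema from a mapping amounts to, the effect of creating this relation could resemble the following Spark SQL statement (a simplified sketch; Flowman derives and manages the schema for you):

// Hive table for the output of the "measurements" mapping, partitioned by year (sketch).
spark.sql("""
  CREATE TABLE IF NOT EXISTS weather.measurements (
    usaf STRING, wban STRING, date STRING, time STRING, report_type STRING,
    wind_direction STRING, wind_direction_qual STRING, wind_observation STRING,
    wind_speed FLOAT, wind_speed_qual STRING,
    air_temperature FLOAT, air_temperature_qual STRING
  )
  PARTITIONED BY (year INT)
  STORED AS PARQUET
""")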
5. Build Target
Now that we have an input relation, some extraction logic and a target relation, we need to create a build target, which specifies that all records produced by the “measurements” mapping should be written into the “measurements” relation (which in turn represents the Hive table of the same name). This can be done as follows:
targets:
  measurements:
    kind: relation
    relation: measurements
    mapping: measurements
    partition:
      year: $year
The build target called “measurements” kind of marries the result of the mapping “measurements” with the relation “measurements” as its target location.
You might ask why we need to explicitly specify both a relation and a build target. The reason is simply that a relation is a logical object that can be used both for reading and for writing (maybe even within the same application). By separating the declaration from the read and write operations, we automatically increase the potential for reusing a single relation.
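Conceptually, building this target for a given year is comparable to the following write operation in the DataFrame API (again a rough sketch with simplified partition handling, not Flowman’s actual code):

import org.apache.spark.sql.functions.lit

// Append the partition column and write the mapping output into the Hive table declared above.
measurements
  .withColumn("year", lit(2013))
  .write
  .mode("overwrite")
  .insertInto("weather.measurements")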
6. Build Job
We are almost done, but one last tidbit is still missing. It may well be the case that you create multiple targets for different relations, but you do not want to execute all of them. Therefore a build job is required as the last step, which mainly contains a list of targets to be built. In addition the build job is also a good place to specify runtime parameters which are required for correct execution. In our example, the year would be a natural candidate, since you would want to run the same data flow independently for different years:
jobs:
  main:
    parameters:
      - name: year
        type: Integer
        default: 2013
    targets:
      - measurements
Summary
Let’s summarize which details we needed to specify for a working data pipeline:
- Relations. Like in a traditional database, we need to specify the physical data sources where we want to read from or write to. I chose the term “relation” simply because this is precisely the term used in Spark. A relation can refer to files, Hive tables, JDBC sources or any other physical data representation.
- Mappings. Transformations (and read operations) are modeled as mappings. While we only used two very simple types (“read” and “select”), you can imagine any kind of data transformation represented as a mapping.
- Targets. A data processing pipeline normally is comparable with a classical build pipeline — except that it creates new data instead of an application. Therefore I reuse some concepts and terms from build tools. A build “target” represents the amount of work that needs to be performed for writing data to a relation.
- Jobs. Often (especially in batch processing) you not only have a single output, but need to write to multiple outputs with different transformations. Each write operation is represented as a build target, and a collection of those is represented as a job.
To complete the idea of a “data build tool”, Flowman itself also supports build phases, similar to those found in Maven:
- Create. The first phase is for creating and/or migrating any relations. This addresses the whole “schema management” part.
- Build. The second phase will execute all write operations implied by the build targets.
- Verify. The verify phase will check that the previously executed write operations actually resulted in some records being written to the corresponding relations. The verify phase can also be used to perform some checks as part of unit tests (see the sketch after this list).
- Truncate. While the first three phases were about creating relations and data, the truncate phase performs some cleanup by deleting the contents of relations, while leaving the relations themselves (Hive tables, directories, etc.) in place.
- Clean. Finally, the last phase will also remove the physical relations themselves.
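To connect the later phases back to our example, their effect for the “measurements” target could look roughly like the following (an illustrative sketch, assuming year = 2013; the create and build phases correspond to the earlier CREATE TABLE and write sketches):

// Verify: check that the build phase actually produced records in the target partition.
val recordCount = spark.table("weather.measurements").where("year = 2013").count()
require(recordCount > 0, "verify failed: no records written for year 2013")

// Truncate: remove the contents of the partition, but keep the table itself.
spark.sql("ALTER TABLE weather.measurements DROP IF EXISTS PARTITION (year = 2013)")

// Clean: remove the physical relation entirely.
spark.sql("DROP TABLE IF EXISTS weather.measurements")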
We did not explicitly mention the build phases in the specifications above, and in fact you cannot do that. Build phases are part of the execution of the data pipelines, something I will discuss in the next section.
Executing a Dataflow
I just showed you a small (albeit verbose) example of a simple data flow representing a very basic and typical operation of reading, extracting and storing data. Of course you cannot execute these YAML files directly. What you now need is an application that understands the specific syntax above and that can then execute the job, the target, the read operation, the transformation and the write operation.
Flowman does exactly this job. It is the open source application that I created for implementing this very specific approach, where data flows are separated from the execution logic. It builds heavily upon Apache Spark, and with a little experience with Spark, you can well imagine how the YAML files above are executed via the DataFrame API.
Design Advantages
Maybe the topics I mentioned in part one and part two of this series now begin to make more sense to you. Or maybe you feel that this approach is too complicated. In either case, let me point out some advantages of this design compared to a classical application that uses Apache Spark directly.
By relying on a higher-level description language with a specific entity model (relations, mappings, targets and jobs), you ensure a uniform approach across data pipelines. This helps to streamline operations and to follow best practices (at least as I understand them) — in fact, the approach makes it very hard not to follow them.
Next, when implementing a data pipeline, you mainly work at a high level, with technical details and possible workarounds hidden in the execution layer (Flowman).
Another advantage of this approach is that many non-functional aspects like schema management (creating and changing Hive tables), providing meaningful runtime metrics and so on can be implemented in the execution layer without cluttering the specifications. This is accomplished as part of the build phases, which are implemented entirely in the execution layer and do not need to be specified explicitly in the data flow specification itself.
Finally, even moderately technical business experts can understand the YAML files much more easily than a full-blown Spark application (be it written in Scala, Java, Python, …), which also contains a lot of boilerplate code.
Final Words
This was the third part of a series about building robust data pipelines with Apache Spark. This time I presented my very specific approach for implementing data processing pipelines in a uniform manner. It did not contain any Spark code (you can view Flowman's source code on GitHub), which reflects my view of what data pipelines should look like.
The next part will be about getting Flowman up and running on your local Linux machine.