Building a poor man’s data lake: Exploring the power of Polars and Delta Lake

Or why you probably do not need databases in the way you think

Introduction

Embarking on the journey of data engineering often feels like navigating a landscape where innovation and repetition go hand in hand. The cyclic introduction of new query engines and MPP databases, each attempting to redefine the familiar staging, historical, and data warehouse layers, can, at times, border on the mundane. It’s an environment that rarely sparks excitement for me, as I find data science a much more interesting realm. However, the data lake concept was something of a revelation.

Early in my career, I found myself intrigued by the concept of separating storage from compute in the world of OLAP and analytics. This fascination was fueled by my positive encounter with Snowflake, a platform that elegantly embraced this separation. It led me to ponder the evolution from traditional Data Warehouse (DWH) systems to what felt like a more natural progression—the data lake or the lake house architecture.

In parallel to the rise of micro-service architectures, the notion of breaking down complex structures into more manageable “small microservices” seemed not only logical, but also promising for the field of data engineering. Reflecting on my past experiences with on-premise MSSQL servers managed by IT people and not expert data engineers (which lacked proper version control, integration with identity services, and a general air of disorder with regard to their administration), I couldn’t help but yearn for a more efficient and cost-effective solution.

Also, the practice of employing OLTP systems for executing intricate joins and on-the-fly aggregations struck me as a somewhat misguided use of SQL Server. Furthermore, there was a prevailing belief among some individuals that SQL Server served as a viable option for housing an exorbitant volume of time series data—billions upon billions of records. Setting aside the questionable wisdom of such a notion, I can only imagine that the cloud bill for this SQL Server storage was definitely not appealing to the eyes of rational people. It becomes evident that, at the beginning of my career, my sentiment toward databases was far from favorable.

But what is a data lake and why do we need one?

As I delve into the realm of data engineering, let’s explore the foundations of this field and unravel the advantages that data lakes, particularly those leveraging technologies like Polars and Delta Lake, hold over their traditional data warehouse counterparts. The landscape may be ever-changing, but the essence lies in adapting, evolving, and embracing innovations that promise efficiency and progress in the world of data.

As organizations seek more flexible, scalable, and cost-effective solutions, data lakes have emerged as a compelling alternative. A data lake stands as a centralized repository empowering organizations to store their structured and unstructured data at any conceivable scale. This encompasses raw data originating from diverse sources such as PDFs, IoT devices, and business applications. The distinctive feature that sets data lakes apart from their traditional warehouse counterparts is the flexibility to house data in its unaltered state.

However, this flexibility does come at a cost. Ultimately, the pursuit is for high-quality, validated data, and the imposition of schemas through SQL DDLs or data contracts becomes imperative to derive meaningful insights. One of the defining strengths of data lakes lies in their exceptionally low storage costs compared to traditional data warehouse solutions. To illustrate this, consider the Azure SQL Server storage costs:

In comparison, ADLS Gen2 costs:

Here, the hot storage—likely swift enough to cater to the majority of your analytical workloads—comes at a fraction of the cost, approximately ten times less. Moreover, cloud storage services offer the flexibility to enact lifecycle policies, enabling the seamless movement of data across storage tiers based on both the creation date and access frequency. An illustration of such a storage policy is provided below:

{
  "rules": [
    {
      "enabled": true,
      "name": "sample-rule",
      "type": "Lifecycle",
      "definition": {
        "actions": {
          "version": {
            "delete": {
              "daysAfterCreationGreaterThan": 90
            }
          },
          "baseBlob": {
            "tierToCool": {
              "daysAfterModificationGreaterThan": 30
            },
            "tierToArchive": {
              "daysAfterModificationGreaterThan": 90,
              "daysAfterLastTierChangeGreaterThan": 7
            },
            "delete": {
              "daysAfterModificationGreaterThan": 2555
            }
          }
        },
        "filters": {
          "blobTypes": [
            "blockBlob"
          ],
          "prefixMatch": [
            "sample-container/*.avro",
            "sample-container/*.parquet",

          ]
        }
      }
    }
  ]
}
JSON

This can help organizations dramatically cut their data storage costs over time. Basically, the storage policy will transition all files with the *.parquet or *.avro extension to different storage tiers depending on rules that we decide upfront.

Big data formats for data lakes

Traditionally, schema-rich formats (.parquet and .avro) have been used to store structured data in the data lake. Parquet is better suited for columnar storage, while Avro is better suited for row-based workloads. These formats allow us to validate or cast our data types to the desired format, in the same way that table DDL scripts block data from being ingested into tables when it does not fulfill some basic quality rules.

Parquet Example

import polars as pl

# Create a sample Polars DataFrame
data = {'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35],
        'city': ['New York', 'San Francisco', 'Los Angeles']}

df = pl.DataFrame(data)

# Define the desired schema as a mapping of column name to dtype
desired_schema = {
    'name': pl.Utf8,
    'age': pl.Int64,
    'city': pl.Utf8
}

# Cast the DataFrame columns to the desired data types
df = df.cast(desired_schema)

# Write the Polars DataFrame to a Parquet file with the specified schema
file_path = 'example.parquet'
df.write_parquet(file_path)
Python

Avro Example

import fastavro
import polars as pl

# Create a sample Polars DataFrame
data = {'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35],
        'city': ['New York', 'San Francisco', 'Los Angeles']}
df = pl.DataFrame(data)

# Convert the Polars DataFrame to a list of dictionaries (one per row)
records = df.to_dicts()

# Define Avro schema based on the DataFrame
schema = {
    'type': 'record',
    'name': 'example',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'},
        {'name': 'city', 'type': 'string'}
    ]
}

# Serialize the data to Avro format
with open('output.avro', 'wb') as avro_file:
    fastavro.writer(avro_file, schema, records)
Python

One might be tempted to think that these formats come with a catch, given the overhead involved when writing them to storage, particularly when applying heavy compression. However, the efficiency of the writers, coupled with engines that facilitate parallelized writing, mitigates this concern to a considerable extent. In addition, since query engines do not need to infer the schema, reads are significantly faster than with formats that require schema inference, such as CSV.
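
To make the read side concrete, here is a small sketch (reusing the example.parquet file written above) of a lazy Polars scan: the schema comes straight from the Parquet footer, so no inference pass over the data is needed, and projection and predicate pushdown mean only the required columns and row groups are read.

import polars as pl

# Lazily scan the Parquet file written earlier; the schema is taken from
# the file footer, so no type inference over the data is required.
lf = pl.scan_parquet('example.parquet')
print(lf.schema)  # {'name': Utf8, 'age': Int64, 'city': Utf8}

# Projection and predicate pushdown: only the selected columns and the
# row groups that can match the filter are read from storage.
result = (
    lf.select(['name', 'age'])
      .filter(pl.col('age') > 28)
      .collect()
)
print(result)
Python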

Nevertheless, challenges arise when the need to modify these files appears, or more precisely, when overwriting becomes a necessity. To illustrate this point, consider a Parquet table with the following structure:

/
└── 📁 mydata
    ├── 📁 year=2022
    │   ├── 📁 month=11
    │   │   ├── 📄 file1
    │   │   └── 📄 file2
    │   └── 📁 month=12
    │       ├── 📄 file1
    │       └── 📄 file2
    └── 📁 year=2023
        └── 📁 month=1
            ├── 📄 file1
            └── 📄 file2

If we want to modify the schema of the Parquet table, we need to overwrite all of the partitions. This can be problematic, especially if the number of files is large, as listing all of those files is bottlenecked by the performance of the storage account. This is commonly known as the file listing problem:

Extracted from Delta Lake blog


Moreover, when Parquet files are overwritten, all prior data is lost. Unlike transactional databases, there is no mechanism for rolling back the transaction. If an error occurs during the file writing process, you run the risk of corrupting your table. The Delta Lake vs. Parquet blog post is a great resource to learn about the limitations of Parquet.

Transactional data lake formats

For a conventional, old-fashioned Data Warehouse (DWH) engineer, the absence of ACID properties in flat files can be akin to a nightmare. After all, databases were originally conceived to address specific needs. Despite the allure of low storage costs, losing the features that make databases invaluable raises concerns. In essence, our ideal file format should possess the ability to:

  • Support DML operations, that is: INSERT, DELETE, UPSERT and SELECT.
  • Support schema evolution without having to rewrite the whole table (in the same way you can add a column to a database table by changing its DDL).
  • Guarantee consistency, among the other ACID properties.
  • Support partitioning, clustering and indexes.
  • Support SCDs (slowly changing dimensions) in an easy way.

Transactional data lake formats were born to overcome these limitations. Currently, the most prominent ones are:

  • Apache Iceberg: Iceberg focuses on providing ACID transactions for large-scale analytical data sets. It introduces features like atomic commits, schema evolution, and consistent snapshots. The table format is designed to be portable across different storage systems, allowing users to switch between systems without rewriting data. Apache Iceberg also enables partition evolution (unlike Delta or Hudi).
  • Delta Lake: Delta Lake, from the creators of Spark, addresses the challenges associated with data lake reliability. It offers ACID transactions, schema evolution, and time travel capabilities.
  • Apache Hudi: Hudi is tailored for incremental data processing and simplifying data management in Hadoop-based systems. It supports upserts and deletes at the record level, making it efficient for handling changing data.

Choosing among these formats depends on factors such as your existing technological infrastructure, team expertise, and the specific needs of your business. Each format comes with its own set of advantages, and the decision should align with your organization’s goals and preferences. In this context, I will delve deeper into Delta Lake, considering its prominent position in the landscape.

Why delta lake?

Conceptually, delta lake is an ecosystem. It is an open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs for Scala, Java, Rust, Ruby, and Python.

Fundamentally, you can interact with Delta through two kinds of APIs:

  • Direct Java/Scala/Python APIs – The classes and methods documented in the API docs are considered stable public APIs. All other classes, interfaces and methods that may be directly accessible in code are considered internal, and they are subject to change across releases. It is worth mentioning that the Python API referred to in these docs needs a Spark session to execute Delta operations.
  • Spark-based APIs – You can read and write Delta tables through the DataFrameReader/Writer (i.e. spark.read, df.write, spark.readStream and df.writeStream); see the sketch below. Options to these APIs will remain stable within a major release of Delta Lake (e.g., 1.x.x).
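
For reference, here is a minimal sketch of the Spark-based path, following the configuration from the Delta Lake quickstart; it assumes pyspark and the Delta Lake jars are available, and the table path is a hypothetical local one.

from pyspark.sql import SparkSession

# Minimal sketch: assumes pyspark plus the Delta Lake jars (e.g. via the
# delta-spark package) are installed. Paths are hypothetical.
spark = (
    SparkSession.builder.appName("delta-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

table_path = "/tmp/delta/people"  # hypothetical path

df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Write and read back through the DataFrameWriter/Reader
df.write.format("delta").mode("overwrite").save(table_path)
spark.read.format("delta").load(table_path).show()
Python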

When we take a look at the writer clients that Apache Hudi supports:

As well as Apache Iceberg:

We realize that, at present, there isn’t a straightforward Pythonic way to write to these formats using Python alone, excluding our trusty PySpark. I’m not passing judgment on whether this is positive or negative. I’m merely highlighting that it could potentially create an entry barrier. Beyond this consideration, it’s crucial to acknowledge that we’re constructing a poor man’s data lake. And that means we have neither the budget nor the data size to spin up a Spark cluster to process thousands of TB of data. In fact, tweaking Spark for small files is actually something we would need to set up properly.

Having said that, the Delta Lake landscape presents intriguing possibilities. Firstly, there’s the remarkable delta-rs package—an implementation of the Delta Lake protocol in Rust, featuring Python bindings. This is what we call “python deltalake” (with no spark dependencies). Additionally, capturing the spotlight is the emerging star of the show: Polars.

Polars is able to write directly to delta.
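
As a minimal sketch of that, reusing the sample DataFrame from earlier (the local path is just for illustration):

import polars as pl

df = pl.DataFrame({'name': ['Alice', 'Bob', 'Charlie'],
                   'age': [25, 30, 35],
                   'city': ['New York', 'San Francisco', 'Los Angeles']})

# Write the DataFrame to a Delta table (Polars uses the deltalake
# package under the hood); the path is a local one for illustration.
table_path = 'delta/people'
df.write_delta(table_path, mode='overwrite')

# Append more rows to the same table, creating a new table version
df.write_delta(table_path, mode='append')

# Read the Delta table back into Polars
print(pl.read_delta(table_path))
Python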

The Python bindings of delta-rs offer more functionality than Polars (but merges/upserts should be supported in the next release, as my loud complaint/Xmas wish was granted).

Hi Ion 🙂
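
As an illustration of that extra functionality, here is a hedged sketch using the deltalake package directly against the illustrative table written above (a recent deltalake release is assumed):

from deltalake import DeltaTable

# Same illustrative local path as in the Polars example above
table_path = 'delta/people'

dt = DeltaTable(table_path)

# Inspect the transaction log
print(dt.version())
print(dt.history())

# Time travel: load an older version of the table
dt_v0 = DeltaTable(table_path, version=0)
print(dt_v0.to_pyarrow_table().num_rows)

# Table maintenance: compact small files, Z-order, and vacuum old files
dt.optimize.compact()
dt.optimize.z_order(['city'])
dt.vacuum(retention_hours=168, dry_run=True)
Python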

Why polars?

As mentioned above, Polars is the new star of the show. Choosing between Polars and Spark for data processing workloads depends on various factors, each with its own set of considerations. Polars stands out as a compelling alternative, particularly for certain use cases.

Polars, built in Rust and featuring Python bindings, offers a more lightweight and memory-efficient solution compared to the more resource-intensive Apache Spark. This becomes especially advantageous when working with moderately sized datasets on machines with limited resources. The performance of Polars is noteworthy, with the ability to handle complex operations efficiently, making it a robust choice for analytical workloads.

One of Polars’ key strengths lies in its native support for parallelization, allowing for faster execution of operations across multiple cores. This parallel processing capability contributes to impressive performance metrics, enabling users to process data at scale with efficiency.
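
As a small illustration, a plain aggregation like the sketch below gets spread across all available cores out of the box, with no session or cluster to configure (the data here is made up for the example, not taken from the pipeline):

import polars as pl

# One million rows aggregated with a group-by; Polars parallelizes the
# hash group-by across all available cores with no extra configuration.
n = 1_000_000
df = pl.DataFrame({
    'group': [i % 1000 for i in range(n)],
    'value': list(range(n)),
})

agg = df.group_by('group').agg(
    pl.col('value').mean().alias('mean_value'),
    pl.col('value').max().alias('max_value'),
)
print(agg.sort('group').head())
Python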

Moreover, Polars’ seamless integration with Pandas, a popular data manipulation library in Python, enhances its appeal. The familiarity of Pandas syntax makes it easier for data scientists and analysts to transition to Polars, streamlining the learning curve.

While Spark is a robust and feature-rich framework suitable for large-scale distributed computing, Polars excels in scenarios where agility, ease of use, and efficient memory utilization are paramount. For tasks involving mid-sized datasets and resource-constrained environments, Polars emerges as a nimble and powerful contender, offering a refreshing alternative to the heavyweight capabilities of Spark.

Finally it is important to mention that polar bears can quack.

A poor man’s data lake

To build a poor man’s data lake we will need the following:

  • A storage service (S3, MinIO, ADLSGen2 etc…)
  • A layered setup based on the data lake house architecture (raw, bronze, silver, gold)
  • Python magic with polars and delta lake.

The storage service is up to the reader to set up. For the remaining parts, I have made a minimal demo repo that you can find here.

The pipeline performs the following actions:

  1. It reads ZIP files from a set of URLs.
  2. It decompresses them and reads the .csv files inside.
  3. It saves them locally as .parquet files in the file system of the compute engine you are using to run your code.
  4. It uploads them to a landing zone in ADLSGen2.
  5. It reads from the landing zone and then it converts the parquet files to delta and sends them to the bronze layer (append only).
  6. It reads from the bronze layer and performs a merge (without deduplication) against the silver table; a sketch of this merge follows the snippet below.
  7. Besides, it shows how to perform Z-ordering (you do not need to run Z-ordering every time you append a file, but this was just for testing).

if __name__ == "__main__":
    t = time.time()
    etl_workflow = ETLPipeline()
    for uri in DOWNLOAD_URIS:
        try:
            etl_workflow.upload_to_landing(uri=uri)
            etl_workflow.raw_to_bronze()
            etl_workflow.bronze_to_silver()
        except Exception as e:
            logger_normal.error(e)
            logger_normal.info("Continuing for next file")
            pass
    logger_normal.info(f'Total runtime was {time.time()-t}')
Python
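
The actual implementation lives in the repo, but as a rough sketch (not the repository’s exact code: the paths, the key column and the assumption that the silver table already exists are mine), the bronze-to-silver merge of step 6 boils down to something like this with the deltalake Python bindings:

import polars as pl
from deltalake import DeltaTable

# Hypothetical paths and key column, for illustration only; the silver
# table is assumed to exist already.
bronze_path = 'delta/bronze/trips'
silver_path = 'delta/silver/trips'

# Read the latest bronze data and upsert it into the silver table:
# matching rows are updated, new rows are inserted.
bronze_df = pl.read_delta(bronze_path)

silver = DeltaTable(silver_path)
(
    silver.merge(
        source=bronze_df.to_arrow(),
        predicate='target.trip_id = source.trip_id',
        source_alias='source',
        target_alias='target',
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
Python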

Results

If you are checking the code, you will probably notice some references to the following environment variables:

import os

LANDING_ZONE_PATH = os.getenv('LANDING_ZONE_PATH')
ACCOUNT_NAME = os.getenv('STORAGE_ACCOUNT_NAME')
BRONZE_CONTAINER = os.getenv('APPEND_LAYER')
SILVER_CONTAINER = os.getenv('HISTORICAL_PATH')
GOLD_CONTAINER = os.getenv('DW_PATH')
Python

These should have values that are meaningful to you, but in my case the storage account structure is as follows:

The landing zone contains all the original CSV files converted to Parquet format. These files will be deleted after one day by the storage policies mentioned at the beginning of the post:

Then we have our append-only layer, which contains only the files in delta format that matched the schema generated by the first file (as of today there is no possibility to merge schemas in Delta Lake without using PySpark):

Finally, the silver layer contains the incrementally merged data:

And in the _delta_log folder you can find the transaction log that shows how the delta table was modified by a merge operation.

{
    "remove": {
        "path": "part-00001-9ec91f4f-f438-41b...c000.snappy.parquet",
        "dataChange": false,
        "deletionTimestamp": 1701785668789,
        "partitionValues": {},
        "size": 80948938
    }
}
{
    "add": {
        "path": "part-00001-2bb8e5fb-65c3-4ca5-9d16...*.parquet",
        "partitionValues": {},
        "size": 57361922,
        "modificationTime": 1701785675935,
        "dataChange": false,
        "stats": "{\"numRecords\":3352527,\"minValues\".....}}",
        "tags": null,
        "deletionVector": null,
        "baseRowId": null,
        "defaultRowCommitVersion": null,
        "clusteringProvider": null
    }
}
{
    "commitInfo": {
        "timestamp": 1701785676006,
        "operation": "OPTIMIZE",
        "operationParameters": {
            "targetSize": "104857600"
        },
        "clientVersion": "delta-rs.0.17.0",
        "readVersion": 5,
        "operationMetrics": {
            "filesAdded": {
                "avg": 57361922.0,
                "max": 57361922,
                "min": 57361922,
                "totalFiles": 1,
                "totalSize": 57361922
.....
}
JSON

It is probably worth mentioning that on a 16-core machine with 64 GB of RAM I am getting an average performance of 134k merge condition evaluations per second. That is roughly 8M rows per minute, which I think is astonishing.

Final notes

It’s important to acknowledge that this example represents a minimal implementation. We haven’t integrated our code with an orchestrator, utilized any external compute engines, used all the features of Delta Lake (such as file compaction, vacuuming, etc.), or established a data catalog connection within the code. Despite these simplifications, my intention is that this post serves as a stepping stone, illustrating that working with transactional data lakes does not need to be overly complex.

If you find merit in this example and wish to contribute, please feel free to open a pull request in the example repository. Lastly, and perhaps most significantly, if you find these technologies intriguing, we’re currently expanding our team, so feel free to reach out to our head of analytics if you are interested in working with us.