The Need for New Low-Level API for Lakehouse-Centric Compute Engines

The Need for New Low-Level API for Lakehouse-Centric Compute Engines

In this post, I will argue that today’s query engines, such as Apache Spark, do not effectively interact with lake houses due to limitations in handling object storage semantics, lack of integration with catalog systems like Iceberg for governance and lineage, and inadequate support for lakehouse-specific features. I believe there is a need for a new class of lakehouse-native query engine APIs with explicit semantics for object storage and open table formats.

This blog explores the fundamental principles such APIs should embody, illustrates them with practical examples, and demonstrates how high-level wrappers can be constructed to abstract low-level configurations, leading to powerful abstractions. However, readers should not overfit the examples as the exact design suggestions for the API. Instead, they should focus on the need for a new design and how it can change the experience for developers building on the lakehouse.

Where we are today

As data volumes grow exponentially, modern data architectures increasingly rely on object storage and lakehouse systems to manage and process large datasets efficiently. However, building applications on the lakehouse architecture is challenging because developers must repeatedly handle tasks like governance, lineage, schema management, caching, and indexing—adding complexity and reducing productivity.

Traditional data processing frameworks often struggle to interact effectively with these technologies due to limitations in handling object storage semantics, lack of integration with catalog systems like Iceberg for governance and lineage, and inadequate support for lakehouse-specific features. This gap necessitates the development of new low-level APIs explicitly designed for lakehouse and object storage environments within distributed compute engines.

What would new lakehouse-native APIs look like?

Unlike RDDs, which were initially not built for the lakehouse world, these new APIs must integrate with catalog systems like Iceberg to provide out-of-the-box governance, lineage, schema support, and optimizations. Such primitives enable building robust high-level systems—like SQL engines, machine learning frameworks, and other analytics tools—on top of the APIs, much like how RDDs form the foundation of Apache Spark. 

Moreover, features like metadata access in these APIs can be leveraged to build advanced functionalities, such as partition hints for high-level APIs. Additionally, these new low-level APIs can add a caching layer, which is not inherently supported by the Lakehouse protocol. This caching capability can significantly enhance performance by reducing redundant data retrievals.

Caching as a separate layer is critical for improving efficiency in distributed data processing environments. Additionally, indexing, unavailable in the lakehouse protocol, can significantly speed up applications built on top by providing faster data access and query optimizations.

By designing these new lakehouse-first low-level APIs, everything built on top gains the advantages of the lakehouse paradigm for free, including schema evolution, governance, lineage tracking, and performance optimizations. Not only does this help in building complex systems like SQL processing, but allows developers to create applications with superior performance and user experience out of the box without having to explicitly handle governance, lineage, schema, caching, indexing, batching data, and so on. The design, advantages, and shortcomings of lakehouse formats on object storage make these low-level APIs beneficial and necessary.

Let’s now explore what a lakehouse-native query engine API might look like.


1. Optimized Data Writing and Batching for Object Storage

a. Configurable Batching

Low-Level API Example:

When ingesting streaming data from IoT devices into an object store like Amazon S3, writing each data point individually can cause performance degradation due to the small file problem. A new low-level API can offer configurable batching tailored for object storage.

# Low-level API usage with explicit configurations
data_stream.write.format('lakehouse') \
    .option('batchSize', '100MB') \
    .option('batchInterval', '5min') \
    .save('s3://data-lake/sensors/')

Explanation:

  • Batch Size and Interval: Configures how data is batched before writing.
  • Benefit: Optimizes write performance and reduces storage costs.

High-Level Wrapper Usage:

A high-level wrapper can abstract these configurations to simplify this process, providing sensible defaults.

# High-level wrapper simplifies the write operation
data_stream.write_to_lakehouse('sensors')

Under the Hood:

The high-level method encapsulates the low-level configurations:

def write_to_lakehouse(self, dataset_name, batch_size='100MB', batch_interval='5min'):
    self.write.format('lakehouse') \
        .option('batchSize', batch_size) \
        .option('batchInterval', batch_interval) \
        .save(f's3://data-lake/{dataset_name}/')

Benefit:

  • Ease of Use: Developers use a simple method without worrying about batching configurations.
  • Flexibility: Defaults can be overridden if necessary.
  • Consistency: Enforces best practices across the team.

b. Efficient Commit Strategies

Low-Level API Example:

Efficiently committing data is crucial when performing large-scale transformations.

# Low-level API with explicit transaction management
with data_api.transaction() as txn:
  transformed_data = data_api.read('s3://data-lake/raw/') \
   .transform(custom_transformation)
  txn.write(transformed_data, 's3://data-lake/processed/')
  txn.commit()
  

Explanation:

Transactional Writes: Ensures data integrity by committing only after successful processing.

High-Level Wrapper Usage:

A high-level wrapper can handle transactions implicitly.

# High-level wrapper handles transaction internally
data_api.process_and_save('raw', 'processed', custom_transformation)

# Under the hood
def process_and_save(self, source_dataset, target_dataset, transformation_func):
  with self.transaction() as txn:
    data = txn.read(f's3://data-lake/{source_dataset}/')
    transformed_data = data.transform(transformation_func)
    txn.write(transformed_data, f's3://data-lake/{target_dataset}/')

Benefit:

Simplifies Code: Reduces boilerplate and potential for errors.

Focus on Logic: Developers can concentrate on data transformations.

c. Optimized File Sizes

Low-Level API Example:

Optimizing file sizes for object storage manually.

# Low-level API with explicit file size setting
data.write.format('lakehouse') \
    .option('targetFileSize', '512MB') \
    .save('s3://data-lake/optimized/')

High-Level Wrapper Usage:

A high-level wrapper can automatically determine the optimal file size.

# High-level wrapper writes data with optimized file size
data.write_optimized('optimized')

Under the Hood:

def write_optimized(self, dataset_name):
    optimal_size = self.get_optimal_file_size()
    self.write.format('lakehouse') \
        .option('targetFileSize', optimal_size) \
        .save(f's3://data-lake/{dataset_name}/')

def get_optimal_file_size():
    # Logic to determine optimal size based on storage characteristics
    return '512MB'

Benefit:

  • Automation: Removes the need for developers to specify file sizes.
  • Performance: Ensures data is written in sizes that maximize storage efficiency.

2. Seamless Lakehouse Integration in Distributed Compute Engines

a. Caching and Indexing Layer for Enhanced Performance

Low-Level API Example:

Adding a caching and indexing layer optimizes data retrievals and improves query performance. Although the Lakehouse protocol does not inherently support these features, they can significantly improve performance in distributed compute environments.

# Low-level API with explicit caching and indexing configuration
data_api.read('s3://data-lake/metrics/') \
    .option('cacheEnabled', 'true') \
    .option('cacheTTL', '10min') \
    .option('indexingEnabled', 'true')

Explanation:

  • Caching Layer: Adds an explicit caching mechanism to reduce redundant data retrievals.
  • Indexing Layer: Adds indexing support to speed up query performance.
  • Benefit: Enhances performance by minimizing repeated access to the underlying object storage and optimizing query execution.

High-Level Wrapper Usage:

A high-level wrapper can abstract caching and indexing features seamlessly.

# High-level wrapper automatically enables caching and indexing during data read
cached_data = data_api.read_with_cache_and_index('metrics')

Under the Hood:

def read_with_cache_and_index(self, dataset_name, cache_ttl='10min'):
    return self.read(f's3://data-lake/{dataset_name}/') \
        .option('cacheEnabled', 'true') \
        .option('cacheTTL', cache_ttl) \
        .option('indexingEnabled', 'true')

Benefit:

  • Performance Optimization: Reduces latency by avoiding repeated reads from object storage and speeding up query execution.
  • Ease of Use: Automatically configures caching and indexing without developer intervention.

b. First-Class Integration with Governance APIs

Low-Level API Example:

Integrating governance capabilities directly with lakehouse APIs, using systems like Iceberg, ensures governance is available out of the box.

# Low-level API integrates with Iceberg governance features
data_api.write.format('iceberg') \
    .option('governanceEnabled', 'true') \
    .save('s3://data-lake/regulated-data/')

Explanation:

  • Governance Features: Out-of-the-box support for data governance, including schema enforcement and audit logging.
  • Benefit: Simplifies compliance and data management.

High-Level Wrapper Usage:

A high-level wrapper can abstract governance features seamlessly.

# High-level wrapper automatically saves data with governance features
data_api.save('regulated-data')

Under the Hood:

def save(self, dataset_name):
    self.write.format('iceberg') \
        .option('governanceEnabled', 'true') \
        .save(f's3://data-lake/{dataset_name}/')

Benefit:

  • Ease of Use: Governance is integrated without additional configurations.
  • Compliance: Enforces data governance policies automatically.

c. Transactional Protocols Support

We're moving to a paradigm where catalog servers can coordinate concurrent writes even for transaction commits. The default integration can also provide safety for transaction commits or transformations, ensuring consistency and reliability. Integrating with the Iceberg catalog ensures that concurrent writes are appropriately managed, providing ACID guarantees. Iceberg API-compatible catalogs like Polaris converge on APIs to register and enable safety for large-scale table updates. This integration can offer out-of-the-box safety for large-scale updates, further simplifying the management of transactional operations.

Low-Level API Example:

Managing ACID transactions explicitly, highlighting the underlying integration with the Iceberg catalog.

# Low-level API with explicit transaction handling and Iceberg catalog integration
with lakehouse_api.transaction() as txn:
    df = txn.read('s3://data-lake/table/')
    updated_df = df.withColumn('status', lit('active'))
    txn.write(updated_df, 's3://data-lake/table/')
    txn.update_catalog('table', updated_df)  # Explicitly update the catalog
    txn.commit()

Explanation:

  • ACID Compliance ensures atomicity, consistency, isolation, and durability, which are crucial for reliable data processing.
  • Iceberg Integration: Coordinates concurrent writes using the Iceberg catalog to maintain consistency.

High-Level Wrapper Usage:

A high-level wrapper can handle transaction management internally, abstracting the Iceberg catalog integration.

# High-level wrapper simplifies transactional updates
data_api.update_table('table', {'status': 'active'})

Under the Hood:

def update_table(self, table_name, updates):
    with self.transaction() as txn:
        df = txn.read(f's3://data-lake/{table_name}/')
        for column, value in updates.items():
            df = df.withColumn(column, lit(value))
        txn.write(df, f's3://data-lake/{table_name}/')
        txn.update_catalog(table_name, df)  # Automatically update the catalog

Benefit:

  • Simplified Updates: Provides a concise method for common update operations.
  • Error Handling: Centralizes transaction error handling and retries.
  • Catalog Coordination: Automatically integrates with Iceberg catalog to handle concurrent writes safely.

d. Lineage Tracking

With upcoming features like Lineage in Iceberg APIs, the low-level APIs should integrate with the catalog to support lineage capabilities across all features. This integration ensures that systems built on top of the APIs, including SQL engines, ETL processes, and ML frameworks, benefit from lineage tracking out of the box. This means that all transformations and data movements are automatically tracked, allowing developers to focus on building features rather than reinventing lineage mechanisms repeatedly. By relying on the fundamental low-level API, everything built on top can quickly provide comprehensive lineage support.

Low-Level API Example:

Explicitly utilizing lineage-enabled low-level API to track data transformations and movements.

# Low-level API explicitly tracks lineage during data processing
result = data_api.read('s3://data-lake/sales/') \
    .option('lineageTracking', 'true') \
    .filter("date >= '2023-01-01'") \
    .groupBy('region').sum('sales_amount')

Explanation:

  • Lineage Tracking: Ensures that all transformations and data movements are automatically recorded, providing end-to-end visibility.

e. Partition Hints with Visual Studio Code Plugin

Developers can receive partition hints automatically by integrating metadata access into the high-level wrapper and combining it with tools like a Visual Studio Code plugin. This enables high-level APIs to optimize queries by skipping irrelevant partitions, improving performance.

# High-level wrapper automatically handles metadata optimizations
result = data_api.query_sales_since('2023-01-01')

Under the Hood:

def query_sales_since(self, start_date):
    # Use metadata to determine relevant partitions
    partitions = self.get_partitions('sales', start_date)
    return self.read('sales') \
        .hint('partitions', partitions) \
        .filter(f"date >= '{start_date}'") \
        .groupBy('region').sum('sales_amount')

def get_partitions(self, table_name, start_date):
    # Access metadata to get partition information
    metadata = self.read_metadata(f's3://data-lake/{table_name}/')
    return [p for p in metadata['partitions'] if p >= start_date]

Benefit:

  • Optimizations Built-In: Developers benefit from performance enhancements without extra effort.
  • Enhanced Developer Experience: Tools like Visual Studio Code plugins can provide suggestions and autocompletion based on metadata.
  • Foundation for High-Level Engines: These low-level primitives enable the building of powerful SQL engines and ML frameworks.

3. Time Travel and Versioning for Lakehouse Data: Enabling Time Travel and Versioning

Low-Level API Example:

The low-level API can provide time travel and versioning support, enabling developers to query data as it existed at previous points in time. This feature is critical for scenarios like auditing, debugging, or comparing historical datasets.

# Low-level API with time travel support
data_at_version = data_api.read('s3://data-lake/sales/') \
    .option('version', '18')  # Specify the version

Explanation:

  • Time Travel: Allows accessing historical versions of the data.

High-Level Wrapper Usage:

A high-level wrapper can abstract versioning and time travel to simplify its usage.

# High-level wrapper for time travel
data_at_version = data_api.get_version('sales', '2023-03-01')

Under the Hood:

def get_version(self, dataset_name, version):
    return self.read(f's3://data-lake/{dataset_name}/') \
        .option('version', version)

Benefit:

  • Enables features like audit trails, debugging, and reproducibility for experiments.

Conclusion

The new low-level APIs for lakehouse-centric compute engines bring together the best aspects of the lakehouse architecture while addressing its inherent shortcomings. By integrating features such as schema evolution, caching, indexing, governance, and lineage into low-level primitives, these APIs offer a comprehensive foundation that empowers developers to build complex systems effortlessly.

These APIs enable sophisticated SQL engines, machine learning frameworks, and analytics tools and provide developers with a superior experience. They also automatically handle governance, schema, lineage, caching, indexing, and other essential features, reducing complexity and ensuring consistency across all layers.

By embracing these principles and leveraging high-level wrappers built on top of the new low-level APIs, developers and organizations can create robust, efficient, and scalable distributed data processing applications on top of the lakehouse architecture. This approach maximizes the potential of object storage and lakehouse architectures within distributed computing environments while ensuring a second-to-none developer experience.

Read more