Fast Distributed Iceberg Writes and Queries with Apache Arrow IPC

In distributed analytical systems, performance relies on two main factors: the efficiency of data movement between processes or network nodes and the efficiency of data processing once it reaches its destination.
Apache Arrow's Inter-Process Communication (IPC) framework addresses both challenges. Arrow IPC provides a language-agnostic columnar data format that can be shared between systems with minimal serialization overhead, and it enables fast in-memory analytics by taking advantage of modern CPU features.
This blog post explores how Arrow IPC can be a high-performance data movement and computation tool for building analytical data systems. We specifically focus on streaming data into Apache Iceberg tables, allowing fast distributed writes, and accelerating queries on large-scale Iceberg tables.
The following sections will examine Arrow IPC's impact on data movement and computation in general. We will then delve into streaming ingestion and discuss query execution on Iceberg, highlighting real-world implementations and outcomes.
Arrow IPC
Arrow IPC (Inter-Process Communication) is an Apache Arrow mechanism allowing efficient data transfer between different processes. It utilizes the Arrow columnar format, which enables the movement of collections of Arrow columnar arrays, referred to as "record batches." This IPC mechanism enhances high-performance data analytics by providing a seamless transition between in-memory and on-disk data representations, eliminating the need for data serialization and deserialization.
The Arrow IPC format comes in two variants: the streaming format, designed for continuous, sequential data transfer, and the file format, which persists data to storage and supports random access. Let's explore what each of these formats offers.
IPC Streaming Format
The Arrow IPC streaming format is designed for real-time data transmission, enabling a continuous data flow in batches of any size. This format is particularly useful for applications where data must be processed as it arrives, such as live data analytics or streaming services. It supports zero-copy reads when the source permits, allowing data to be accessed directly from memory without additional copying. This efficiency is essential for performance-sensitive applications that demand low-latency access to large datasets.
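As a minimal sketch with PyArrow (the column names and values are purely illustrative), a producer can write record batches to the streaming format and a consumer can read them back sequentially as they arrive:

```python
import pyarrow as pa

# Build a small record batch (two columns, columnar layout).
batch = pa.RecordBatch.from_pydict({
    "user_id": pa.array([1, 2, 3], type=pa.int64()),
    "event": ["click", "view", "click"],
})

# Write the batch to an in-memory sink using the IPC *streaming* format.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)          # any number of batches can follow
ipc_bytes = sink.getvalue()            # bytes you could push over a socket

# The consumer reads batches sequentially as they arrive.
reader = pa.ipc.open_stream(ipc_bytes)
for received in reader:
    print(received.num_rows, received.schema.names)
```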
IPC File Format
The Arrow IPC file format is designed explicitly for persistent data storage on disk, enabling random access reads. This format incorporates a magic string for file identification and a footer that facilitates efficient searching within the file. Storing and accessing datasets later is particularly advantageous in batch processing or long-term storage solutions. Additionally, the format supports various features, including compression and integrity checks, making it suitable for effective data management.
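A small PyArrow sketch of the file format's random access; the file name and schema here are illustrative. The footer written at the end of the file is what makes seeking to an arbitrary batch cheap:

```python
import pyarrow as pa

schema = pa.schema([("id", pa.int64()), ("value", pa.float64())])
batches = [
    pa.RecordBatch.from_pydict({"id": [i, i + 1], "value": [0.1, 0.2]}, schema=schema)
    for i in range(0, 6, 2)
]

# The IPC *file* format adds a footer, so readers can seek to any batch.
with pa.OSFile("example.arrow", "wb") as f:
    with pa.ipc.new_file(f, schema) as writer:
        for b in batches:
            writer.write_batch(b)

with pa.OSFile("example.arrow", "rb") as f:
    reader = pa.ipc.open_file(f)
    print(reader.num_record_batches)   # the footer makes this cheap
    middle = reader.get_batch(1)       # random access to batch 1
    print(middle.to_pydict())
```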
The following sections will explore Arrow IPC's advantages for transferring analytical data between processes, across networks, and in various scenarios.
Arrow IPC Between Processes
Arrow IPC offers a key advantage with its built-in support for zero-copy operations. It allows data to be accessed directly from its original memory location, reducing memory allocation overhead and data copying, resulting in improved performance and lower latency during data processing.
Zero-copy operations also enhance scalability in distributed systems, allowing multiple processes to access shared data efficiently without duplication. This is particularly beneficial in environments handling large volumes of data, such as big data analytics and machine learning. Zero-copy operations lead to streamlined workflows, better resource management in Arrow IPC applications, faster throughput, and lower resource consumption.
Arrow IPC in High-Speed Network Transport (Arrow Flight)
Apache Arrow Flight is a high-performance Remote Procedure Call (RPC) framework designed for efficient data transfer across distributed systems. It leverages the Apache Arrow ecosystem's columnar in-memory format, and by eliminating serialization and deserialization it enables rapid data transmission, making it ideal for real-time analytics and large-scale processing.
Arrow Flight is built on gRPC and ensures low latency and high throughput, allowing seamless application communication. Its foundational layer, Arrow IPC, provides efficient data transport with a standardized format that supports easy sharing across different programming languages without needing reorganization. This zero-copy approach minimizes overhead and enhances performance for bulk operations, making Arrow Flight crucial for high-volume data transfers in modern data environments.
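As a hedged sketch of what consuming a Flight stream can look like with PyArrow, the server address and path descriptor below are assumptions; any Flight-enabled server defines its own descriptors and tickets:

```python
import pyarrow.flight as flight

# Hypothetical Flight endpoint; any Flight-enabled server works similarly.
client = flight.FlightClient("grpc://analytics-server:8815")

# Ask the server what streams it can serve for a (server-defined) descriptor.
descriptor = flight.FlightDescriptor.for_path("sales", "2024")
info = client.get_flight_info(descriptor)

# Fetch the first endpoint's stream; record batches arrive already in Arrow
# format, so the result is usable without row-by-row deserialization.
reader = client.do_get(info.endpoints[0].ticket)
table = reader.read_all()
print(table.num_rows, table.schema)
```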
Arrow IPC in Streaming and Messaging
Arrow IPC's efficiency extends beyond simple request/response data fetches; it is equally beneficial for continuous streaming and message-passing systems. A notable example is Fluss, a real-time, open-source streaming platform by Alibaba.
Fluss utilizes Arrow IPC as both its on-disk and in-memory log format for streaming data. Producers gather records into Arrow vectors (columns) and send them to the Fluss server as Arrow IPC messages. Each message can contain thousands of rows in a compact columnar format rather than sending one event at a time. This approach significantly lowers overhead per record and enables server-side column pruning, allowing Fluss to read only the necessary columns from the message for enhanced efficiency.
Although Fluss is a new system, it illustrates a fundamental principle: by employing Arrow's format in a streaming pipeline—such as with Apache Kafka—one can send batches of data immediately usable by consumers without needing parsing. You can accomplish this by serializing an Arrow RecordBatch into a Kafka message. The consumer can then deserialize it using the Arrow library to retrieve the original columns, bypassing textual encodings like JSON or Avro. The overall effect is reduced CPU usage per message and an increased capability to transmit more analytical-columnar data per second through the pipeline.
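As a sketch of this pattern, assuming the confluent-kafka Python client and an illustrative topic name, a RecordBatch can be serialized into a single Kafka message and later recovered on the consumer side without any JSON or Avro parsing:

```python
import pyarrow as pa
from confluent_kafka import Producer  # assumes the confluent-kafka client

def batch_to_ipc_bytes(batch: pa.RecordBatch) -> bytes:
    """Serialize one RecordBatch (schema + column buffers) as an IPC stream payload."""
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue().to_pybytes()

def ipc_bytes_to_batches(payload: bytes) -> list:
    """Recover the columnar batches on the consumer side, no text parsing needed."""
    return list(pa.ipc.open_stream(payload))

# Producer side: thousands of rows can travel as one compact columnar message.
batch = pa.RecordBatch.from_pydict({"ts": [1, 2, 3], "level": ["INFO", "WARN", "INFO"]})
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("logs-arrow", value=batch_to_ipc_bytes(batch))
producer.flush()
```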
Direct Arrow Result Streaming from Query Engines
Arrow IPC also excels at delivering query results to applications through Arrow Flight SQL, a protocol for SQL interactions with databases built on the Apache Arrow in-memory format. It allows clients to execute SQL queries, manage prepared statements, and retrieve metadata without data format conversions. The columnar representation reduces serialization overhead and supports parallel data access, and the protocol includes built-in security features such as encryption and authentication, making it well suited to high-speed data access in distributed systems.
Many query engines, such as Doris and Snowflake, utilize Arrow Flight SQL for efficient data access and rapid analytics.
Apache Drill and InfluxDB also leverage Flight SQL for exploratory data analysis and time-series data querying. Additionally, DuckDB benefits from Flight SQL's performance enhancements, while Apache Spark and DataFusion utilize its capabilities for optimized data processing.
Flight SQL is built on Arrow IPC, allowing efficient data transport with zero-copy reads for high throughput and low latency during queries and data transfers. This foundation enables significant optimizations in resource utilization across various database systems.
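As a hedged illustration, the ADBC Flight SQL driver for Python exposes this over a standard DBAPI interface; the endpoint URI, table, and query below are assumptions for the sketch:

```python
# Assumes the ADBC Flight SQL driver (pip install adbc-driver-flightsql);
# the endpoint URI and SQL are illustrative.
import adbc_driver_flightsql.dbapi as flightsql

with flightsql.connect("grpc://flight-sql-server:31337") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT region, SUM(amount) FROM sales GROUP BY region")
        # Results come back as Arrow data, avoiding row-by-row conversion.
        table = cur.fetch_arrow_table()
        print(table)
```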
Efficient Computation with Arrow IPC
Apache Arrow enhances the speed of data transfer and the efficiency of data computation once it is in memory. The Arrow columnar format optimizes how modern CPUs process data. Instead of storing records as individual objects (one object per row with multiple fields), Arrow organizes each column as a contiguous array of values. This design leads to several performance improvements:
Cache Locality
Arrow ensures that values of the same field are stored next to each other in memory. When performing analytic operations (such as scanning a column for a filter), the CPU accesses memory sequentially, which is ideal for CPU caches and memory prefetching. In contrast, row-oriented data can scatter fields throughout memory, leading to cache misses when transitioning from one row to another. Arrow's linear, compact buffers significantly reduce cache miss penalties, ensuring the CPU remains continuously supplied with data.
Vectorization and SIMD
Because Arrow arranges data in contiguous arrays, the CPU can apply Single Instruction Multiple Data (SIMD) instructions to process multiple values at once. For instance, a CPU with a 256-bit vector register can load 4 or 8 values simultaneously (depending on the data type) and apply a filter condition to all of them in one step. Arrow's format features aligned buffers and normalized data types that work well with SIMD. By processing data in batches, often thousands of values at a time, Arrow reduces loop overhead and lets the CPU pipeline operations efficiently. Arrow also minimizes branches; for instance, it represents optional values with vectorizable validity bitmaps, which improves branch prediction and keeps execution units busy.
Efficient Algorithms
Arrow's format supports a rich set of vectorized algorithms that can outperform their row-wise equivalents. Operations such as filtering, aggregation, joins, and sorting can be executed by processing chunks of data simultaneously. For example, a hash join can build a hash table using vectorized inserts and probe it with vectorized lookups. A group-by aggregation can utilize Arrow's compact buffers to update counters for many entries in parallel. Many analytic workloads achieve a significant performance boost, reaching speeds 10 to 100 times faster than traditional row-wise processing in CPU-bound analytics.
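A small PyArrow sketch of such vectorized operations (the table contents are illustrative): both the filter and the group-by aggregation run over whole columns at once rather than row by row:

```python
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    "region": ["eu", "us", "eu", "apac", "us"],
    "amount": [10.0, 25.0, 7.5, 12.0, 30.0],
})

# Vectorized filter: the predicate is evaluated over the whole column at once.
big_orders = table.filter(pc.greater(table["amount"], 10.0))

# Vectorized group-by aggregation over the columnar batch.
totals = big_orders.group_by("region").aggregate([("amount", "sum")])
print(totals.to_pydict())  # e.g. {'region': ['us', 'apac'], 'amount_sum': [55.0, 12.0]}
```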
Overall, the Arrow project highlights that combining a cache-friendly layout, pipelining, and SIMD can greatly enhance execution speeds, making it an effective choice for high-performance data processing tasks.
Now that we have covered Arrow IPC's data transfer and computational benefits, let us move on to how its application benefits streaming data and performing queries on Iceberg tables.
From Disk to Memory: The Relationship Between Parquet and Arrow
These computational and data-transfer benefits align naturally with Iceberg's use of columnar file formats. Iceberg tables typically store data in Parquet files, which are laid out column by column on disk. When a query engine accesses an Iceberg table, it can read these files into Arrow record batches without reconstructing rows in between.
For example, Apache Drill, an open-source SQL engine, was built to read Parquet pages directly into its columnar memory format, which is similar to Arrow. This lets Drill execute queries without ever materializing rows: its execution engine can skip irrelevant columns and operate with vectorized operators, resulting in significantly lower memory usage and faster execution for business intelligence queries (see Performance - Apache Drill).
Today, any Arrow-based engine can do the same: when scanning an Iceberg table, it loads only the necessary columns and keeps them in Arrow format through filtering, aggregation, and the other steps until it produces the final result. This avoids the wasted work of transforming columnar disk data into row objects and back into a columnar format for processing – the data stays columnar from start to finish.
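As a hedged example with PyIceberg (the catalog name, table identifier, and field names are assumptions), a scan can project only the needed columns and hand back Arrow directly:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("analytics.events")

# Only the referenced columns are read, and the result stays in Arrow
# format end to end -- no intermediate row objects.
arrow_table = tbl.scan(
    row_filter="event_date >= '2024-01-01'",
    selected_fields=("event_date", "user_id"),
).to_arrow()
print(arrow_table.schema)
```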
Batching and Computations with Arrow in Query Engines
Additionally, working with larger batches in memory enhances CPU and I/O utilization. Reading a million rows in 10 batches of 100,000 rows each is typically more efficient than processing 100,000 batches with only 10 rows at a time. Arrow encourages processing sizable record batches, facilitating read-ahead from storage, and reducing the number of function calls during execution.
Many engines select a batch size that fits well within the CPU cache to maximize locality and then leverage Arrow's format to process that batch efficiently. For instance, when each batch comprises a few thousand rows, an aggregation can be performed on that batch using vector instructions, which can then be combined to generate partial results.
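A minimal sketch of this batch-at-a-time pattern with PyArrow datasets; the dataset path, column name, and batch size are illustrative:

```python
import pyarrow.dataset as ds
import pyarrow.compute as pc

# Scan a Parquet dataset in sizable record batches and keep a running
# partial aggregate per batch.
dataset = ds.dataset("warehouse/events/", format="parquet")

total, rows = 0.0, 0
for batch in dataset.to_batches(columns=["amount"], batch_size=64_000):
    # Each batch is aggregated with a vectorized kernel, then the
    # partial results are combined, as described above.
    total += pc.sum(batch["amount"]).as_py() or 0.0
    rows += batch.num_rows

print(f"sum(amount) = {total} over {rows} rows")
```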
To illustrate this with real examples, Apache Drill has reported that its vectorized execution (handling record batches rather than single records) has outperformed traditional engines when dealing with large datasets (see Performance - Apache Drill). Apache Impala and Apache Spark also benefit from Arrow; for example, Spark's columnar data source and vectorized Parquet reader apply Arrow-like principles (such as columnar batches and SIMD) to accelerate DataFrame operations. In the Python data ecosystem, projects like pandas, Polars, and DuckDB all utilize Arrow's memory format or concepts, demonstrating that the Arrow computation approach benefits various applications, from databases to data science.
Having discussed Arrow IPC's general advantages in moving and processing data from Parquet at rest, we will apply these concepts to Apache Iceberg, beginning with how Arrow IPC can facilitate faster and more efficient streaming ingestion into Iceberg tables.
Streaming Ingestion into Iceberg Tables
Arrow IPC can improve how we move and process data when appending to Iceberg tables in real time or in small batches. Streaming data into a data lake table usually involves several steps: for example, a Flink job might read JSON messages from Kafka, convert them into internal objects, and then write Parquet files for Iceberg. Arrow makes this process more efficient.
Efficient Data Movement in Streaming Ingestion
The Problem with Naive Ingestion
A basic streaming ingestion setup often works with one record at a time or with tiny batches. For instance, if we receive a stream of JSON messages, each message represents a row for the Iceberg table. Most operational data is naturally row-oriented, which is why JSON messages are so common. Writing each message individually to Parquet (the typical storage format for Iceberg) is inefficient: Parquet and Iceberg perform better with large batches, which improve compression and avoid the small-file issue.
The small-file issue arises when a table accumulates many tiny files (say, dozens of 1 MB files) instead of fewer larger ones (ideally around 128 MB). Read performance suffers because managing many files adds overhead. While streaming, there may not be enough data at any one moment to create a large file, forcing a choice between delaying data (which adds latency) and writing small files (which hurts read performance).
Additionally, formats like JSON or CSV require the system to spend a lot of CPU time parsing text and changing data types.
Arrow IPC-Based Ingestion Approach
By utilizing Arrow as the interchange format in our ingestion pipeline, we can batch incoming data streams at the source and transfer those batches into Iceberg.
For example, consider ingesting application logs that generate millions of log events per minute. Instead of sending individual JSON messages to Kafka, a producer can accumulate, for instance, 10,000 log entries, place them into an Arrow RecordBatch, and serialize this as an Arrow IPC message. Then, the system can transmit the message through the message queue or streaming system.
Downstream consumers, such as an Iceberg writer service, can deserialize the Arrow IPC message to retrieve a batch of 10,000 rows in a columnar format. Because Arrow IPC includes both the schema and binary data, there is no need for a separate schema registry or per-field parsing. The system has already defined the data types and organized the bytes into primitive arrays.
As discussed earlier, Fluss, a solution from Alibaba, provides a concrete example of an effective data handling pattern for streaming analytical data using columnar primitives. In this process, producers send batches of columnar data to the stream storage using Arrow IPC. The ingestion service that receives these batches can process them directly.
Libraries such as Arrow's Parquet Writer or Iceberg's Writers, available in Rust, can consume an Arrow RecordBatch and output a Parquet row group.
In the Iceberg Rust implementation, the AsyncArrowWriter type writes Arrow record batches to Parquet asynchronously.
This writer encodes each incoming Arrow batch into Parquet pages, buffering the data until a configured number of rows per row group is reached and then flushing it to disk. A flush and snapshot commit can also be forced based on a timing configuration. The key advantage is that the ingestion loop operates on sizable chunks of pre-formatted data instead of one row, or just a few rows, at a time.
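The sketch below shows a Python analogue of this buffer-and-flush pattern using PyArrow's ParquetWriter rather than the Rust AsyncArrowWriter itself; the batch source, file name, and row-group threshold are illustrative:

```python
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([("ts", pa.int64()), ("msg", pa.string())])
ROWS_PER_GROUP = 6   # tiny threshold so the example flushes; use ~100k+ in practice

def incoming_arrow_batches():
    # Stand-in for batches decoded from Arrow IPC messages.
    for i in range(5):
        yield pa.RecordBatch.from_pydict(
            {"ts": [i * 3, i * 3 + 1, i * 3 + 2], "msg": ["a", "b", "c"]}, schema=schema
        )

with pq.ParquetWriter("logs-00001.parquet", schema) as writer:
    pending, pending_rows = [], 0
    for batch in incoming_arrow_batches():
        pending.append(batch)
        pending_rows += batch.num_rows
        if pending_rows >= ROWS_PER_GROUP:
            # Flush one sizable row group instead of many tiny files.
            writer.write_table(pa.Table.from_batches(pending, schema=schema))
            pending, pending_rows = [], 0
    if pending:
        writer.write_table(pa.Table.from_batches(pending, schema=schema))
```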
Reduced Overhead and Small File Mitigation
Utilizing Arrow for data ingestion significantly minimizes the overhead associated with parsing and (de)serialization. Although the initial data source may still require parsing raw data (such as when reading JSON), this step is only necessary once to convert the data into Arrow format. After this conversion, each subsequent stage can transfer Arrow data without reprocessing.
The Iceberg writer does not need to interpret JSON or Avro formats; it only needs to map Arrow columns directly to Parquet columns. This method conserves CPU resources and often enhances throughput due to the efficiency of vectorized writes. Additionally, we can efficiently create larger Parquet files by buffering data into Arrow batches.
By ingesting data in Arrow IPC format, we address Iceberg's small-file problem right at the source. Larger Arrow batches accumulate quickly, which results in fewer, larger files being written to the table.
Efficient Computation in Streaming Ingestion
Arrow IPC facilitates data transfer to Iceberg and enables valuable computations during the ingestion process that can enhance query performance later on. As the data arrives in Arrow format (batched and columnar), the ingestion process can easily perform vectorized preprocessing, which offers several benefits:
Index and Statistics Gathering
Iceberg maintains metadata such as partition values and column statistics (minimum, maximum, and null counts) for each data file. With Arrow batches, computing these statistics becomes trivial and fast. As we write each RecordBatch to a file, we can compute the per-column stats using Arrow's optimized compute functions, which are implemented in C++ with SIMD.
Since the data is already in memory in a columnar format, calculating min/max values requires just a single pass over each array—something Arrow performs exceptionally well.
Accurate column statistics allow query engines to perform partition pruning and skip data not meeting query filters. Calculating these statistics upfront on the Arrow data ensures the metadata is comprehensive without incurring extra overhead.
Richer data-distribution statistics can also be computed at this layer using specialized sketches such as t-digest, going beyond the standard Parquet min/max statistics.
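A minimal PyArrow sketch of this single-pass statistics gathering; the column names and values are illustrative:

```python
import pyarrow as pa
import pyarrow.compute as pc

batch = pa.RecordBatch.from_pydict({
    "latency_ms": [12, 85, None, 40],
    "status":     [200, 500, 200, 404],
})

# One vectorized pass per column yields the file-level stats Iceberg tracks.
for name in batch.schema.names:
    col = batch[name]
    mm = pc.min_max(col)   # StructScalar with 'min' and 'max' fields
    print(name, mm["min"].as_py(), mm["max"].as_py(), col.null_count)
```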
Partitioning and Encoding
When you partition an Iceberg table by date or a key hash, the ingestion pipeline can quickly determine the partition value of each row from the Arrow batch.
For example, deriving a partition value from a timestamp column can be expressed as a vectorized transformation over the whole batch, which is far faster than handling individual events. Likewise, sorting the data in an Arrow batch with vectorized algorithms is more efficient than sorting JSON records, since it makes better use of the CPU cache when handling batches of, say, 10,000 rows.
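As a hedged sketch, deriving a day-level partition value and sorting the batch can both be expressed as vectorized PyArrow operations; the column names and values are assumptions:

```python
import pyarrow as pa
import pyarrow.compute as pc

batch = pa.RecordBatch.from_pydict({
    "event_time": pa.array(
        [1704067200, 1704070800, 1704153600], type=pa.timestamp("s")
    ),
    "user_id": [7, 3, 5],
})
table = pa.Table.from_batches([batch])

# Derive a day-level partition value for every row with one vectorized call.
with_partition = table.append_column(
    "event_date", pc.strftime(table["event_time"], format="%Y-%m-%d")
)

# Vectorized sort by the partition key (and a secondary column) before writing.
sorted_table = with_partition.sort_by([("event_date", "ascending"),
                                       ("user_id", "ascending")])
print(sorted_table.to_pydict())
```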
Pre-Aggregation or Filtering/Data Quality Checks
Sometimes, the ingestion pipeline needs to aggregate data or filter out unwanted events before writing to Iceberg. This improves data quality, reduces the overall data size, and produces more queryable tables. It can also save one or two downstream ETL layers, which often introduce significant latency and hurt near-real-time data freshness.
All the computations performed during data ingestion in Iceberg enhance its query-friendliness for query engines. This process results in well-partitioned data files, each containing useful statistics and potentially even filtered data with data-quality checks done on the fly.
Computing metadata statistics during ingestion makes the query planner's job easier when it plans scans over the Iceberg table's Parquet files. Based on these statistics, it can skip files, plan more effective scans knowing that the data is partitioned and sorted, and generally do less work at runtime. Essentially, Arrow IPC lets us shift some computation to the ingestion phase at low cost, which balances the workload and leads to faster queries later on. Without Arrow, loading the ingestion process with this extra computation would be expensive.
To summarize, streaming ingestion into Iceberg backed by Arrow IPC means:
(a) the pipeline can ingest high volumes of data with low overhead, writing efficient columnar files
(b) the pipeline can perform smart preprocessing (partitioning, stats, indexes, data quality checks, etc.) on the fly. To a fair degree, this combination addresses the longstanding challenges of streaming into data lakes – namely, dealing with small files and ensuring data quality and organization – by leveraging Arrow's capabilities. Implementations like Fluss are moving in this direction, and the Iceberg project itself is embracing Arrow for faster writes through PyIceberg and the Rust writer, as the sketch below illustrates.
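For instance, a PyIceberg write is already Arrow-native; in this hedged sketch the catalog name and table identifier are assumptions, and the Arrow table's schema is assumed to match the Iceberg table's:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Illustrative Arrow data; its schema must match the Iceberg table's schema.
arrow_table = pa.table({
    "ts":    pa.array([1, 2, 3], type=pa.int64()),
    "level": ["INFO", "WARN", "INFO"],
})

catalog = load_catalog("default")
tbl = catalog.load_table("logs.app_events")

# The Arrow table is written out as Parquet data files and committed as a snapshot.
tbl.append(arrow_table)
```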
As fresh data continuously flows into our Iceberg tables, the next concern is ensuring that queries on Iceberg can utilize the same performance benefits powered by Arrow. In the following section, we will explore how Arrow IPC improves the experience of querying Iceberg tables in distributed environments.
Querying Iceberg Tables with Arrow IPC
When executing analytical queries on Iceberg tables, which may consist of large datasets distributed across multiple files, Arrow IPC (Inter-Process Communication) enhances both data movement and computation. Specifically, it improves the efficiency of shuffling data between nodes, reading from disk, sending results to clients, and scanning, filtering, and processing data in a vectorized format.
Modern query engines and SQL platforms have begun integrating Arrow at various levels to accelerate queries on data lake tables like Iceberg. While Arrow's computational benefits are well-documented, this discussion will focus on two lesser-known applications of Arrow IPC: Shuffle and Spill to Disk in query engines.
Efficient Data Movement in Querying
Shuffle and Spill in Arrow Format
In a distributed query—such as an SQL query that involves a shuffle for grouping or joining data—intermediate data must move between executor nodes. When the engine uses Arrow as the exchange format, it can send serialized Arrow records over the network to the appropriate node. Since Arrow data does not require translation, the receiving node can either memory-map or reconstruct the batch immediately, allowing it to continue processing seamlessly. Arrow facilitates extreme efficiency in data exchange between two systems.
The serialization cost is almost negligible, as it only involves writing the Arrow IPC encapsulation around buffers. At the same time, deserialization is essentially a no-op, involving just the reading of the schema and buffer metadata.
In contrast, with a traditional shuffle, one node might convert data to a Java byte stream, and the other would rebuild Java objects, which adds overhead. Arrow eliminates these unnecessary steps. In big data frameworks, transferring data between nodes is often a bottleneck, so Arrow can significantly speed up queries requiring large shuffles.
Spill to Disk Optimization
Similarly, if the engine needs to write data to disk—such as when saving a temporary set of data that does not fit in RAM during a sort—it can do so in Arrow IPC format. When the data is needed again, it can be read back via memory mapping or sequentially read directly into Arrow structures. Many systems based on Arrow implement this approach because writing in Arrow IPC format is fast, and the format is binary-compatible with in-memory structures.
This effectively blurs the boundary between in-memory processing and on-disk buffering: Arrow data is stored on disk and used directly once it is paged back into memory, avoiding the overhead of translating data into a separate spill format or parsing it back.
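A minimal sketch of this spill pattern with PyArrow (the file names are illustrative): batches are spilled in IPC file format and later reloaded via memory mapping, so the reloaded buffers reference the mapped file directly:

```python
import pyarrow as pa

def spill_batches(batches, path):
    """Spill in-memory batches to disk in Arrow IPC file format."""
    with pa.OSFile(path, "wb") as f:
        with pa.ipc.new_file(f, batches[0].schema) as writer:
            for b in batches:
                writer.write_batch(b)

def reload_spill(path):
    """Reload spilled data by memory-mapping it: buffers point into the
    mapped file, so no per-value parsing or copying is needed."""
    source = pa.memory_map(path, "r")
    return pa.ipc.open_file(source).read_all()

batches = [pa.RecordBatch.from_pydict({"k": [1, 2], "v": [0.5, 1.5]})]
spill_batches(batches, "spill-0001.arrow")
print(reload_spill("spill-0001.arrow"))
```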
Conclusion
Apache Arrow IPC is vital for efficient distributed data systems and Iceberg-based data lakes. It enhances workflows by enabling fast data ingestion and processing, eliminating unnecessary serialization.
Arrow's columnar format optimizes computation, allowing faster query execution. With Iceberg's capabilities, Arrow IPC unifies data ingestion and query serving, providing a high-performance solution for modern analytics challenges.
If you're interested in brainstorming solutions to the discussed problems, please connect with me on LinkedIn or Twitter. Stay tuned and subscribe, as more blogs on Iceberg-Rust internals are coming soon.
Resources
- https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
- https://voltrondata.com/blog/apache-arrow-flight-primer
- https://www.alibabacloud.com/blog/introducing-fluss-streaming-storage-for-real-time-analytics_601921
- https://www.dremio.com/resources/guides/apache-arrow
- https://www.streamingdata.tech/p/fluss-first-impression
- https://arrow.apache.org/rust/parquet/arrow/async_writer/struct.AsyncArrowWriter.html
- https://github.com/apache/iceberg-rust
- https://www.tabular.io/apache-iceberg-cookbook/pyiceberg-writes