lintingbin opened a new issue, #15925:
URL: https://github.com/apache/iceberg/issues/15925

   ## Background
   
   The `DynamicIcebergSink` is designed for multi-table fan-out scenarios where 
a single Flink job ingests heterogeneous event streams into different Iceberg 
tables. A common production pattern is the **wide-table model**: all event 
types (log types) share one large Iceberg table whose schema is the union of 
all their columns, while each individual event type only carries a subset of 
those columns.
   
   **Example:** A game analytics pipeline ingests 50 distinct log types into 
one wide Iceberg table with 500 columns. Log type A produces columns 1–40, log 
type B produces columns 200–230, and so on. No single event populates all 500 
columns.
   
   ## Problem
   
   Without projected schema writes, `DynamicIcebergSink` must write a 
**full-width row** for every record. The caller is required to pad missing 
columns with `null` before handing the record to the sink. This causes 
significant overhead in wide-table scenarios:
   
   ### 1. Wasted CPU on null padding
   
   The upstream operator must construct a `RowData` with the full table width 
(e.g., 500 fields), setting the vast majority to `null`. For high-throughput 
pipelines (millions of records/second), this is measurable CPU waste.
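   
   To make the padding cost concrete, here is a minimal, dependency-free sketch 
of what the upstream operator effectively does today. `NullPaddingSketch` and 
its method names are hypothetical and plain `Object[]` arrays stand in for 
`RowData`; in a real job this would typically be a `GenericRowData` with the 
full table arity.
   
   ```java
   import java.util.Arrays;
   import java.util.Objects;
   
   final class NullPaddingSketch {
       /**
        * Widens a compact row to the full table width.
        * positions[i] is the index in the full-width row where
        * compactValues[i] belongs.
        */
       static Object[] padToFullWidth(
               Object[] compactValues, int[] positions, int tableWidth) {
           Object[] full = new Object[tableWidth]; // every slot starts as null
           for (int i = 0; i < compactValues.length; i++) {
               full[positions[i]] = compactValues[i];
           }
           return full;
       }
   
       /** Counts the slots that exist only as padding. */
       static long countNulls(Object[] row) {
           return Arrays.stream(row).filter(Objects::isNull).count();
       }
   }
   ```
   
   With 40 meaningful values in a 500-column table, 460 of the 500 slots in 
every row are padding that has to be allocated, serialized, and encoded.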
   
   ### 2. Wasted storage and I/O
   
   Even with Parquet's efficient null encoding, writing explicit null columns 
in every data file inflates file sizes. A Parquet column chunk is always 
written even if all its values are null, and the per-column metadata overhead 
alone adds up at scale. In our production environment with 1-minute checkpoint 
intervals, every checkpoint produces small files, and each of them is 
considerably larger than it needs to be.
   
   ### 3. Inflated serialization cost between Flink operators
   
   `DynamicRecord` carries the full `RowData` across the network on its way to 
the writer (the record is serialized between the upstream operator and 
`DynamicWriter`). A record with 500 columns is serialized even though only 
40 of those columns are meaningful. This wastes both CPU and network bandwidth 
within the Flink job.
   
   ### 4. Poor write isolation between log types
   
   `DynamicWriter` uses a `WriteTarget` as the writer factory key. With 
full-width rows, all records for the same table share one writer factory 
regardless of their logical log type. For use cases where log types need to be 
physically isolated (e.g., different sort orders per log type), this is 
inflexible.
   
   ## Proposed Solution
   
   Introduce an optional `writeSchema` field on `DynamicRecord` (and its 
internal counterpart `DynamicRecordInternal`). When set, `writeSchema` tells 
`DynamicWriter` to use only those columns when creating the Parquet 
`TaskWriter`, letting Iceberg fill the remaining optional columns with `null` 
at read time via its column projection mechanism.
   
   ```java
   DynamicRecord record = new DynamicRecord(...);
   // Only specify the columns this log type actually produces
   Schema logTypeASchema =
       tableSchema.select("event_time", "user_id", "level", "score");
   record.setWriteSchema(logTypeASchema);
   ```
   
   The `RowData` inside the record is expected to match `writeSchema` in column 
count and order — exactly like the existing schema-matching behavior, but 
without triggering schema evolution.
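   
   The read-time behavior this relies on can be illustrated with a small model. 
Iceberg resolves columns by field ID, and an optional column that is absent 
from a data file reads back as `null`. The sketch below is not Iceberg code: 
`ProjectionReadSketch` is a hypothetical name and a map keyed by field ID 
stands in for a stored row.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   final class ProjectionReadSketch {
       /**
        * Builds the row a reader would see for the full table schema when
        * the file only stored the field IDs present in storedByFieldId.
        */
       static Map<Integer, Object> readAs(
               List<Integer> tableFieldIds, Map<Integer, Object> storedByFieldId) {
           Map<Integer, Object> row = new LinkedHashMap<>();
           for (Integer id : tableFieldIds) {
               // Map.get returns null for IDs the file never wrote.
               row.put(id, storedByFieldId.get(id));
           }
           return row;
       }
   }
   ```
   
   A file written with field IDs 1 and 3 against a table with IDs 1 to 4 yields 
a four-column row in which columns 2 and 4 are `null`, without those nulls 
ever having been stored.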
   
   ### Key design points
   
   **`writeSchema` vs `schema` (record schema)**
   
   | Field | Purpose |
   |-------|---------|
   | `schema` | Used for schema matching and evolution (existing behavior, unchanged) |
   | `writeSchema` | Used only for Parquet file writing; must be a subset of the resolved table schema |
   
   **`writeSchemaColumnIds` in `WriteTarget`**
   
   `WriteTarget` (the writer factory key) gains an ordered list of field IDs 
derived from `writeSchema`. This ensures records with different projected 
schemas get independent `TaskWriter` instances, which is required for 
correctness (each writer is created with a specific `RowType`).
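   
   The effect of adding the ordered ID list to the key can be sketched as 
follows. This is a simplified illustration, not the actual `WriteTarget`: the 
real key carries more fields, and `WriterKeySketch`, `Key`, and `writerFor` are 
hypothetical names. A plain `Object` stands in for a `TaskWriter`.
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Objects;
   
   final class WriterKeySketch {
       /** Simplified key: table name plus ordered projected field IDs. */
       static final class Key {
           final String tableName;
           final List<Integer> writeSchemaColumnIds;
   
           Key(String tableName, List<Integer> writeSchemaColumnIds) {
               this.tableName = tableName;
               this.writeSchemaColumnIds = List.copyOf(writeSchemaColumnIds);
           }
   
           @Override
           public boolean equals(Object o) {
               if (this == o) return true;
               if (!(o instanceof Key)) return false;
               Key other = (Key) o;
               // List.equals is order-sensitive, so [1, 2] != [2, 1].
               return tableName.equals(other.tableName)
                   && writeSchemaColumnIds.equals(other.writeSchemaColumnIds);
           }
   
           @Override
           public int hashCode() {
               return Objects.hash(tableName, writeSchemaColumnIds);
           }
       }
   
       private final Map<Key, Object> writers = new HashMap<>();
   
       /** Returns the cached writer for the key, creating one if needed. */
       Object writerFor(Key key) {
           return writers.computeIfAbsent(key, k -> new Object());
       }
   
       int writerCount() {
           return writers.size();
       }
   }
   ```
   
   Because the list comparison is order-sensitive, two projections with the same 
columns in a different order also get separate writers, which matches the 
requirement that `RowData` follows `writeSchema` in column order.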
   
   **Validation in `DynamicWriter`**
   
   Before creating a writer factory for a projected schema, the following are 
enforced:
   
   - **No required columns may be skipped** — Iceberg cannot fill a `required` 
column with `null` at read time; skipping one would produce unreadable files.
   - **All partition columns must be included** — the writer needs partition 
values to place files correctly.
   - **All equality-delete fields must be included** — required for correct 
upsert semantics.
   - **All field IDs in `writeSchema` must exist in the resolved table schema** 
— prevents silent field mismatches.
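   
   The four rules above can be sketched as a single check. This is a minimal 
model under stated assumptions, not the code in `DynamicWriter`: the class 
`WriteSchemaValidationSketch`, the `Field` type, and the use of plain sets of 
field IDs in place of Iceberg's `Schema` and `PartitionSpec` are all 
hypothetical simplifications.
   
   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   final class WriteSchemaValidationSketch {
       /** Simplified table column: field ID plus required flag. */
       static final class Field {
           final int id;
           final boolean required;
   
           Field(int id, boolean required) {
               this.id = id;
               this.required = required;
           }
       }
   
       static void validate(
               List<Field> tableFields,
               Set<Integer> partitionFieldIds,
               Set<Integer> equalityFieldIds,
               Set<Integer> writeFieldIds) {
           Set<Integer> tableIds = new HashSet<>();
           for (Field field : tableFields) {
               tableIds.add(field.id);
               // Rule 1: required columns cannot be null-filled on read.
               if (field.required && !writeFieldIds.contains(field.id)) {
                   throw new IllegalArgumentException(
                       "Required column skipped: " + field.id);
               }
           }
           // Rules 2 and 3: partition and equality fields must be written.
           requireAll(partitionFieldIds, writeFieldIds, "Partition column missing: ");
           requireAll(equalityFieldIds, writeFieldIds, "Equality field missing: ");
           // Rule 4: every projected ID must exist in the table schema.
           for (Integer id : writeFieldIds) {
               if (!tableIds.contains(id)) {
                   throw new IllegalArgumentException(
                       "Unknown field ID in writeSchema: " + id);
               }
           }
       }
   
       private static void requireAll(
               Set<Integer> needed, Set<Integer> present, String message) {
           for (Integer id : needed) {
               if (!present.contains(id)) {
                   throw new IllegalArgumentException(message + id);
               }
           }
       }
   }
   ```
   
   Running the check once per new writer factory key, rather than per record, 
keeps it off the hot path.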
   
   **Aggregation in `DynamicWriteResultAggregator` and `DynamicCommitter`**
   
   `WriteResult`s produced by different projected schemas for the same table 
are merged at the table level before committing. A single checkpoint produces 
at most one Iceberg transaction per table, regardless of how many distinct 
`writeSchema` variants were active. This is consistent with the existing 
behavior for non-projected writes.
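   
   The table-level merge can be pictured with the following sketch. It is an 
illustration only: `AggregationSketch` and `Result` are hypothetical names, and 
lists of file paths stand in for Iceberg `WriteResult` objects.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;
   
   final class AggregationSketch {
       /** Simplified write result: target table plus the files written. */
       static final class Result {
           final String table;
           final List<String> dataFiles;
   
           Result(String table, List<String> dataFiles) {
               this.table = table;
               this.dataFiles = dataFiles;
           }
       }
   
       /**
        * Merges results by table only, so results from different projected
        * schemas for the same table end up in one group (one commit).
        */
       static Map<String, List<String>> mergeByTable(List<Result> results) {
           Map<String, List<String>> merged = new TreeMap<>();
           for (Result result : results) {
               merged.computeIfAbsent(result.table, t -> new ArrayList<>())
                   .addAll(result.dataFiles);
           }
           return merged;
       }
   }
   ```
   
   The projected field IDs are deliberately not part of the grouping key here; 
they matter for choosing a writer, not for deciding how many transactions a 
checkpoint produces.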
   
   ## Serialization Compatibility
   
   Introducing `writeSchema` requires changes to several serializers:
   
   | Serializer | Change |
   |---|---|
   | `DynamicRecordInternalSerializer` | Version bumped; `writeSchema` serialized as a length-prefixed JSON string (avoids `DataOutputStream.writeUTF`'s 65535-byte limit for large schemas); backward-compatible deserialization preserved |
   | `DynamicCommittableSerializer` | Version 1 → 2; V1 deserialization preserved for checkpoint recovery |
   | `DynamicWriteResultSerializer` | Version 1 → 2; same backward-compatibility guarantee |
   | `WriteTarget` (versioned serializer) | Version bumped to account for new `writeSchemaColumnIds` field |
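   
   The length-prefixed encoding mentioned for `writeSchema` can be sketched with 
the JDK alone. This is an assumption about the general shape, not the actual 
serializer: `LengthPrefixedStringSketch` and its methods are hypothetical, and 
using `-1` as the marker for an unset (null) schema is one possible convention.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.UTFDataFormatException;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   
   final class LengthPrefixedStringSketch {
       /** Writes a 4-byte length (-1 for null) followed by UTF-8 bytes. */
       static byte[] write(String value) {
           try {
               ByteArrayOutputStream buffer = new ByteArrayOutputStream();
               DataOutputStream out = new DataOutputStream(buffer);
               if (value == null) {
                   out.writeInt(-1);
               } else {
                   byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
                   out.writeInt(bytes.length);
                   out.write(bytes);
               }
               out.flush();
               return buffer.toByteArray();
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   
       /** Reads a value produced by write(String). */
       static String read(byte[] data) {
           try {
               DataInputStream in =
                   new DataInputStream(new ByteArrayInputStream(data));
               int length = in.readInt();
               if (length < 0) {
                   return null;
               }
               byte[] bytes = new byte[length];
               in.readFully(bytes);
               return new String(bytes, StandardCharsets.UTF_8);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   
       /** True if writeUTF rejects the value as longer than 65535 bytes. */
       static boolean writeUtfRejects(String value) {
           try {
               new DataOutputStream(new ByteArrayOutputStream()).writeUTF(value);
               return false;
           } catch (UTFDataFormatException e) {
               return true;
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```
   
   A schema JSON for a 500-column table can plausibly exceed the `writeUTF` 
limit, which is why the 4-byte length prefix is the safer choice.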
   
   ## Performance Impact
   
   In production testing against a wide table (500 columns, 50 log types, 
average 40 active columns per log type, Flink 1.20):
   
   - **Write throughput**: up to **4x** improvement for wide schemas
   - **Parquet file size**: **several times smaller** with 1-minute checkpoint 
intervals (fewer column chunks per file, significantly reduced small-file 
problem)
   
   ## API Changes
   
   ```java
   // DynamicRecord.java (public API)
   public class DynamicRecord {
       // existing fields unchanged ...
   
       /**
        * Optional projected schema for writing. When set, the RowData columns
        * must match this schema rather than the full table schema.
        * The remaining optional columns in the table schema will be read back
        * as null by Iceberg at query time.
        */
       @Nullable
       public Schema writeSchema() { ... }
   
       public void setWriteSchema(@Nullable Schema writeSchema) { ... }
   }
   ```
   
   This is a purely additive, backward-compatible change. Callers that do not 
set `writeSchema` see no behavioral difference.
   
   ## Alternatives Considered
   
   **A. Null-pad at the source / upstream operator**
   This is what the current API forces. It moves the burden to every caller and 
does not reduce the serialization or storage cost — it just shifts where the 
nulls are constructed.
   
   **B. Separate tables per log type**
   This defeats the purpose of the wide-table model (unified querying, single 
schema governance) and creates operational overhead (hundreds of tables to 
manage).
   
   **C. Use Iceberg's existing read-side column projection**
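   Iceberg's read-side column projection is well-established, but on its own it 
only limits which columns a query reads; it does not change what the sink 
writes. This proposal applies the same concept to the write side within 
`DynamicIcebergSink`, where per-record write schema variance is the norm.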
   
   ## Related Issues
   
   - #11536 — original DynamicIcebergSink tracking issue
   - #14090 — `DynamicWriteResultAggregator` producing multiple committables 
per table/branch/checkpoint; the table-level result aggregation introduced by 
this feature also addresses that issue as a side effect
   
   ## Implementation Status
   
   A working implementation exists and has been validated in production (Flink 
1.20, ~2M records/second, 500-column wide table). We intend to extend support 
to Flink 1.19 and 2.0 and submit a PR. We welcome feedback on the API design, 
particularly whether `writeSchema` belongs on `DynamicRecord` directly or 
should be expressed differently.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

