ahmedabu98 commented on code in PR #38821:
URL: https://github.com/apache/beam/pull/38821#discussion_r3471549875


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/CdcReadUtils.java:
##########
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.ReadUtils;
+import org.apache.beam.sdk.io.iceberg.SerializableDeleteFile;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.schema.MessageType;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read-side helpers specific to the CDC source. Keeps {@link ReadUtils} 
focused on the
+ * general-purpose append-only read path; everything that takes a {@link 
SerializableChangelogTask},
+ * references {@link DeleteReader}, or implements the delete-pushdown 
row-group skipping lives here.
+ */
+public final class CdcReadUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CdcReadUtils.class);
+
+  /**
+   * Maximum size of an equality delete set to push down as a Parquet residual 
{@code IN}
+   * expression. Matches {@link 
ParquetMetricsRowGroupFilter#IN_PREDICATE_LIMIT}.
+   */
+  private static final int IN_PREDICATE_LIMIT = 200;
+
+  public static CloseableIterable<Record> createReader(
+      SerializableChangelogTask task,
+      Table table,
+      IcebergScanConfig scanConfig,
+      Schema outputSchema) {
+    return createReader(task, table, scanConfig, outputSchema, 
Expressions.alwaysTrue());
+  }
+
+  /**
+   * Same as {@link #createReader(SerializableChangelogTask, Table, 
IcebergScanConfig, Schema)} but
+   * ANDs {@code extraResidual} into the task's residual expression. The 
combined expression is
+   * passed to Iceberg's Parquet reader, which uses it as a row-group-level 
filter (skips row groups
+   * whose column statistics cannot match). The caller is still responsible 
for applying the
+   * residual at the row level.
+   *
+   * <p>This is used to push extra predicates (e.g. an equality-delete {@code 
IN} expression) down
+   * to the reader for cheap row-group skipping.
+   */
+  public static CloseableIterable<Record> createReader(
+      SerializableChangelogTask task,
+      Table table,
+      IcebergScanConfig scanConfig,
+      Schema outputSchema,
+      Expression extraResidual) {
+    return createReader(
+        task, table, scanConfig, outputSchema, extraResidual, task.getStart(), 
task.getLength());
+  }
+
+  /**
+   * Same as {@link #createReader(SerializableChangelogTask, Table, 
IcebergScanConfig, Schema,
+   * Expression)} but reads the byte range {@code [start, start + length)} of 
the DataFile.
+   * Iceberg's Parquet reader selects the row groups whose starting offset 
falls within this range,
+   * allowing us to prune row-groups by byte-range.
+   *
+   * <p>Callers are responsible for ensuring the requested range stays within 
the task's assigned
+   * range, to avoid reading a section that is meant for another worker.
+   */
+  public static CloseableIterable<Record> createReader(
+      SerializableChangelogTask task,
+      Table table,
+      IcebergScanConfig scanConfig,
+      Schema outputSchema,
+      Expression extraResidual,
+      long start,
+      long length) {
+    Expression baseResidual = task.getExpression(table.schema());
+    Expression combined =
+        extraResidual.op() == Expression.Operation.TRUE
+            ? baseResidual
+            : Expressions.and(baseResidual, extraResidual);
+    return ReadUtils.createReader(
+        table,
+        scanConfig,
+        outputSchema,
+        checkStateNotNull(table.specs().get(task.getSpecId())),
+        task.getDataFile().createDataFile(table.specs()),
+        task.getDataFile().getFileSequenceNumber(),
+        start,
+        length,
+        combined);
+  }
+
+  /** Returns a filter that skips records marked for deletion. */
+  public static DeleteFilter<Record> genericDeleteFilter(
+      Table table, Schema outputSchema, String dataFilePath, 
List<SerializableDeleteFile> deletes) {
+    return new GenericDeleteFilter(
+        table.io(),
+        dataFilePath,
+        table.schema(),
+        outputSchema,
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), 
table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  /** Returns a delete reader that reuses delete structures already loaded by 
CDC planning. */
+  public static DeleteReader<Record> genericDeleteReader(
+      Table table,
+      Schema outputSchema,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes,
+      DeleteReader.PreloadedDeletes preloadedDeletes) {
+    return new GenericDeleteReader(
+        table.io(),
+        dataFilePath,
+        table.schema(),
+        outputSchema,
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), 
table.sortOrders()))
+            .collect(Collectors.toList()),
+        preloadedDeletes);
+  }
+
+  /**
+   * Opens the records that a CDC reader should process for a single {@link
+   * SerializableChangelogTask}, applying the appropriate delete-filter / 
delete-reader chain for
+   * the task's type:
+   *
+   * <ul>
+   *   <li>{@code ADDED_ROWS}: Collect and return the records that became live 
in this commit:
+   *       <ul>
+   *         <li>1. Iterate over records in the added DataFile
+   *         <li>2. Filter out records matched by any added deletes
+   *       </ul>
+   *   <li>{@code DELETED_ROWS}: Return records in the DataFile that are 
marked for deletion by new
+   *       DeleteFiles, making sure to first ignore records that have already 
been marked by
+   *       previous DeleteFiles:
+   *       <ul>
+   *         <li>1. Iterate over records in the referenced DataFile
+   *         <li>2. Filter out records matched from existing deletes.
+   *         <li>3. Filter out records NOT matched from added deletes
+   *       </ul>
+   *   <li>{@code DELETED_FILE}: Return every record in the DataFile that wasn't already deleted by
+   *       {@code existingDeletes}:
+   *       <ul>
+   *         <li>1. Iterate over records in the referenced DataFile
+   *         <li>2. Filter out records matched from existing deletes.
+   *       </ul>
+   * </ul>
+   *
+   * <p>Projection pushdown should not be used when reading bi-directional 
tasks because we need to
+   * compare all record columns to accurately identify updates. Otherwise, 
user-configured
+   * projection may drop a column that contains real updates. If this happens, 
the downstream
+   * resolver will mistakenly determine the (delete, insert) pair to be a 
duplicate.
+   *
+   * <p>If CDC metadata columns are requested, this method only adds 
row-sourced metadata columns
+   * ({@code _row_id}, {@code _last_updated_sequence_number}) to the Iceberg 
read schema. Changelog
+   * context columns are added later by {@link CdcOutputUtils#outputRow}.
+   */
+  public static CloseableIterable<Record> changelogRecordsForTask(
+      SerializableChangelogTask task,
+      Table table,
+      IcebergScanConfig scanConfig,
+      boolean useProjectedSchema) {
+    String dataFilePath = task.getDataFile().getPath();
+    Schema outputSchema =
+        CdcOutputUtils.readSchemaWithRowMetadata(
+            scanConfig.getMetadataColumns(),
+            useProjectedSchema ? scanConfig.getRequiredSchema() : 
table.schema());
+    switch (task.getType()) {
+      case ADDED_ROWS:
+        DeleteFilter<Record> addedDeletesFilter =
+            genericDeleteFilter(table, outputSchema, dataFilePath, 
task.getAddedDeletes());
+        return addedDeletesFilter.filter(
+            createReader(task, table, scanConfig, 
addedDeletesFilter.requiredSchema()));
+      case DELETED_FILE:
+        DeleteFilter<Record> existingDeletesFilter =
+            genericDeleteFilter(table, outputSchema, dataFilePath, 
task.getExistingDeletes());
+        return existingDeletesFilter.filter(
+            createReader(task, table, scanConfig, 
existingDeletesFilter.requiredSchema()));
+      case DELETED_ROWS:
+        return deletedRowsForTask(task, table, scanConfig, outputSchema);
+      default:
+        throw new IllegalStateException("Unknown ChangelogScanTask type: " + 
task.getType());
+    }
+  }
+
+  /**
+   * Builds the reader chain for a {@code DELETED_ROWS} task with row-group 
pushdown when possible.
+   * This helps the reader skip entire row groups. For unskipped row groups, 
the reader should still
+   * apply per-record position + equality checks at the row level.
+   *
+   * <p>We use two pushdown strategies, depending on the type of {@link 
DeleteFile} in the task
+   * (Position Delete vs. Equality Delete). The two strategies can be combined 
if both {@link
+   * DeleteFile} types are present.
+   *
+   * <ol>
+   *   <li><b>Byte-range pushdown for Position Deletes:</b> pre-load the {@link
+   *       PositionDeleteIndex}, read the Parquet footer, and compute a single 
contiguous byte range
+   *       covering the row groups that contain at least one deleted position.
+   *   <li><b>IN-expression pushdown for Equality Deletes:</b> build an 
Iceberg {@code IN}
+   *       expression and pass it as a Parquet residual so the metrics 
row-group filter can skip
+   *       non-matching row groups.
+   * </ol>
+   *
+   * <p>If Position and Equality deletes are both present, both strategies are 
used to get one
+   * contiguous range. We read only that range, skipping leading and trailing 
row groups that
+   * contain no deletions.
+   *
+   * <p>Note: Equality pushdown is only used when all delete files share a 
single equality field.
+   * Multi-column equality requires an exploded OR expression that Parquet's 
metrics filter handles
+   * poorly.
+   */
+  private static CloseableIterable<Record> deletedRowsForTask(
+      SerializableChangelogTask task,
+      Table table,
+      IcebergScanConfig scanConfig,
+      Schema outputSchema) {
+    String dataFilePath = task.getDataFile().getPath();
+    List<SerializableDeleteFile> addedDeletes = task.getAddedDeletes();
+
+    // Split into position vs equality.
+    List<DeleteFile> posFiles = new ArrayList<>();
+    List<DeleteFile> eqFiles = new ArrayList<>();
+    for (SerializableDeleteFile sd : addedDeletes) {
+      DeleteFile df = sd.createDeleteFile(table.specs(), table.sortOrders());
+      if (df.content() == FileContent.POSITION_DELETES) {
+        posFiles.add(df);
+      } else if (df.content() == FileContent.EQUALITY_DELETES) {
+        eqFiles.add(df);
+      }
+    }
+
+    // Strategy 1: byte-range pushdown around row groups with position deletes 
(+ eq
+    // matches).
+    DeleteReader.PreloadedDeletes preloadedDeletes = 
DeleteReader.PreloadedDeletes.empty();
+    if (!posFiles.isEmpty()) {
+      @Nullable
+      PositionPushdownResult pushdown =
+          tryPositionByteRangePushdown(
+              task, table, scanConfig, outputSchema, posFiles, eqFiles, 
addedDeletes);
+      if (pushdown != null) {
+        if (pushdown.deletedRecords != null) {
+          return pushdown.deletedRecords;
+        }
+        preloadedDeletes = pushdown.preloadedDeletes;
+      }
+      // fall through to the default chain on failure
+    }
+
+    // Strategy 2: equality IN-expression pushdown applied as a reader residual.
+    // Only safe when no position deletes are present. When both exist, the
+    // byte-range path above already incorporates the eq filter.
+    Expression eqResidual = Expressions.alwaysTrue();
+    if (posFiles.isEmpty() && !eqFiles.isEmpty()) {
+      EqualityPushdownResult eqPushdown = buildEqualityDeletePushdown(table, 
eqFiles);
+      eqResidual = eqPushdown.applicable ? eqPushdown.residual : 
Expressions.alwaysTrue();
+      preloadedDeletes = eqPushdown.preloadedDeletes(null);
+    }
+
+    DeleteFilter<Record> existingDeletesFilter =
+        genericDeleteFilter(table, outputSchema, dataFilePath, 
task.getExistingDeletes());
+    DeleteReader<Record> addedDeletesReader =
+        genericDeleteReader(table, outputSchema, dataFilePath, addedDeletes, 
preloadedDeletes);
+    Schema requiredSchema =
+        TypeUtil.join(existingDeletesFilter.requiredSchema(), 
addedDeletesReader.requiredSchema());
+
+    CloseableIterable<Record> records =
+        createReader(task, table, scanConfig, requiredSchema, eqResidual);
+    CloseableIterable<Record> liveRecords = 
existingDeletesFilter.filter(records);
+    return addedDeletesReader.read(liveRecords);
+  }
+
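+  /**
+   * Path-A byte-range position-delete pushdown. Returns {@code null}, or a fallback result that
+   * carries only the already-loaded deletes (and no records), if pushdown isn't applicable or any
+   * step fails; both signal the caller to fall back to the default reader chain. Returns an empty
+   * iterable if every row group is pruned.
+   */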
+  private static @Nullable PositionPushdownResult tryPositionByteRangePushdown(
+      SerializableChangelogTask task,
+      Table table,
+      IcebergScanConfig scanConfig,
+      Schema outputSchema,
+      List<DeleteFile> posFiles,
+      List<DeleteFile> eqFiles,
+      List<SerializableDeleteFile> addedDeletes) {
+    String dataFilePath = task.getDataFile().getPath();
+
+    // 1. pre-load the position index for this data file.
+    PositionDeleteIndex posIndex;
+    try {
+      DeleteLoader loader = new BaseDeleteLoader(df -> 
table.io().newInputFile(df.location()));
+      posIndex = loader.loadPositionDeletes(posFiles, dataFilePath);
+    } catch (RuntimeException e) {
+      LOG.info(
+          "Failed to pre-load position deletes for {}; falling back to default 
reader chain.",
+          dataFilePath,
+          e);
+      return null;
+    }
+    if (posIndex.isEmpty()) {
+      // the pos-delete files don't actually target this data file (rare but 
possible
+      // after metadata operations). Fall back so the eq pushdown does not run 
here either.
+      return PositionPushdownResult.fallback(
+          DeleteReader.PreloadedDeletes.of(posIndex, Collections.emptyMap()));
+    }
+
+    // 2. optional equality filter (used to extend the byte range to include 
row groups
+    // whose stats match the equality IN values).
+    @Nullable ParquetMetricsRowGroupFilter eqFilter = null;
+    EqualityPushdownResult eqPushdown = EqualityPushdownResult.notApplicable();
+    if (!eqFiles.isEmpty()) {
+      eqPushdown = buildEqualityDeletePushdown(table, eqFiles);
+      if (!eqPushdown.applicable) {
+        // eq deletes are present but we can't safely identify which row 
groups they target.
+        // A narrowed position-only range could drop eq-deleted rows, so fall 
back to the
+        // default full-range reader. DeleteReader will still apply residual 
per record.
+        return 
PositionPushdownResult.fallback(eqPushdown.preloadedDeletes(posIndex));
+      }
+      eqFilter = new ParquetMetricsRowGroupFilter(table.schema(), 
eqPushdown.residual);
+    }
+    DeleteReader.PreloadedDeletes preloadedDeletes = 
eqPushdown.preloadedDeletes(posIndex);
+
+    // 3. read the footer and compute the task byte range covering every row 
group that
+    // contains a position delete or matches the eq filter.
+    long taskStart = task.getStart();
+    long taskEnd = taskStart + task.getLength();
+    long minStart = Long.MAX_VALUE;
+    long maxEnd = Long.MIN_VALUE;
+    long[] sortedDeletePositions = sortedDeletePositions(posIndex);

Review Comment:
   Done



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java:
##########
@@ -143,13 +148,13 @@ public org.apache.iceberg.Schema getRequiredSchema() {
 
   @Pure
   @Nullable
-  public Evaluator getEvaluator() {
+  public Evaluator getEvaluator(org.apache.iceberg.Schema requiredSchema) {
     @Nullable Expression filter = getFilter();
     if (filter == null) {
       return null;
     }
     if (cachedEvaluator == null) {
-      cachedEvaluator = new Evaluator(getRequiredSchema().asStruct(), filter);
+      cachedEvaluator = new Evaluator(requiredSchema.asStruct(), filter);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to