khandelwal-prateek commented on code in PR #4146:
URL: https://github.com/apache/gobblin/pull/4146#discussion_r2509613785


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
+ *
+ * This source reads job configuration, applies date partition filters with an optional lookback period,
+ * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files
+ * into work units for parallel processing.
+ *
+ * <pre>
+ * # Basic configuration
+ * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource
+ * iceberg.database.name=db1
+ * iceberg.table.name=table1
+ * iceberg.catalog.uri=https://openhouse.com/catalog
+ *
+ * # Partition filtering with lookback
+ * iceberg.filter.enabled=true
+ * iceberg.filter.expr=datepartition=2025-04-01
+ * iceberg.lookback.days=3
+ * </pre>
+ */
+@Slf4j
+public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> {
+
+    public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name";
+    public static final String ICEBERG_TABLE_NAME = "iceberg.table.name";
+    public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri";
+    public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class";
+    public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+    public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled";
+    public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
+    public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit";
+    public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
+    public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
+    public static final String ICEBERG_FILTER_EXPR = "iceberg.filter.expr"; // e.g., datepartition=2025-04-01
+    public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
+    public static final int DEFAULT_LOOKBACK_DAYS = 1;
+    public static final String ICEBERG_DATE_PARTITION_KEY = "datepartition"; // date partition key name
+    public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key";
+    public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values";
+    public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path";
+    public static final String ICEBERG_SIMULATE = "iceberg.simulate";
+    public static final String ICEBERG_MAX_SIZE_MULTI_WORKUNITS = "iceberg.binPacking.maxSizePerBin";
+    public static final String ICEBERG_MAX_WORK_UNITS_PER_BIN = "iceberg.binPacking.maxWorkUnitsPerBin";
+    private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
+
+    private Optional<LineageInfo> lineageInfo;
+    private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
+
+    /**
+     * Initialize file system helper based on mode (streaming vs record processing)
+     */
+    @Override
+    public void initFileSystemHelper(State state) throws FileBasedHelperException {
+        // For file streaming mode, we use IcebergFileStreamHelper
+        // For record processing mode, we'll use a different helper (future implementation)
+        boolean recordProcessingEnabled = state.getPropAsBoolean(
+            ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+        if (recordProcessingEnabled) {
+            // Future: Initialize helper for record processing
+            throw new UnsupportedOperationException("Record processing mode 
not yet implemented. " +
+                "This will be added when SQL/Data Lake destinations are 
required.");
+        } else {
+            // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper
+            this.fsHelper = new IcebergFileStreamHelper(state);
+            this.fsHelper.connect();
+        }
+    }
+
+    /**
+     * Get work units by discovering files from Iceberg table
+     * @param state the source state
+     * @return the list of {@link WorkUnit}s
+     */
+    @Override
+    public List<WorkUnit> getWorkunits(SourceState state) {
+        this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+        try {
+            initFileSystemHelper(state);
+
+            validateConfiguration(state);
+
+            IcebergCatalog catalog = createCatalog(state);
+            String database = state.getProp(ICEBERG_DATABASE_NAME);
+            String table = state.getProp(ICEBERG_TABLE_NAME);
+
+            IcebergTable icebergTable = catalog.openTable(database, table);
+            Preconditions.checkArgument(catalog.tableAlreadyExists(icebergTable),
+                String.format("Iceberg table not found: %s.%s", database, table));
+
+            List<IcebergTable.FilePathWithPartition> filesWithPartitions = discoverPartitionFilePaths(state, icebergTable);
+            log.info("Discovered {} files from Iceberg table {}.{}", filesWithPartitions.size(), database, table);
+
+            // Create work units from discovered files
+            List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, state, icebergTable);
+
+            // Handle simulate mode - log what would be copied without executing
+            if (state.contains(ICEBERG_SIMULATE) && state.getPropAsBoolean(ICEBERG_SIMULATE)) {
+                log.info("Simulate mode enabled. Will not execute the copy.");
+                logSimulateMode(workUnits, filesWithPartitions, state);
+                return Lists.newArrayList();
+            }
+
+            // Apply bin packing to work units if configured
+            List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, state);
+            log.info("Work unit creation complete. Initial work units: {}, packed work units: {}",
+                workUnits.size(), packedWorkUnits.size());
+
+            return Lists.newArrayList(packedWorkUnits);
+
+        } catch (Exception e) {
+            log.error("Failed to create work units for Iceberg table", e);
+            throw new RuntimeException("Failed to create work units", e);
+        }
+    }
+
+    /**
+     * Get extractor based on mode (streaming vs record processing)
+     *
+     * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned {@link Extractor}
+     * @return an {@link Extractor} for the configured mode (file streaming or record processing)
+     * @throws IOException if the extractor cannot be created
+     */
+    @Override
+    public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException {
+        boolean recordProcessingEnabled = state.getPropAsBoolean(
+            ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+        if (recordProcessingEnabled) {
+            // Return record processing extractor
+            throw new UnsupportedOperationException("Record processing mode 
not yet implemented.");
+        } else {
+            // Return file streaming extractor
+            return new IcebergFileStreamExtractor(state);
+        }
+    }
+
+    /**
+     * Discover partition data files using the Iceberg TableScan API, with an optional lookback for date partitions.
+     *
+     * <p>This method supports two modes:
+     * <ol>
+     *   <li><b>Full table scan</b>: When {@code iceberg.filter.enabled=false}, returns all data files from the current snapshot</li>
+     *   <li><b>Partition filter</b>: When {@code iceberg.filter.enabled=true}, uses an Iceberg TableScan with a partition
+     *       filter and applies the lookback period for date partitions</li>
+     * </ol>
+     *
+     * <p>For date partitions (partition key = {@value #ICEBERG_DATE_PARTITION_KEY}), the lookback period allows copying data for the last N days.
+     * For example, with {@code iceberg.filter.expr=datepartition=2025-04-03} and {@code iceberg.lookback.days=3},
+     * this will discover files for partitions datepartition=2025-04-03, datepartition=2025-04-02, and datepartition=2025-04-01.
+     *
+     * @param state source state containing filter configuration
+     * @param icebergTable the Iceberg table to scan
+     * @return list of file paths with partition metadata matching the filter criteria
+     * @throws IOException if table scan or file discovery fails
+     */
+    private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws IOException {
+        boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true);
+
+        if (!filterEnabled) {
+            log.info("Partition filter disabled, discovering all data files 
from current snapshot");
+            IcebergSnapshotInfo snapshot = 
icebergTable.getCurrentSnapshotInfo();
+            List<IcebergTable.FilePathWithPartition> result = 
Lists.newArrayList();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : 
snapshot.getManifestFiles()) {
+                for (String filePath : mfi.getListedFilePaths()) {
+                    result.add(new 
IcebergTable.FilePathWithPartition(filePath, Maps.newHashMap()));
+                }
+            }
+            log.info("Discovered {} data files from snapshot", result.size());
+            return result;
+        }
+
+        // Parse filter expression
+        String expr = state.getProp(ICEBERG_FILTER_EXPR);
+        Preconditions.checkArgument(!StringUtils.isBlank(expr),
+            "iceberg.filter.expr is required when 
iceberg.filter.enabled=true");
+        String[] parts = expr.split("=", 2);
+        Preconditions.checkArgument(parts.length == 2,
+            "Invalid iceberg.filter.expr. Expected key=value, got: %s", expr);
+        String key = parts[0].trim();
+        String value = parts[1].trim();
+
+        // Apply lookback period for date partitions
+        // lookbackDays=1 (default) means copy only the specified date
+        // lookbackDays=3 means copy specified date + 2 previous days (total 3 days)
+        int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+        List<String> values = Lists.newArrayList();
+
+        if (ICEBERG_DATE_PARTITION_KEY.equals(key) && lookbackDays >= 1) {
+            log.info("Applying lookback period of {} days for date partition: 
{}", lookbackDays, value);
+            LocalDate start = LocalDate.parse(value);
+            for (int i = 0; i < lookbackDays; i++) {
+                String partitionValue = start.minusDays(i).toString();
+                values.add(partitionValue);
+                log.debug("Including partition: {}={}", 
ICEBERG_DATE_PARTITION_KEY, partitionValue);
+            }
+        } else {
+            log.error("Partition key is not correct or lookbackDays < 1, 
skipping lookback. Input: {}={}, expected: {}=<date>",
+                key, value, ICEBERG_DATE_PARTITION_KEY);
+            throw new IllegalArgumentException(String.format(
+                "Only date partition filter with lookback period is supported. 
Expected partition key: '%s', got: '%s'",
+                ICEBERG_DATE_PARTITION_KEY, key));
+        }
+
+        // Store partition info on state for downstream use (extractor, destination path mapping)
+        state.setProp(ICEBERG_PARTITION_KEY, key);
+        state.setProp(ICEBERG_PARTITION_VALUES, String.join(",", values));
+
+        // Use the Iceberg TableScan API to get only data files (parquet/orc/avro) for the specified partitions
+        // TableScan.planFiles() returns DataFiles only - no manifest files or metadata files
+        log.info("Executing TableScan with filter: {}={}", key, values);
+        Expression icebergExpr = null;
+        for (String val : values) {
+            Expression e = Expressions.equal(key, val);
+            icebergExpr = (icebergExpr == null) ? e : Expressions.or(icebergExpr, e);
+        }

Review Comment:
   Yes, a dedicated `IcebergExpressionGenerator` would provide easier extension for more filtering scenarios, similar to the `LookbackPartitionFilterGenerator` pattern in Hive.
   
   ### **Current Implementation**
   The inline approach covers these use cases:
   - **Full table copy:** `iceberg.filter.enabled=false` (uses `Expressions.alwaysTrue()`)
   - **Single partition copy:** `iceberg.filter.expr=datepartition=2025-04-01` with `iceberg.lookback.days=1`
   - **Date range copy:** `iceberg.filter.expr=datepartition=2025-04-01` with `iceberg.lookback.days=N` (creates OR expressions for N days; see the snippet below)
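   
   For concreteness, a minimal illustrative snippet of the expression the inline loop in `discoverPartitionFilePaths()` ends up building for `iceberg.lookback.days=3` on `datepartition=2025-04-03` (the javadoc example; the wrapper class here is only for illustration):
   
   ```java
   import org.apache.iceberg.expressions.Expression;
   import org.apache.iceberg.expressions.Expressions;
   
   class LookbackExpressionExample {
     // OR of one equality predicate per day in the lookback window,
     // matching how the inline loop accumulates icebergExpr.
     static Expression example() {
       return Expressions.or(
           Expressions.or(
               Expressions.equal("datepartition", "2025-04-03"),
               Expressions.equal("datepartition", "2025-04-02")),
           Expressions.equal("datepartition", "2025-04-01"));
     }
   }
   ```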
   
   This enables Iceberg table copy to any destination and should be sufficient for most use cases. Can we keep the simpler inline approach for now and introduce an `IcebergExpressionGenerator` in a follow-up PR if additional filter types are required?
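   
   If we do add it, a possible shape as a sketch (purely hypothetical, mirroring Hive's `LookbackPartitionFilterGenerator`; the class and method names are placeholders, not part of this PR):
   
   ```java
   import java.time.LocalDate;
   
   import org.apache.iceberg.expressions.Expression;
   import org.apache.iceberg.expressions.Expressions;
   
   // Hypothetical follow-up class, not implemented in this PR.
   public class IcebergExpressionGenerator {
   
     /** ORs equality predicates for {@code lookbackDays} consecutive dates ending at {@code latestDate} (inclusive). */
     public static Expression datePartitionWithLookback(String partitionKey, String latestDate, int lookbackDays) {
       LocalDate start = LocalDate.parse(latestDate);
       Expression combined = Expressions.alwaysFalse();
       for (int i = 0; i < lookbackDays; i++) {
         // Expressions.or(alwaysFalse(), e) simplifies to e, so the seed value never reaches the scan.
         combined = Expressions.or(combined, Expressions.equal(partitionKey, start.minusDays(i).toString()));
       }
       return combined;
     }
   }
   ```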


