Blazer-007 commented on code in PR #4146:
URL: https://github.com/apache/gobblin/pull/4146#discussion_r2510048427


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
+ *
+ * This source reads job configuration, applies date partition filters with an optional lookback period,
+ * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files
+ * into work units for parallel processing.
+ *
+ * <pre>
+ * # Basic configuration
+ * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource
+ * iceberg.database.name=db1
+ * iceberg.table.name=table1
+ * iceberg.catalog.uri=ICEBERG_CATALOG_URI
+ *
+ * # Partition filtering with lookback - Static date
+ * iceberg.filter.enabled=true
+ * iceberg.partition.column=datepartition  # Optional, defaults to "datepartition"
+ * iceberg.filter.date=2025-04-01         # Date value
+ * iceberg.lookback.days=3
+ *
+ * # Partition filtering with lookback - Scheduled flows (dynamic date)
+ * iceberg.filter.enabled=true
+ * iceberg.filter.date=CURRENT_DATE       # Uses current date at runtime
+ * iceberg.lookback.days=3
+ *
+ * # Copy all data (no filtering)
+ * iceberg.filter.enabled=false
+ * # No filter.date needed - will copy all partitions from current snapshot
+ *
+ * # Bin packing for better resource utilization
+ * gobblin.copy.binPacking.maxSizePerBin=1000000000  # 1GB per bin
+ * </pre>
+ */
+@Slf4j
+public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> {
+
+  public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name";
+  public static final String ICEBERG_TABLE_NAME = "iceberg.table.name";
+  public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri";
+  public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class";
+  public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled";
+  public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
+  public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit";
+  public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
+  public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
+  public static final String ICEBERG_FILTER_DATE = "iceberg.filter.date"; // Date value (e.g., 2025-04-01 or CURRENT_DATE)
+  public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
+  public static final int DEFAULT_LOOKBACK_DAYS = 1;
+  public static final String ICEBERG_PARTITION_COLUMN = "iceberg.partition.column"; // configurable partition column name
+  public static final String DEFAULT_DATE_PARTITION_COLUMN = "datepartition"; // default date partition column name
+  public static final String CURRENT_DATE_PLACEHOLDER = "CURRENT_DATE"; // placeholder for current date
+  public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key";
+  public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values";
+  public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path";
+  private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
+
+  private Optional<LineageInfo> lineageInfo;
+  private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
+
+  /**
+   * Initialize file system helper based on mode (streaming vs record processing)
+   */
+  @Override
+  public void initFileSystemHelper(State state) throws FileBasedHelperException {
+    // For file streaming mode, we use IcebergFileStreamHelper
+    // For record processing mode, we'll use a different helper (future implementation)
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+      ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Future: Initialize helper for record processing
+      throw new UnsupportedOperationException("Record processing mode not yet implemented. " +
+        "This will be added when SQL/Data Lake destinations are required.");
+    } else {
+      // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper
+      this.fsHelper = new IcebergFileStreamHelper(state);
+      this.fsHelper.connect();
+    }
+  }
+
+  /**
+   * Get work units by discovering files from Iceberg table
+   * @param state the source state
+   * @return the list of {@link WorkUnit}s to execute
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+    try {
+      initFileSystemHelper(state);
+
+      validateConfiguration(state);
+
+      IcebergCatalog catalog = createCatalog(state);
+      String database = state.getProp(ICEBERG_DATABASE_NAME);
+      String table = state.getProp(ICEBERG_TABLE_NAME);
+      IcebergTable icebergTable = catalog.openTable(database, table);
+
+      List<IcebergTable.FilePathWithPartition> filesWithPartitions = discoverPartitionFilePaths(state, icebergTable);
+      log.info("Discovered {} files from Iceberg table {}.{}", filesWithPartitions.size(), database, table);
+
+      // Create work units from discovered files
+      List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, state, icebergTable);
+
+      // Handle simulate mode - log what would be copied without executing
+      if (state.contains(CopySource.SIMULATE) && state.getPropAsBoolean(CopySource.SIMULATE)) {
+        log.info("Simulate mode enabled. Will not execute the copy.");
+        logSimulateMode(workUnits, filesWithPartitions, state);
+        return Lists.newArrayList();
+      }
+
+      // Apply bin packing to work units if configured
+      List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, state);
+      log.info("Work unit creation complete. Initial work units: {}, packed work units: {}",
+        workUnits.size(), packedWorkUnits.size());
+
+      return Lists.newArrayList(packedWorkUnits);
+
+    } catch (Exception e) {
+      log.error("Failed to create work units for Iceberg table", e);
+      throw new RuntimeException("Failed to create work units", e);
+    }
+  }
+
+  /**
+   * Get extractor based on mode (streaming vs record processing)
+   *
+   * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned {@link Extractor}
+   * @return an {@link Extractor} for streaming files from the Iceberg table
+   * @throws IOException if the extractor cannot be created
+   */
+  @Override
+  public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException {
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+      ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Return record processing extractor
+      throw new UnsupportedOperationException("Record processing mode not yet implemented.");
+    } else {
+      // Return file streaming extractor
+      return new IcebergFileStreamExtractor(state);
+    }
+  }
+
+  /**
+   * Discover partition data files using Iceberg TableScan API with optional lookback for date partitions.
+   *
+   * <p>This method supports three modes:
+   * <ol>
+   * <li><b>Full table scan (copy all data)</b>: Set {@code iceberg.filter.enabled=false}.
+   * Returns all data files from the current snapshot. Use this for one-time full copies or backfills.</li>
+   * <li><b>Static date partition filter</b>: Set {@code iceberg.filter.enabled=true} with a specific date
+   * (e.g., {@code iceberg.filter.date=2025-04-01}). Use this for ad-hoc historical data copies.</li>
+   * <li><b>Dynamic date partition filter</b>: Set {@code iceberg.filter.enabled=true} with
+   * {@code iceberg.filter.date=CURRENT_DATE}. The {@value #CURRENT_DATE_PLACEHOLDER} placeholder
+   * is resolved to the current date at runtime. Use this for daily scheduled flows.</li>
+   * </ol>
+   *
+   * <p>The partition column name is configurable via {@code iceberg.partition.column}
+   * (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is specified separately via
+   * {@code iceberg.filter.date}, eliminating redundancy and reducing configuration errors.
+   *
+   * <p><b>Configuration Examples:</b>
+   * <ul>
+   * <li>Static: {@code iceberg.partition.column=datepartition, iceberg.filter.date=2025-04-03, iceberg.lookback.days=3}
+   * discovers: datepartition=2025-04-03, 2025-04-02, 2025-04-01</li>
+   * <li>Dynamic: {@code iceberg.filter.date=CURRENT_DATE, iceberg.lookback.days=1}
+   * discovers today's partition only (resolved at runtime)</li>
+   * </ul>
+   *
+   * @param state source state containing filter configuration
+   * @param icebergTable the Iceberg table to scan
+   * @return list of file paths with partition metadata matching the filter criteria
+   * @throws IOException if table scan or file discovery fails
+   */
+  private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws IOException {
+    boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true);
+
+    if (!filterEnabled) {
+      log.info("Partition filter disabled, discovering all data files with partition metadata from current snapshot");
+      // Use TableScan without filter to get all files with partition metadata preserved
+      // This ensures partition structure is maintained even for full table copies
+      List<IcebergTable.FilePathWithPartition> result = icebergTable.getFilePathsWithPartitionsForFilter(Expressions.alwaysTrue());
+      log.info("Discovered {} data files from current snapshot with partition metadata", result.size());
+      return result;
+    }
+
+    String datePartitionColumn = state.getProp(ICEBERG_PARTITION_COLUMN, DEFAULT_DATE_PARTITION_COLUMN);
+
+    String dateValue = state.getProp(ICEBERG_FILTER_DATE);
+    Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
+      "iceberg.filter.date is required when iceberg.filter.enabled=true");
+
+    // Handle CURRENT_DATE placeholder for flows
+    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+      dateValue = LocalDate.now().toString();
+      log.info("Resolved {} placeholder to current date: {}", 
CURRENT_DATE_PLACEHOLDER, dateValue);
+    }
+
+    // Apply lookback period for date partitions
+    // lookbackDays=1 (default) means copy only the specified date
+    // lookbackDays=3 means copy specified date + 2 previous days (total 3 days)
+    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+    List<String> values = Lists.newArrayList();
+
+    if (lookbackDays >= 1) {
+      log.info("Applying lookback period of {} days for date partition column '{}': {}", lookbackDays, datePartitionColumn, dateValue);
+      LocalDate start = LocalDate.parse(dateValue);
+      for (int i = 0; i < lookbackDays; i++) {

Review Comment:
   `LocalDate.parse("yyyy-mm-dd-hh")` will throw `java.time.format.DateTimeParseException`; we need to handle this case as well
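
   For illustration, a minimal sketch of one way to guard the parse (`parseFilterDate`, `FilterDateParser`, and the hourly fallback pattern are assumptions for this example, not part of the PR): try strict ISO `yyyy-MM-dd` first, fall back to an hourly `yyyy-MM-dd-HH` pattern, and fail with a clear configuration error instead of an unhandled `DateTimeParseException`.

   ```java
   import java.time.LocalDate;
   import java.time.format.DateTimeFormatter;
   import java.time.format.DateTimeParseException;

   class FilterDateParser {
     // Hypothetical hourly partition format, e.g. "2025-04-01-00"
     private static final DateTimeFormatter HOURLY_FORMAT =
         DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");

     static LocalDate parseFilterDate(String dateValue) {
       try {
         return LocalDate.parse(dateValue); // strict ISO yyyy-MM-dd
       } catch (DateTimeParseException ignored) {
         try {
           // Fall back to the hourly form; the hour component is simply dropped
           return LocalDate.parse(dateValue, HOURLY_FORMAT);
         } catch (DateTimeParseException e) {
           // Surface a configuration error instead of a raw parse exception
           throw new IllegalArgumentException(
               "iceberg.filter.date must be yyyy-MM-dd or yyyy-MM-dd-HH, got: " + dateValue, e);
         }
       }
     }

     public static void main(String[] args) {
       System.out.println(parseFilterDate("2025-04-01"));    // 2025-04-01
       System.out.println(parseFilterDate("2025-04-01-00")); // 2025-04-01
       parseFilterDate("04/01/2025");                        // IllegalArgumentException
     }
   }
   ```

   Whether to accept the hourly form or simply fail fast with a clearer error message is a judgment call for this PR; either way the exception should not propagate unhandled out of `discoverPartitionFilePaths`.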



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
