This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 7064c380506  [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)
7064c380506 is described below

commit 7064c380506814964dd85773e2ee7b7f187b88c3
Author: Alexey Kudinkin <alexey.kudin...@gmail.com>
AuthorDate: Wed Feb 1 11:19:45 2023 -0800

    [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)

    This restores the existing behavior of the DeltaStreamer Incremental Source:
    the change in #7769 removed the _hoodie_partition_path field from the dataset,
    making it impossible to access the field from DeltaStreamer Transformers, for
    example (an illustrative Transformer sketch follows the diff below).
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../apache/hudi/common/config/HoodieConfig.java    |  8 --------
 .../org/apache/hudi/utilities/UtilHelpers.java     | 13 ++++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  8 ++------
 .../hudi/utilities/sources/HoodieIncrSource.java   | 23 ++++++++++++++++++++--
 5 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e6525a2b1dc..f56defe7eac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1034,7 +1034,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public HoodieRecordMerger getRecordMerger() {
-    List<String> mergers = getSplitStringsOrDefault(RECORD_MERGER_IMPLS).stream()
+    List<String> mergers = StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
         .map(String::trim)
         .distinct()
         .collect(Collectors.toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index a48e4202bf9..223b93e5744 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -142,14 +142,6 @@ public class HoodieConfig implements Serializable {
     return StringUtils.split(getString(configProperty), delimiter);
   }
 
-  public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty) {
-    return getSplitStringsOrDefault(configProperty, ",");
-  }
-
-  public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty, String delimiter) {
-    return StringUtils.split(getStringOrDefault(configProperty), delimiter);
-  }
-
   public String getString(String key) {
     return props.getProperty(key);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index d159fee0be4..45a9750c3b3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -29,12 +29,16 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -109,6 +113,15 @@ public class UtilHelpers {
 
   private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
 
+  public static HoodieRecordMerger createRecordMerger(Properties props) {
+    List<String> recordMergerImplClasses = ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
+        HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue()));
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK, recordMergerImplClasses,
+        props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()));
+
+    return recordMerger;
+  }
+
   public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
                                     SparkSession sparkSession, SchemaProvider schemaProvider,
                                     HoodieDeltaStreamerMetrics metrics) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 63cf706c174..77079fe0f4f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -60,8 +59,6 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.IdentityIterator;
 import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
@@ -139,6 +136,7 @@ import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -473,9 +471,7 @@ public class DeltaSync implements Serializable, Closeable {
   }
 
   private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource(Option<String> resumeCheckpointStr) {
-    HoodieRecordType recordType = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK,
-        ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())),
-        props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue())).getRecordType();
+    HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
         && HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro")) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 7134e8ff7cc..c484038b04b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -37,6 +37,8 @@ import org.apache.spark.sql.SparkSession;
 
 import java.util.Collections;
 
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
+
 public class HoodieIncrSource extends RowSource {
 
   private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
@@ -89,6 +91,12 @@ public class HoodieIncrSource extends RowSource {
      */
     static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
     static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
+
+    /**
+     * Drops all meta fields from the source hudi table while ingesting into sink hudi table.
+     */
+    static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source";
+    public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = false;
   }
 
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -153,8 +161,19 @@ public class HoodieIncrSource extends RowSource {
           queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
-    // Remove Hoodie meta columns
-    final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
+    HoodieRecord.HoodieRecordType recordType = createRecordMerger(props).getRecordType();
+
+    boolean shouldDropMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
+        Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE)
+        // NOTE: In case when Spark native [[RecordMerger]] is used, we have to make sure
+        //       all meta-fields have been properly cleaned up from the incoming dataset
+        || recordType == HoodieRecord.HoodieRecordType.SPARK;
+
+    // Remove Hoodie meta columns except partition path from input source
+    String[] colsToDrop = shouldDropMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
+        HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
+    final Dataset<Row> src = source.drop(colsToDrop);
     return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
  }
 }
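
For context on the Transformer use case the commit message refers to: DeltaStreamer can apply a user-supplied Transformer between the incremental source fetch and the write, and such a transformer may reference the retained `_hoodie_partition_path` meta column. Below is a minimal, hypothetical sketch; the class name PartitionAwareTransformer, the property key custom.transformer.partition, and the filter logic are illustrative assumptions rather than code from this commit. Only the Transformer interface and HoodieRecord.PARTITION_PATH_METADATA_FIELD come from Hudi itself.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.utilities.transform.Transformer;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Hypothetical Transformer that depends on _hoodie_partition_path being
    // present in the incoming dataset (the behavior this commit restores).
    public class PartitionAwareTransformer implements Transformer {
      @Override
      public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                                Dataset<Row> rowDataset, TypedProperties properties) {
        // Keep only rows from a configured source partition; this filter would
        // fail to resolve if the meta column had been dropped upstream.
        String partition = properties.getString("custom.transformer.partition", "2023/01/01");
        return rowDataset.filter(
            rowDataset.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD).equalTo(partition));
      }
    }

Pipelines that do want every meta field (including the partition path) stripped from the incoming dataset can opt in via the property added in this commit, e.g. in the DeltaStreamer properties file:

    hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source=true

Per the HoodieIncrSource change above, all meta fields are also dropped unconditionally whenever the Spark-native record merger is in use.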