This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 9f2087a8944 [HUDI-6615] Fix the condition of isInputSorted in BulkInsertWriterHelper (#9314)
9f2087a8944 is described below

commit 9f2087a89443e93079d061fd81bf2f768f9c6953
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Thu Aug 3 08:50:31 2023 +0800

    [HUDI-6615] Fix the condition of isInputSorted in BulkInsertWriterHelper (#9314)
---
 .../apache/hudi/configuration/OptionsResolver.java |  8 ++++++++
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  3 ++-
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 11 ++---------
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  2 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |  5 ++---
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  2 +-
 .../hudi/sink/bucket/ITTestBucketStreamWrite.java  | 23 +---------------------
 .../bucket/ITTestConsistentBucketStreamWrite.java  |  5 ++---
 8 files changed, 19 insertions(+), 40 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 8f4b013de04..944e795dc2f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -76,6 +76,14 @@ public class OptionsResolver {
     return operationType == WriteOperationType.INSERT;
   }
 
+  /**
+   * Returns whether the table operation is 'bulk_insert'.
+   */
+  public static boolean isBulkInsertOperation(Configuration conf) {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    return operationType == WriteOperationType.BULK_INSERT;
+  }
+
   /**
    * Returns whether it is a MERGE_ON_READ table.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 56f668e32f0..3c0d4fb7662 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.table.HoodieTable;
@@ -84,7 +85,7 @@ public class BulkInsertWriterHelper {
     this.taskEpochId = taskEpochId;
     this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
     this.preserveHoodieMetadata = preserveHoodieMetadata;
-    this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
+    this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
    this.fileIdPrefix = UUID.randomUUID().toString();
    this.keyGen = preserveHoodieMetadata ? null : RowDataKeyGen.instance(conf, rowType);
  }
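
For readers skimming the patch, the corrected condition above can be summarized with a small standalone sketch. This is a hypothetical illustration, not code from the commit; it assumes the hudi-flink and flink-core dependencies are on the classpath, and the class name IsInputSortedSketch is invented:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.configuration.OptionsResolver;

    // Sketch of the new condition: before this commit, isInputSorted was true
    // whenever write.bulk_insert.sort_input was set, even when the operation
    // was not bulk_insert; the flag is now honored only for bulk_insert.
    final class IsInputSortedSketch {
      static boolean isInputSorted(Configuration conf) {
        return OptionsResolver.isBulkInsertOperation(conf)
            && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
      }
    }

With the operation type checked at the source of truth, the workaround in Pipelines.append that flipped the flag off for unbounded streams (removed below) is no longer needed.
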
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5d945d07aa1..fe51fe435e1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -202,19 +202,12 @@ public class Pipelines {
    * @param conf       The configuration
    * @param rowType    The input row type
    * @param dataStream The input data stream
-   * @param bounded    Whether the input stream is bounded
    * @return the appending data stream sink
    */
   public static DataStream<Object> append(
       Configuration conf,
       RowType rowType,
-      DataStream<RowData> dataStream,
-      boolean bounded) {
-    if (!bounded) {
-      // In principle, the config should be immutable, but the boundedness
-      // is only visible when creating the sink pipeline.
-      conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false);
-    }
+      DataStream<RowData> dataStream) {
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
@@ -469,7 +462,7 @@ public class Pipelines {
     }
     return clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
-        .setParallelism(1); // compaction commit should be singleton
+        .setParallelism(1); // clustering commit should be singleton
   }
 
   public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b45f9ca3879..62d22869f64 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -103,7 +103,7 @@ public class HoodieFlinkStreamer {
     DataStream<Object> pipeline;
     // Append mode
     if (OptionsResolver.isAppendMode(conf)) {
-      pipeline = Pipelines.append(conf, rowType, dataStream, false);
+      pipeline = Pipelines.append(conf, rowType, dataStream);
       if (OptionsResolver.needsAsyncClustering(conf)) {
         Pipelines.cluster(conf, rowType, pipeline);
       } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 0096fb3476f..ec0db6b1262 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -85,14 +85,13 @@ public class HoodieTableSink implements
       RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
 
       // bulk_insert mode
-      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
-      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+      if (OptionsResolver.isBulkInsertOperation(conf)) {
         return Pipelines.bulkInsert(conf, rowType, dataStream);
       }
 
       // Append mode
       if (OptionsResolver.isAppendMode(conf)) {
-        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
+        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
         if (OptionsResolver.needsAsyncClustering(conf)) {
           return Pipelines.cluster(conf, rowType, pipeline);
         } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 2aec8e5d5fc..954ca6593c3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -314,7 +314,7 @@ public class ITTestDataStreamWrite extends TestLogger {
         .setParallelism(4);
 
     OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
-    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
+    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
     execEnv.addOperator(pipeline.getTransformation());
 
     Pipelines.cluster(conf, rowType, pipeline);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index ecd31cd719e..3d6d0918ef0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -27,15 +27,12 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
-import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -95,26 +92,8 @@ public class ITTestBucketStreamWrite {
   }
 
   private static void doDeleteCommit(String tablePath, boolean isCow) throws Exception {
-    // make configuration and setAvroSchema
-    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
-    cfg.path = tablePath;
-    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
-    // create metaClient
-    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-
-    conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableType().name());
-
-    // set the table name
-    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
-
-    // set record key field
-    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
-    // set partition field
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
-
-    // set table schema
-    CompactionUtil.setAvroSchema(conf, metaClient);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePath, new org.apache.hadoop.conf.Configuration());
 
     // should only contain one instant
     HoodieTimeline activeCompletedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
index 4882552f0a7..5309b2225fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -21,11 +21,11 @@ package org.apache.hudi.sink.bucket;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -187,8 +187,7 @@ public class ITTestConsistentBucketStreamWrite extends TestLogger {
     OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
     DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
     // bulk_insert mode
-    final String writeOperation = conf.get(FlinkOptions.OPERATION);
-    if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+    if (OptionsResolver.isBulkInsertOperation(conf)) {
       Pipelines.bulkInsert(conf, rowType, dataStream);
     } else {
       DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
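
Usage note: with the bounded flag removed from Pipelines.append, call sites build the append pipeline identically for bounded and unbounded inputs. A minimal sketch of a caller under the new signature (conf, rowType, and dataStream are assumed to be set up beforehand, as in the tests above):

    // Hypothetical caller: whether the input is sorted is now derived from the
    // operation type inside BulkInsertWriterHelper, so no boundedness flag is
    // threaded through the pipeline builder.
    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
    if (OptionsResolver.needsAsyncClustering(conf)) {
      Pipelines.cluster(conf, rowType, pipeline);
    }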