This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f2087a8944 [HUDI-6615] Fix the condition of isInputSorted in BulkInsertWriterHelper (#9314)
9f2087a8944 is described below

commit 9f2087a89443e93079d061fd81bf2f768f9c6953
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Thu Aug 3 08:50:31 2023 +0800

    [HUDI-6615] Fix the condition of isInputSorted in BulkInsertWriterHelper (#9314)
---
 .../apache/hudi/configuration/OptionsResolver.java |  8 ++++++++
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  3 ++-
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 11 ++---------
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  2 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |  5 ++---
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  2 +-
 .../hudi/sink/bucket/ITTestBucketStreamWrite.java  | 23 +---------------------
 .../bucket/ITTestConsistentBucketStreamWrite.java  |  5 ++---
 8 files changed, 19 insertions(+), 40 deletions(-)
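
For readers skimming the diff: the gist of the change is that sorted-input handling in BulkInsertWriterHelper is now gated on the write operation actually being 'bulk_insert', instead of on the sort-input flag alone. Below is a minimal sketch of the corrected condition, using the option and class names that appear in the diff; the standalone helper shape is illustrative only, not the real code layout.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.configuration.FlinkOptions;

    // Illustrative helper; the real logic lives in OptionsResolver and
    // BulkInsertWriterHelper as shown in the diff below.
    final class IsInputSortedSketch {
      static boolean isInputSorted(Configuration conf) {
        // Resolve the configured write operation, e.g. "insert" or "bulk_insert".
        WriteOperationType operationType =
            WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
        // Before the fix: the sort flag alone enabled sorted-input handling.
        // After the fix: the operation must also be bulk_insert.
        return operationType == WriteOperationType.BULK_INSERT
            && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
      }
    }

The motivation is visible in the Pipelines.append() change further down: the old code worked around the problem by mutating the config to switch the flag off for unbounded streams; gating on the operation type makes that mutation unnecessary.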

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 8f4b013de04..944e795dc2f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -76,6 +76,14 @@ public class OptionsResolver {
     return operationType == WriteOperationType.INSERT;
   }
 
+  /**
+   * Returns whether the table operation is 'bulk_insert'.
+   */
+  public static boolean isBulkInsertOperation(Configuration conf) {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    return operationType == WriteOperationType.BULK_INSERT;
+  }
+
   /**
    * Returns whether it is a MERGE_ON_READ table.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 56f668e32f0..3c0d4fb7662 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.table.HoodieTable;
@@ -84,7 +85,7 @@ public class BulkInsertWriterHelper {
     this.taskEpochId = taskEpochId;
     this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
     this.preserveHoodieMetadata = preserveHoodieMetadata;
-    this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
+    this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
     this.keyGen = preserveHoodieMetadata ? null : RowDataKeyGen.instance(conf, rowType);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5d945d07aa1..fe51fe435e1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -202,19 +202,12 @@ public class Pipelines {
    * @param conf       The configuration
    * @param rowType    The input row type
    * @param dataStream The input data stream
-   * @param bounded    Whether the input stream is bounded
    * @return the appending data stream sink
    */
   public static DataStream<Object> append(
       Configuration conf,
       RowType rowType,
-      DataStream<RowData> dataStream,
-      boolean bounded) {
-    if (!bounded) {
-      // In principle, the config should be immutable, but the boundedness
-      // is only visible when creating the sink pipeline.
-      conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false);
-    }
+      DataStream<RowData> dataStream) {
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
@@ -469,7 +462,7 @@ public class Pipelines {
     }
     return clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
-        .setParallelism(1); // compaction commit should be singleton
+        .setParallelism(1); // clustering commit should be singleton
   }
 
   public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
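
With the operation-type guard now inside BulkInsertWriterHelper, append() no longer needs a 'bounded' flag, and it stops mutating the (in principle immutable) config to force the sort flag off for unbounded streams. A sketch of how a call site changes, assuming conf, rowType, and dataStream are already set up as in the callers below:

    // Before: the caller had to thread boundedness through the pipeline:
    //   DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
    // After: boundedness no longer matters to the append pipeline:
    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);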
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b45f9ca3879..62d22869f64 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -103,7 +103,7 @@ public class HoodieFlinkStreamer {
     DataStream<Object> pipeline;
     // Append mode
     if (OptionsResolver.isAppendMode(conf)) {
-      pipeline = Pipelines.append(conf, rowType, dataStream, false);
+      pipeline = Pipelines.append(conf, rowType, dataStream);
       if (OptionsResolver.needsAsyncClustering(conf)) {
         Pipelines.cluster(conf, rowType, pipeline);
       } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 0096fb3476f..ec0db6b1262 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -85,14 +85,13 @@ public class HoodieTableSink implements
       RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
 
       // bulk_insert mode
-      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
-      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+      if (OptionsResolver.isBulkInsertOperation(conf)) {
         return Pipelines.bulkInsert(conf, rowType, dataStream);
       }
 
       // Append mode
       if (OptionsResolver.isAppendMode(conf)) {
-        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
+        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
         if (OptionsResolver.needsAsyncClustering(conf)) {
           return Pipelines.cluster(conf, rowType, pipeline);
         } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 2aec8e5d5fc..954ca6593c3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -314,7 +314,7 @@ public class ITTestDataStreamWrite extends TestLogger {
         .setParallelism(4);
 
     OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
-    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
+    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
     execEnv.addOperator(pipeline.getTransformation());
 
     Pipelines.cluster(conf, rowType, pipeline);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index ecd31cd719e..3d6d0918ef0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -27,15 +27,12 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
-import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -95,26 +92,8 @@ public class ITTestBucketStreamWrite {
   }
 
   private static void doDeleteCommit(String tablePath, boolean isCow) throws Exception {
-    // make configuration and setAvroSchema
-    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
-    cfg.path = tablePath;
-    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
-
     // create metaClient
-    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-
-    conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableType().name());
-
-    // set the table name
-    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
-
-    // set record key field
-    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
-    // set partition field
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
-
-    // set table schema
-    CompactionUtil.setAvroSchema(conf, metaClient);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePath, new org.apache.hadoop.conf.Configuration());
 
     // should only contain one instant
     HoodieTimeline activeCompletedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
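
The test helper no longer assembles a FlinkClusteringConfig just to build a meta client; it now goes straight from the table path. A minimal sketch of the simplified setup, assuming the StreamerUtil.createMetaClient(String, org.apache.hadoop.conf.Configuration) overload used in the hunk above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.util.StreamerUtil;

    // Build the meta client directly from the table path with a default Hadoop
    // configuration; table type, name, and key fields come from the table's own
    // config instead of being copied into Flink options by hand.
    HoodieTableMetaClient metaClient =
        StreamerUtil.createMetaClient(tablePath, new Configuration());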
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
index 4882552f0a7..5309b2225fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -21,11 +21,11 @@ package org.apache.hudi.sink.bucket;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -187,8 +187,7 @@ public class ITTestConsistentBucketStreamWrite extends TestLogger {
     OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
     DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
     // bulk_insert mode
-    final String writeOperation = conf.get(FlinkOptions.OPERATION);
-    if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+    if (OptionsResolver.isBulkInsertOperation(conf)) {
       Pipelines.bulkInsert(conf, rowType, dataStream);
     } else {
       DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
