This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 86d1e39fb4e [HUDI-3661] Flink async compaction is not thread safe when using watermark (#7399)
86d1e39fb4e is described below

commit 86d1e39fb4e971b11e8c6394f6611b7bd7089bd4
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Wed Dec 7 18:31:26 2022 +0800

    [HUDI-3661] Flink async compaction is not thread safe when using watermark (#7399)
---
 .../hudi/sink/clustering/ClusteringOperator.java   |  8 +++-
 .../{CompactFunction.java => CompactOperator.java} | 31 ++++++++++---
 .../hudi/sink/compact/CompactionCommitEvent.java   |  2 +-
 .../hudi/sink/compact/HoodieFlinkCompactor.java    |  3 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  5 +-
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |  5 +-
 .../hudi/sink/utils/ClusteringFunctionWrapper.java |  2 +-
 .../hudi/sink/utils/CompactFunctionWrapper.java    | 53 +++++++++++-----------
 .../sink/utils/StreamWriteFunctionWrapper.java     |  2 +-
 9 files changed, 66 insertions(+), 45 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index ca1cd54c1fe..e7bde41ca8b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -159,7 +160,12 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
 
-    collector = new StreamRecordCollector<>(output);
+    this.collector = new StreamRecordCollector<>(output);
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) {
+    // no need to propagate the watermark
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
similarity index 82%
rename from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
rename to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index db05b0dbabe..65f70ad6aaf 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -32,7 +32,11 @@ import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,11 +45,12 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- * Function to execute the actual compaction task assigned by the compaction 
plan task.
+ * Operator to execute the actual compaction task assigned by the compaction 
plan task.
  * In order to execute scalable, the input should shuffle by the compact event 
{@link CompactionPlanEvent}.
  */
-public class CompactFunction extends ProcessFunction<CompactionPlanEvent, 
CompactionCommitEvent> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(CompactFunction.class);
+public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
+    implements OneInputStreamOperator<CompactionPlanEvent, 
CompactionCommitEvent> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactOperator.class);
 
   /**
    * Config options.
@@ -72,22 +77,34 @@ public class CompactFunction extends 
ProcessFunction<CompactionPlanEvent, Compac
    */
   private transient NonThrownExecutor executor;
 
-  public CompactFunction(Configuration conf) {
+  /**
+   * Output records collector.
+   */
+  private transient StreamRecordCollector<CompactionCommitEvent> collector;
+
+  public CompactOperator(Configuration conf) {
     this.conf = conf;
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
   }
 
   @Override
-  public void open(Configuration parameters) throws Exception {
+  public void open() throws Exception {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = FlinkWriteClients.createWriteClient(conf, 
getRuntimeContext());
     if (this.asyncCompaction) {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
+    this.collector = new StreamRecordCollector<>(output);
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) {
+    // no need to propagate the watermark
   }
 
   @Override
-  public void processElement(CompactionPlanEvent event, Context context, 
Collector<CompactionCommitEvent> collector) throws Exception {
+  public void processElement(StreamRecord<CompactionPlanEvent> record) throws 
Exception {
+    final CompactionPlanEvent event = record.getValue();
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
     if (asyncCompaction) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
index 398dfcf6195..faad4c2338d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
 import java.util.List;
 
 /**
- * Represents a commit event from the compaction task {@link CompactFunction}.
+ * Represents a commit event from the compaction task {@link CompactOperator}.
  */
 public class CompactionCommitEvent implements Serializable {
   private static final long serialVersionUID = 1L;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 34576cfb017..1475a493c1a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -41,7 +41,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import 
org.apache.flink.client.deployment.application.ApplicationExecutionException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -311,7 +310,7 @@ public class HoodieFlinkCompactor {
           .rebalance()
           .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
-              new ProcessOperator<>(new CompactFunction(conf)))
+              new CompactOperator(conf))
           .setParallelism(compactionParallelism)
           .addSink(new CompactionCommitSink(conf))
           .name("compaction_commit")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index a045a9276c5..d17213dcc04 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -39,7 +39,7 @@ import org.apache.hudi.sink.clustering.ClusteringOperator;
 import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
 import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
 import org.apache.hudi.sink.common.WriteOperatorFactory;
-import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactOperator;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
@@ -57,7 +57,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -372,7 +371,7 @@ public class Pipelines {
         .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
+            new CompactOperator(conf))
         .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 711b738288f..6157b5e9011 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -39,7 +39,6 @@ import org.apache.hudi.utils.TestSQL;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -158,7 +157,7 @@ public class ITTestHoodieFlinkCompactor {
         .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
+            new CompactOperator(conf))
         .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM)
         .addSink(new CompactionCommitSink(conf))
         .name("clean_commits")
@@ -282,7 +281,7 @@ public class ITTestHoodieFlinkCompactor {
         .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
+            new CompactOperator(conf))
         .setParallelism(1)
         .addSink(new CompactionCommitSink(conf))
         .name("compaction_commit")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
index 55a79915d47..e3b75cbf637 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
@@ -69,6 +69,7 @@ public class ClusteringFunctionWrapper {
   private ClusteringCommitSink commitSink;
 
   public ClusteringFunctionWrapper(Configuration conf, StreamTask<?, ?> 
streamTask, StreamConfig streamConfig) {
+    this.conf = conf;
     this.ioManager = new IOManagerAsync();
     MockEnvironment environment = new MockEnvironmentBuilder()
         .setTaskName("mockTask")
@@ -76,7 +77,6 @@ public class ClusteringFunctionWrapper {
         .setIOManager(ioManager)
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
environment);
-    this.conf = conf;
     this.streamTask = streamTask;
     this.streamConfig = streamConfig;
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index 1dba81ce2b7..78a8305c9c5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.sink.utils;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactOperator;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
@@ -33,14 +33,14 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 /**
- * A wrapper class to manipulate the {@link 
org.apache.hudi.sink.compact.CompactFunction} instance for testing.
+ * A wrapper class to manipulate the {@link CompactOperator} instance for 
testing.
  */
 public class CompactFunctionWrapper {
   private final Configuration conf;
@@ -48,20 +48,28 @@ public class CompactFunctionWrapper {
   private final IOManager ioManager;
   private final StreamingRuntimeContext runtimeContext;
 
+  private final StreamTask<?, ?> streamTask;
+  private final StreamConfig streamConfig;
+
   /**
    * Function that generates the {@link HoodieCompactionPlan}.
    */
   private CompactionPlanOperator compactionPlanOperator;
+  /**
+   * Output to collect the compaction commit events.
+   */
+  private CollectorOutput<CompactionCommitEvent> commitEventOutput;
   /**
    * Function that executes the compaction task.
    */
-  private CompactFunction compactFunction;
+  private CompactOperator compactOperator;
   /**
    * Stream sink to handle compaction commits.
    */
   private CompactionCommitSink commitSink;
 
-  public CompactFunctionWrapper(Configuration conf) throws Exception {
+  public CompactFunctionWrapper(Configuration conf, StreamTask<?, ?> 
streamTask, StreamConfig streamConfig) {
+    this.conf = conf;
     this.ioManager = new IOManagerAsync();
     MockEnvironment environment = new MockEnvironmentBuilder()
         .setTaskName("mockTask")
@@ -69,19 +77,23 @@ public class CompactFunctionWrapper {
         .setIOManager(ioManager)
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
environment);
-    this.conf = conf;
+    this.streamTask = streamTask;
+    this.streamConfig = streamConfig;
   }
 
   public void openFunction() throws Exception {
     compactionPlanOperator = new CompactionPlanOperator(conf);
     compactionPlanOperator.open();
 
-    compactFunction = new CompactFunction(conf);
-    compactFunction.setRuntimeContext(runtimeContext);
-    compactFunction.open(conf);
+    compactOperator = new CompactOperator(conf);
+    // CAUTION: deprecated API used.
+    compactOperator.setProcessingTimeService(new TestProcessingTimeService());
+    commitEventOutput = new CollectorOutput<>();
+    compactOperator.setup(streamTask, streamConfig, commitEventOutput);
+    compactOperator.open();
     final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
         new MockOperatorCoordinatorContext(new OperatorID(), 1));
-    compactFunction.setExecutor(syncExecutor);
+    compactOperator.setExecutor(syncExecutor);
 
     commitSink = new CompactionCommitSink(conf);
     commitSink.setRuntimeContext(runtimeContext);
@@ -94,22 +106,11 @@ public class CompactFunctionWrapper {
     compactionPlanOperator.setOutput(output);
     compactionPlanOperator.notifyCheckpointComplete(checkpointID);
     // collect the CompactCommitEvents
-    List<CompactionCommitEvent> compactCommitEvents = new ArrayList<>();
     for (CompactionPlanEvent event : output.getRecords()) {
-      compactFunction.processElement(event, null, new 
Collector<CompactionCommitEvent>() {
-        @Override
-        public void collect(CompactionCommitEvent event) {
-          compactCommitEvents.add(event);
-        }
-
-        @Override
-        public void close() {
-
-        }
-      });
+      compactOperator.processElement(new StreamRecord<>(event));
     }
     // handle and commit the compaction
-    for (CompactionCommitEvent event : compactCommitEvents) {
+    for (CompactionCommitEvent event : commitEventOutput.getRecords()) {
       commitSink.invoke(event, null);
     }
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a1a14456e3c..db8ff36962b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -117,7 +117,6 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     // one function
     this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
-    this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
     this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
     this.stateInitializationContext = new MockStateInitializationContext();
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
@@ -127,6 +126,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
         .setConfig(new StreamConfig(conf))
         .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
         .build();
+    this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, 
this.streamTask, this.streamConfig);
   }
 
   public void openFunction() throws Exception {

Reply via email to