danny0405 commented on code in PR #7609:
URL: https://github.com/apache/hudi/pull/7609#discussion_r1063077476


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java:
##########
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.hudi.client.HoodieFlinkWriteClient;

Review Comment:
   Fix the import order: the new `LatencyMarker` import is out of sequence with the existing imports.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java:
##########
@@ -137,7 +146,8 @@ private void doCompaction(String instantTime,
         compactionOperation,
         instantTime, maxInstantTime,
         writeClient.getHoodieTable().getTaskContextSupplier());
-    collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
+    collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(),

Review Comment:
   Unnecessary change.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java:
##########
@@ -98,10 +99,16 @@ public void open() throws Exception {
   }
 
   @Override
-  public void processWatermark(Watermark mark) {
+  public void processWatermark(Watermark mark) throws Exception {
     // no need to propagate the watermark
   }
 
+  @Override
+  public void processLatencyMarker(LatencyMarker latencyMarker)
+      throws Exception {
+    // no need to propagate the latencyMarker

Review Comment:
   No need to declare `throws Exception` here; the method body is empty.
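
For reference, Java lets an overriding method drop checked exceptions that the overridden method declares, so an empty override compiles without a `throws` clause. The sketch below is a minimal illustration only: `BaseOperator`, `QuietOperator`, and `processMarker` are hypothetical stand-ins, not Flink or Hudi classes.

```java
// Hypothetical stand-in for a parent operator; not a Flink class.
abstract class BaseOperator {
  // The parent method declares a checked exception.
  public void processMarker(String marker) throws Exception {
    throw new Exception("not handled: " + marker);
  }
}

// Hypothetical subclass that swallows the marker instead of propagating it.
class QuietOperator extends BaseOperator {
  int seen = 0;

  // The override omits the throws clause: narrowing the declared
  // checked exceptions is allowed when overriding.
  @Override
  public void processMarker(String marker) {
    seen++; // count the marker, do not forward it
  }
}

public class NarrowedThrowsExample {
  // Calls the override through the subclass type, so no try/catch is required.
  public static int run() {
    QuietOperator op = new QuietOperator();
    op.processMarker("m1");
    op.processMarker("m2");
    return op.seen;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Callers that hold the parent type still have to handle `Exception`; the narrower signature only benefits callers that reference the subclass directly.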



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java:
##########
@@ -111,7 +118,9 @@ public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exce
       // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
       executor.execute(
           () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
-          (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
+          (errMsg, t) -> {
+            collector.collect(new CompactionCommitEvent(instantTime,

Review Comment:
   Unnecessary change.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java:
##########
@@ -98,10 +99,16 @@ public void open() throws Exception {
   }
 
   @Override
-  public void processWatermark(Watermark mark) {
+  public void processWatermark(Watermark mark) throws Exception {
     // no need to propagate the watermark

Review Comment:
   Unnecessary change.



