danny0405 commented on code in PR #7609: URL: https://github.com/apache/hudi/pull/7609#discussion_r1063077476
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java: ########## @@ -18,6 +18,7 @@ package org.apache.hudi.sink.compact; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.hudi.client.HoodieFlinkWriteClient; Review Comment: Fix the import sequence. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java: ########## @@ -137,7 +146,8 @@ private void doCompaction(String instantTime, compactionOperation, instantTime, maxInstantTime, writeClient.getHoodieTable().getTaskContextSupplier()); - collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID)); + collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), Review Comment: Unnecessary change. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java: ########## @@ -98,10 +99,16 @@ public void open() throws Exception { } @Override - public void processWatermark(Watermark mark) { + public void processWatermark(Watermark mark) throws Exception { // no need to propagate the watermark } + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) + throws Exception { + // no need to propagate the latencyMarker Review Comment: No need to throw exception. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java: ########## @@ -111,7 +118,9 @@ public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exce // executes the compaction task asynchronously to not block the checkpoint barrier propagate. executor.execute( () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()), - (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)), + (errMsg, t) -> { + collector.collect(new CompactionCommitEvent(instantTime, Review Comment: Unnecessary change. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java: ########## @@ -98,10 +99,16 @@ public void open() throws Exception { } @Override - public void processWatermark(Watermark mark) { + public void processWatermark(Watermark mark) throws Exception { // no need to propagate the watermark Review Comment: Unnecessary change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org