This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.1 by this push:
     new 759b29449 [FLINK-35255][cdc][runtime] DataSinkWriterOperator adds overrides for the snapshotState and processWatermark methods (#3279)
759b29449 is described below

commit 759b2944968aec6de0aca97d1a5329ecaada1c56
Author: yanghuaiGit <38883656+yanghuai...@users.noreply.github.com>
AuthorDate: Mon Apr 29 15:24:05 2024 +0800

    [FLINK-35255][cdc][runtime] DataSinkWriterOperator adds overrides for the snapshotState and processWatermark methods (#3279)
---
 .../operators/sink/DataSinkWriterOperator.java     | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index c8056cc7a..bdb384cbb 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -36,9 +37,11 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
@@ -123,6 +126,26 @@ public class DataSinkWriterOperator<CommT> extends AbstractStreamOperator<Commit
                 .initializeState(context);
     }
 
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+                .snapshotState(context);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        super.processWatermark(mark);
+        this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+                .processWatermark(mark);
+    }
+
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+        super.processWatermarkStatus(watermarkStatus);
+        this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
+                .processWatermarkStatus(watermarkStatus);
+    }
+
     @Override
     public void processElement(StreamRecord<Event> element) throws Exception {
         Event event = element.getValue();

Reply via email to