stevenzwu commented on pull request #3870:
URL: https://github.com/apache/iceberg/pull/3870#issuecomment-1009259729
Here is the smaller diff if we are only looking at `flink/source` dir
`git diff --no-index
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source`
```
diff --git
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
index 235b17332..c8efc2b59 100644
---
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
+++
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
import java.io.IOException;
import java.util.Queue;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.JavaSerializer;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import
org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import
org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -110,10 +110,10 @@ public class StreamingReaderOperator extends
AbstractStreamOperator<RowData>
getOperatorConfig().getTimeCharacteristic(),
getProcessingTimeService(),
new Object(), // no actual locking needed
- getContainingTask().getStreamStatusMaintainer(),
output,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
- -1);
+ -1,
+ true);
// Enqueue to process the recovered input splits.
enqueueProcessSplits();
@@ -169,8 +169,8 @@ public class StreamingReaderOperator extends
AbstractStreamOperator<RowData>
}
@Override
- public void dispose() throws Exception {
- super.dispose();
+ public void close() throws Exception {
+ super.close();
if (format != null) {
format.close();
@@ -182,8 +182,8 @@ public class StreamingReaderOperator extends
AbstractStreamOperator<RowData>
}
@Override
- public void close() throws Exception {
- super.close();
+ public void finish() throws Exception {
+ super.finish();
output.close();
if (sourceContext != null) {
sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]