stevenzwu commented on pull request #3870:
URL: https://github.com/apache/iceberg/pull/3870#issuecomment-1009259729


   Here is the smaller diff if we are only looking at `flink/source` dir
   
   `git diff --no-index 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source 
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source`
   
   ```
   diff --git 
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
   index 235b17332..c8efc2b59 100644
   --- 
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
   +++ 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
   @@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
   
    import java.io.IOException;
    import java.util.Queue;
   +import org.apache.flink.api.common.operators.MailboxExecutor;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.JavaSerializer;
   @@ -29,7 +30,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import 
org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
   -import org.apache.flink.streaming.api.operators.MailboxExecutor;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import 
org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
    import org.apache.flink.streaming.api.operators.StreamOperator;
   @@ -110,10 +110,10 @@ public class StreamingReaderOperator extends 
AbstractStreamOperator<RowData>
            getOperatorConfig().getTimeCharacteristic(),
            getProcessingTimeService(),
            new Object(), // no actual locking needed
   -        getContainingTask().getStreamStatusMaintainer(),
            output,
            getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
   -        -1);
   +        -1,
   +        true);
   
        // Enqueue to process the recovered input splits.
        enqueueProcessSplits();
   @@ -169,8 +169,8 @@ public class StreamingReaderOperator extends 
AbstractStreamOperator<RowData>
      }
   
      @Override
   -  public void dispose() throws Exception {
   -    super.dispose();
   +  public void close() throws Exception {
   +    super.close();
   
        if (format != null) {
          format.close();
   @@ -182,8 +182,8 @@ public class StreamingReaderOperator extends 
AbstractStreamOperator<RowData>
      }
   
      @Override
   -  public void close() throws Exception {
   -    super.close();
   +  public void finish() throws Exception {
   +    super.finish();
        output.close();
        if (sourceContext != null) {
          sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to