AHeise commented on a change in pull request #12441:
URL: https://github.com/apache/flink/pull/12441#discussion_r434746169



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
##########
@@ -52,14 +56,47 @@ object TimeTestUtil {
     }
   }
 
+  /**
+   * A streaming operator to emit records and watermark depends on the input data.
+   * The last emitted watermark will be stored in state to emit it again on recovery.
+   * This is necessary for late arrival testing with [[FailingCollectionSource]].
+   */
   class EventTimeProcessOperator[T]
     extends AbstractStreamOperator[T]
       with OneInputStreamOperator[Either[(Long, T), Long], T] {
 
+    private var currentWatermark: Long = 0L
+    private var watermarkState: ListState[JLong] = _
+
+    override def snapshotState(context: StateSnapshotContext): Unit = {
+      super.snapshotState(context)
+      watermarkState.clear()
+      watermarkState.add(currentWatermark)
+    }
+
+    override def initializeState(context: StateInitializationContext): Unit = {
+      super.initializeState(context)
+      // use union list state to get the max watermark
+      watermarkState = context.getOperatorStateStore.getUnionListState(

Review comment:
       What's the idea behind using the max watermark?
   
   In the test cases that use this operator, the sources only have a DOP of 1 (at least from what I have seen). So this seems more complicated than necessary: afaik the union state never actually holds more than one element.
   
   With a DOP > 1, it might even be harmful. Say we have two source subtasks. By using the max watermark across all sources, we might declare some elements late even though they are not, because one subtask ends up using the watermark of the other.
   
   I also haven't seen any indication that this class is used with rescaling, so union state might be over-engineered.
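
   To make the DOP > 1 concern concrete, here is a minimal sketch in plain Scala (no Flink involved). The watermark values, the `isLate` helper, and the object name are made up for illustration, and the lateness rule is a simplification of what the runtime does:

```scala
object MaxWatermarkSketch {
  // Hypothetical, simplified lateness check: an element counts as late
  // if its timestamp is not greater than the current watermark.
  def isLate(timestamp: Long, watermark: Long): Boolean = timestamp <= watermark

  def main(args: Array[String]): Unit = {
    // Assumed last watermarks snapshotted by two subtasks before the failure.
    val restored = Seq(5L, 20L)

    // With union list state every subtask sees all entries on recovery;
    // taking the max yields 20 for both subtasks.
    val maxWatermark = restored.max

    // Subtask 0 had only reached watermark 5, so an element at t=10 is on time ...
    println(isLate(10L, restored.head)) // false
    // ... but it is treated as late once the max over all subtasks is re-emitted.
    println(isLate(10L, maxWatermark))  // true
  }
}
```

   If each subtask restored only its own entry (for example via regular list state instead of union state), the element at t=10 would stay on time in this scenario.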





