aljoscha commented on a change in pull request #12147:
URL: https://github.com/apache/flink/pull/12147#discussion_r425684214



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link WatermarkOutputMultiplexer} combines the watermark (and idleness) updates of multiple
+ * partitions/shards/splits into one combined watermark update and forwards it to an underlying
+ * {@link WatermarkOutput}.
+ *
+ * <p>A multiplexed output can either be immediate or deferred. Watermark updates on an immediate
+ * output will potentially directly affect the combined watermark state, which will be forwarded to
+ * the underlying output immediately. Watermark updates on a deferred output will only update an
+ * internal state but not directly update the combined watermark state. Only when {@link
+ * #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the
+ * underlying output.
+ *
+ * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput()}
+ * and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output
+ * ID you get from that. You can get both an immediate and deferred output for a given output ID,
+ * you can also call the getters multiple times.
+ */
+@Internal
+public class WatermarkOutputMultiplexer {
+
+       /**
+        * The {@link WatermarkOutput} that we use to emit our multiplexed watermark updates. We assume
+        * that outside code holds a coordinating lock so we don't lock in this class when accessing
+        * this {@link WatermarkOutput}.
+        */
+       private final WatermarkOutput underlyingOutput;
+
+       /** The id to use for the next registered output. */
+       private int nextOutputId = 0;

Review comment:
       I implemented this with Kafka in mind, where only a single thread interacts with the multiplexer. Also, updating the watermark through the `WatermarkOutput`s that we hand out is not thread-safe.
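To make the single-threaded assumption concrete, here is a minimal, hypothetical sketch. It is not the code in this PR and not Flink's API: `SimpleWatermarkMultiplexer`, `updateImmediate`, `updateDeferred` and the `LongConsumer` sink are illustrative names. It assumes the combined watermark is the minimum over all non-idle outputs, forwards immediate updates right away, and forwards deferred updates only on `onPeriodicEmit()`. Like `nextOutputId` in the diff, none of its fields are synchronized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/**
 * Hypothetical, simplified sketch of a watermark multiplexer (NOT Flink's
 * WatermarkOutputMultiplexer). The combined watermark is assumed to be the
 * minimum over all non-idle outputs.
 *
 * Deliberately not thread-safe: it assumes a single thread (e.g. one Kafka
 * fetcher thread) or an external coordinating lock around every call.
 */
class SimpleWatermarkMultiplexer {

    /** Latest watermark and idleness flag of one registered output. */
    private static final class OutputState {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    private final LongConsumer underlyingOutput; // receives combined watermarks
    private final List<OutputState> states = new ArrayList<>();
    private long lastEmitted = Long.MIN_VALUE;
    private int nextOutputId = 0; // plain int, no synchronization

    SimpleWatermarkMultiplexer(LongConsumer underlyingOutput) {
        this.underlyingOutput = underlyingOutput;
    }

    /** Registers a new output; the returned ID is its index in {@code states}. */
    int registerNewOutput() {
        states.add(new OutputState());
        return nextOutputId++;
    }

    /** Immediate update: record it and forward the combined watermark right away. */
    void updateImmediate(int outputId, long watermark) {
        recordWatermark(outputId, watermark);
        emitCombinedWatermark();
    }

    /** Deferred update: only record it; it is forwarded on the next onPeriodicEmit(). */
    void updateDeferred(int outputId, long watermark) {
        recordWatermark(outputId, watermark);
    }

    /** Marks an output idle so it no longer holds back the combined watermark. */
    void markIdle(int outputId) {
        states.get(outputId).idle = true;
    }

    /** Combines all recorded (including deferred) updates and forwards the result. */
    void onPeriodicEmit() {
        emitCombinedWatermark();
    }

    private void recordWatermark(int outputId, long watermark) {
        OutputState state = states.get(outputId);
        state.idle = false; // a new watermark makes the output active again
        state.watermark = Math.max(state.watermark, watermark); // never regress
    }

    private void emitCombinedWatermark() {
        long combined = Long.MAX_VALUE;
        boolean anyActive = false;
        for (OutputState state : states) {
            if (!state.idle) {
                anyActive = true;
                combined = Math.min(combined, state.watermark);
            }
        }
        // Forward only if some output is active and the combined watermark advanced.
        // If all outputs are idle, this sketch simply emits nothing.
        if (anyActive && combined > lastEmitted) {
            lastEmitted = combined;
            underlyingOutput.accept(combined);
        }
    }
}
```

Because `nextOutputId` and the per-output state are plain fields, two threads calling `registerNewOutput()` concurrently could hand out the same ID. Callers that do use several threads would have to wrap every call (registration and watermark updates alike) in the same lock, which matches the "coordinating lock" assumption stated in the `underlyingOutput` Javadoc.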




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
