dawidwys commented on a change in pull request #12147:
URL: https://github.com/apache/flink/pull/12147#discussion_r425081822



##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
##########
@@ -273,6 +273,36 @@ static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offse
        //  Configuration
        // ------------------------------------------------------------------------
 
+       /**
+        * Sets the given {@link WatermarkStrategy} on this consumer. These will be used to assign
+        * timestamps to records and generates watermarks to signal event time progress.
+        *
+        * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,

Review comment:
       nit: Could we restructure it to `what -> why`? Right now it reads `why -> what`.
   I think it would be enough to swap the order of the two paragraphs.

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -202,11 +193,10 @@ protected AbstractFetcher(
                }
 
                // if we have periodic watermarks, kick off the interval scheduler
-               if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       @SuppressWarnings("unchecked")
-                       PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
+               if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR) {
+                       PeriodicWatermarkEmitter<KPH> periodicEmitter = new PeriodicWatermarkEmitter<>(

Review comment:
       Shall we modify the `PeriodicWatermarkEmitter` so that it does not register timers if `autoWatermarkInterval <= 0`?
   
   Or even change this to `if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0)`?
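   
To illustrate the first option, here is a minimal, self-contained sketch of an emitter that skips timer registration when the interval is not positive. `GuardedPeriodicEmitter` and `TimerRegistry` are hypothetical stand-ins, not the actual `PeriodicWatermarkEmitter` or Flink's processing-time service; only the guard itself is the point.

```java
/** Hypothetical stand-in for PeriodicWatermarkEmitter; not the class from this PR. */
class GuardedPeriodicEmitter {

    /** Hypothetical minimal timer abstraction, loosely modelled on a processing-time service. */
    interface TimerRegistry {
        void registerTimer(long delayMillis, Runnable callback);
    }

    private final long autoWatermarkInterval;
    private final TimerRegistry timers;
    private final Runnable emitWatermark;

    GuardedPeriodicEmitter(long autoWatermarkInterval, TimerRegistry timers, Runnable emitWatermark) {
        this.autoWatermarkInterval = autoWatermarkInterval;
        this.timers = timers;
        this.emitWatermark = emitWatermark;
    }

    /**
     * Registers the first timer only if the interval is positive.
     *
     * @return true if a timer was registered, false if periodic emission is disabled
     */
    boolean start() {
        if (autoWatermarkInterval <= 0) {
            // Periodic emission is switched off: never touch the timer service.
            return false;
        }
        timers.registerTimer(autoWatermarkInterval, this::onTimer);
        return true;
    }

    /** Emits once and re-registers itself for the next interval. */
    private void onTimer() {
        emitWatermark.run();
        timers.registerTimer(autoWatermarkInterval, this::onTimer);
    }
}
```

With the guard inside the emitter, the call site can stay as it is; the alternative condition above would instead avoid constructing the emitter at all.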

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link WatermarkOutputMultiplexer} combines the watermark (and idleness) updates of multiple
+ * partitions/shards/splits into one combined watermark update and forwards it to an underlying
+ * {@link WatermarkOutput}.
+ *
+ * <p>A multiplexed output can either be immediate or deferred. Watermark updates on an immediate
+ * output will potentially directly affect the combined watermark state, which will be forwarded to
+ * the underlying output immediately. Watermark updates on a deferred output will only update an
+ * internal state but not directly update the combined watermark state. Only when {@link
+ * #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the
+ * underlying output.
+ *
+ * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput()}
+ * and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output
+ * ID you get from that. You can get both an immediate and deferred output for a given output ID,
+ * you can also call the getters multiple times.
+ */
+@Internal
+public class WatermarkOutputMultiplexer {
+
+       /**
+        * The {@link WatermarkOutput} that we use to emit our multiplexed watermark updates. We assume
+        * that outside code holds a coordinating lock so we don't lock in this class when accessing
+        * this {@link WatermarkOutput}.
+        */
+       private final WatermarkOutput underlyingOutput;
+
+       /** The id to use for the next registered output. */
+       private int nextOutputId = 0;

Review comment:
       Will this class always be accessed from a single thread? If not, do we need to
   synchronize registering new outputs?

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/eventtime/SerializableTimestampAssigner.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A {@link TimestampAssigner} that is also {@link java.io.Serializable}.
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {}

Review comment:
       I was thinking we could have it as a nested class in the
   WatermarkStrategies, so it is not really a user-facing class. It is primarily for use
   with lambdas. But I am also fine with having it here.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link WatermarkOutputMultiplexer} combines the watermark (and idleness) updates of multiple
+ * partitions/shards/splits into one combined watermark update and forwards it to an underlying
+ * {@link WatermarkOutput}.
+ *
+ * <p>A multiplexed output can either be immediate or deferred. Watermark updates on an immediate
+ * output will potentially directly affect the combined watermark state, which will be forwarded to
+ * the underlying output immediately. Watermark updates on a deferred output will only update an
+ * internal state but not directly update the combined watermark state. Only when {@link
+ * #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the
+ * underlying output.
+ *
+ * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput()}
+ * and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output
+ * ID you get from that. You can get both an immediate and deferred output for a given output ID,
+ * you can also call the getters multiple times.
+ */
+@Internal
+public class WatermarkOutputMultiplexer {
+
+       /**
+        * The {@link WatermarkOutput} that we use to emit our multiplexed watermark updates. We assume
+        * that outside code holds a coordinating lock so we don't lock in this class when accessing
+        * this {@link WatermarkOutput}.
+        */
+       private final WatermarkOutput underlyingOutput;
+
+       /** The id to use for the next registered output. */
+       private int nextOutputId = 0;
+
+       /** The combined watermark over the per-output watermarks. */
+       private long combinedWatermark = Long.MIN_VALUE;
+
+       /**
+        * Map view, to allow finding them when requesting the {@link WatermarkOutput} for a given id.
+        */
+       private final Map<Integer, OutputState> watermarkPerOutputId;
+
+       /**
+        * List of all watermark outputs, for efficient access.
+        */
+       private final List<OutputState> watermarkOutputs;
+
+       /**
+        * Creates a new {@link WatermarkOutputMultiplexer} that emits combined updates to the given
+        * {@link WatermarkOutput}.
+        */
+       public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
+               this.underlyingOutput = underlyingOutput;
+               this.watermarkPerOutputId = new HashMap<>();
+               this.watermarkOutputs = new ArrayList();

Review comment:
       ```suggestion
                this.watermarkOutputs = new ArrayList<>();
   ```

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
##########
@@ -301,11 +312,99 @@ public void testPeriodicWatermarks() throws Exception {
 
                TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
+               AssignerWithPeriodicWatermarksAdapter.Strategy<Long> testWmStrategy =

Review comment:
       Instead of copying the test you can just parameterize it:
   
   ```
   /**
    * Tests for the {@link AbstractFetcher}.
    */
   @SuppressWarnings("serial")
   @RunWith(Enclosed.class)
   public class AbstractFetcherTest {
   
        @RunWith(Parameterized.class)
        public static class PeriodicWatermarksSuite {
   
                @Parameterized.Parameters
                public static Collection<WatermarkStrategy<Long>> getParams() {
                        return Arrays.asList(
                                new AssignerWithPeriodicWatermarksAdapter.Strategy<>(new PeriodicTestExtractor()),
                                WatermarkStrategies
                                        .forGenerator(new PeriodicTestWatermarkGenerator())
                                        .withTimestampAssigner((event, previousTimestamp) -> event)
                                        .build()
                        );
                }
   
                @Parameterized.Parameter
                public WatermarkStrategy<Long> testWmStrategy;
   
                @Test
                public void test() throws Exception {
                        final String testTopic = "test topic name";
                        Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
                        originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
                        originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
                        originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
   
                        TestSourceContext<Long> sourceContext = new TestSourceContext<>();
   
                        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
   
                        TestFetcher<Long> fetcher = new TestFetcher<>(
                                sourceContext,
                                originalPartitions,
                                new SerializedValue<>(testWmStrategy),
                                processingTimeService,
                                10);
   
                        final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates().get(0);
                        final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates().get(1);
                        final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates().get(2);
                        ....
                }
        }
   ```

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -361,80 +350,37 @@ protected void emitRecordsWithTimestamps(
                                // timestamps will be of the same size as records.
                                long timestamp = getTimestampForRecord(record, partitionState, kafkaEventTimestamp);
                                sourceContext.collectWithTimestamp(record, timestamp);
-                               if (timestampWatermarkMode == PUNCTUATED_WATERMARKS) {
-                                       emitPunctuatedWatermark(record, timestamp, partitionState);
-                               }
                        }
                        partitionState.setOffset(offset);
                }
        }
 
-       private void emitPunctuatedWatermark(
-                       T record,
-                       long timestamp,
-                       KafkaTopicPartitionState<KPH> partitionState) {
-               final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                       (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-               Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
-               // if we also have a new per-partition watermark, check if that is also a
-               // new cross-partition watermark
-               if (newWatermark != null) {
-                       updateMinPunctuatedWatermark(newWatermark);
-               }
-       }
-
+       // This must be called under the checkpoint lock because we potentially emit watermarks in
+       // withWatermarkState.onRecord()
        protected long getTimestampForRecord(
                        T record,
                        KafkaTopicPartitionState<KPH> partitionState,
                        long kafkaEventTimestamp) {
                if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
                        return kafkaEventTimestamp;
-               } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
-                               (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
-                       // extract timestamp - this accesses/modifies the per-partition state inside the
-                       // watermark generator instance, so we need to lock the access on the
-                       // partition state. concurrent access can happen from the periodic emitter
-                       //noinspection SynchronizationOnLocalVariableOrMethodParameter
-                       synchronized (withWatermarksState) {
-                               return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-                       }
                } else {
-                       final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                               (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-                       // only one thread ever works on accessing timestamps and watermarks
-                       // from the punctuated extractor
-                       return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-               }
-       }
+                       // must have a WatermarkGenerator
 
-       /**
-        *Checks whether a new per-partition watermark is also a new cross-partition watermark.
-        */
-       private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
-               if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
-                       long newMin = Long.MAX_VALUE;
+                       final KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> withWatermarksState =
+                               (KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH>) partitionState;
 
-                       for (KafkaTopicPartitionState<?> state : subscribedPartitionStates) {
-                               @SuppressWarnings("unchecked")
-                               final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                                               (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+                       //noinspection SynchronizationOnLocalVariableOrMethodParameter
+                       synchronized (withWatermarksState) {
 
-                               newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
-                       }
+                               // You would expect that we don't have to do this under lock. You would be wrong:
+                               // A WatermarkStrategy can wrap an old-style combined
+                               // timestamp extractor/watermark assigner, in which case the TimestampAssigner and
+                               // WatermarkGenerator wrap one and the same object, where extracting the timestamp
+                               // updates the internal state of the assigner.
+                               long timestamp = withWatermarksState.extractTimestamp(record, kafkaEventTimestamp);
 
-                       // double-check locking pattern
-                       if (newMin > maxWatermarkSoFar) {
-                               synchronized (checkpointLock) {
-                                       if (newMin > maxWatermarkSoFar) {
-                                               maxWatermarkSoFar = newMin;
-                                               sourceContext.emitWatermark(new Watermark(newMin));
-                                       }
-                               }
+                               withWatermarksState.onEvent(record, timestamp);

Review comment:
       Shouldn't this happen after the record is collected? Otherwise the 
watermark will be emitted before the event it originates from.
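   
A small sketch of why the order matters, assuming a punctuated-style generator that may emit a watermark equal to the timestamp of the record it just saw. `EmitOrderSketch` and its string-based output are hypothetical and only model the ordering, not the fetcher's real types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the emit order; not the AbstractFetcher code from this PR. */
class EmitOrderSketch {

    /**
     * Replays the given timestamps and returns what a downstream operator would see.
     *
     * @param collectFirst if true, the record is collected before the generator is
     *     notified; if false, the generator runs first (the order questioned above)
     */
    static List<String> emit(long[] timestamps, boolean collectFirst) {
        List<String> downstream = new ArrayList<>();
        for (long ts : timestamps) {
            if (collectFirst) {
                downstream.add("record@" + ts);
                onEvent(ts, downstream);
            } else {
                onEvent(ts, downstream);
                downstream.add("record@" + ts);
            }
        }
        return downstream;
    }

    /** Assumed generator: every timestamp divisible by 10 triggers a watermark at that timestamp. */
    private static void onEvent(long ts, List<String> downstream) {
        if (ts % 10 == 0) {
            downstream.add("watermark@" + ts);
        }
    }
}
```

In the generator-first order, `watermark@10` reaches downstream before `record@10`, so the record that produced the watermark would already count as late when it arrives.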

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@code TimestampAssigner} assigns event time timestamps to elements.
+ * These timestamps are used by all functions that operate on event time,
+ * for example event time windows.
+ *
+ * <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations
+ * represent it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC),
+ * the same way as {@link System#currentTimeMillis()} does it.
+ *
+ * @param <T> The type of the elements to which this assigner assigns timestamps.
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TimestampAssigner<T> {
+
+       /**
+        * Assigns a timestamp to an element, in milliseconds since the Epoch.

Review comment:
       nit: `since the Epoch independent of a particular time zone or calendar`

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/eventtime/AscendingTimestampsWatermarks.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Duration;
+
+/**
+ * A watermark generator that assumes monotonically ascending timestamps within the
+ * stream split and periodically generates watermarks based on that assumption.
+ *
+ * <p>The current watermark is always one after the latest (highest) timestamp,
+ * because we assume that more records with the same timestamp may still follow.
+ *
+ * <p>The watermarks are generated periodically and tightly follow the latest
+ * timestamp in the data. The delay introduced by this strategy is mainly the periodic
+ * interval in which the watermarks are generated.

Review comment:
       Shall we point to the `autoWatermarkInterval` configuration? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

