twalthr commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3122829610


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java:
##########
@@ -88,6 +88,7 @@ public List<TableTestProgram> programs() {
                 ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS,
                 ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS,
                 ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS,
+                
ProcessTableFunctionTestPrograms.PROCESS_CONSISTENT_WATERMARK_TIMERS,

Review Comment:
   Should we drop this test if it doesn't provide much value?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java:
##########
@@ -82,11 +83,21 @@ public void processElement(StreamRecord<RowData> element) 
throws Exception {
                     0,
                     element.getValue(),
                     inputSemantics.timeColumn(),
-                    processTableRunner.getCurrentWatermark());
+                    combinedWatermark.getCombinedWatermark());
         }
         processTableRunner.processEval();
     }
 
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        // Advance combinedWatermark before super.processWatermark(mark): 
super.processWatermark()
+        // fires due timers, which read the watermark via timeContext(). V2's 
OneInputStreamOperator
+        // watermark path doesn't update combinedWatermark itself — only 
reportWatermark()
+        // (multi-input) does — so we do it here.
+        combinedWatermark.updateWatermark(0, mark.getTimestamp());
+        super.processWatermark(mark);

Review Comment:
   how about we simply call `reportWatermark` here? I think this fits better 
and also takes care of calling `processWatermark`. After that I think the 
comment is not necessary anymore, the code would be self explaining.



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java:
##########
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.process;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import 
org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.ProcessTableFunction.TimeContext;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.ProcessTableRunner;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that {@link ProcessSetTableOperator} drives {@link 
ProcessTableRunner} correctly when user
+ * timers are fired under unaligned checkpoints, both with and without 
interruptible timers.
+ *
+ * <p>A minimal {@link ProcessTableRunner} is used: {@link 
ProcessTableRunner#callEval()} emits a
+ * "record-$ts@wm$wm" row via {@code evalCollector} and registers an unnamed 
event-time timer at the
+ * row's event time (plus an additional named timer for the row at ts=1000); 
{@link
+ * ProcessTableRunner#callOnTimer()} emits a "fired-[name-]$ts@wm$wm" row via 
{@code
+ * onTimerCollector}. The operator is wrapped so that after each timer firing 
a non-deferrable
+ * "mail-[name-]$ts" mailbox mail is scheduled — the PTF user API does not 
expose the mailbox, so
+ * this is the only way to observe externally whether the mailbox is drained 
between timer firings.
+ *
+ * <p>With interruptible timers enabled, the mailbox is drained between 
successive timer firings, so
+ * each "fired" row is immediately followed by its "mail" row. With 
interruptible timers disabled,
+ * all timers in a batch fire first (and the watermark is emitted downstream) 
before any of the
+ * scheduled mails are processed. The watermark values on "record" and "fired" 
rows also validate
+ * that {@link TimeContext#currentWatermark()} advances consistently across 
input processing and
+ * timer firing.
+ */
+class ProcessSetTableOperatorInterruptibleTimersTest {
+
+    private static final InternalTypeInfo<RowData> INPUT_TYPE =
+            InternalTypeInfo.ofFields(new BigIntType(), new TimestampType(3));
+
+    // Output layout is driven by PassPartitionKeysCollector / 
PassAllCollector in
+    // AbstractProcessTableOperator: [partition-key, PTF-emitted label, 
rowtime].
+    private static final InternalTypeInfo<RowData> OUTPUT_TYPE =
+            InternalTypeInfo.ofFields(
+                    new BigIntType(), VarCharType.STRING_TYPE, new 
TimestampType(3));
+
+    private static final RowDataKeySelector KEY_SELECTOR =
+            HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, 
INPUT_TYPE.toRowFieldTypes());
+
+    private static final long KEY = 42L;
+
+    private static final String NAMED_TIMER = "n";
+
+    @ParameterizedTest(name = "interruptibleTimers={0}")
+    @ValueSource(booleans = {true, false})
+    void testTimersThroughProcessTableRunner(boolean interruptibleTimers) 
throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> harness = 
createHarness(interruptibleTimers)) {
+            // Each input row registers one event-time timer at its own 
timestamp. The row at
+            // ts=1000 additionally registers a named timer at the same 
timestamp, so the first
+            // batch contains three unnamed firings and one named firing at 
identical timestamps
+            // (ts=1000) — exercising both timer services advancing together.
+            for (long ts = 1000L; ts <= 3000L; ts += 1000L) {
+                harness.processElement(
+                        new StreamRecord<>(
+                                GenericRowData.of(KEY, 
TimestampData.fromEpochMillis(ts)), ts));
+            }
+            // Advance watermark past registered timers (first batch: three 
unnamed + one named).
+            harness.processElement(new Watermark(5000L));
+
+            // One more input row followed by a second watermark advance 
(second batch: one
+            // unnamed timer only).
+            harness.processElement(
+                    new StreamRecord<>(
+                            GenericRowData.of(KEY, 
TimestampData.fromEpochMillis(6000L)), 6000L));
+            harness.processElement(new Watermark(7000L));
+
+            List<String> labels =
+                    harness.getOutput().stream()
+                            
.map(ProcessSetTableOperatorInterruptibleTimersTest::describe)
+                            .collect(Collectors.toList());
+
+            // Records emitted from callEval carry the watermark visible 
during eval, which is null
+            // before any watermark has arrived and 5000 for the record 
processed between the two
+            // watermarks.
+            if (interruptibleTimers) {
+                // With interruptible timers, the mailbox is drained between 
successive timer
+                // firings, so each "fired-[name-]$ts" is immediately followed 
by its matching
+                // "mail-[name-]$ts". The watermark is only emitted downstream 
once all firings in
+                // the batch have completed.
+                assertThat(labels)
+                        .containsExactly(
+                                recordLabel(1000L, null),
+                                recordLabel(2000L, null),
+                                recordLabel(3000L, null),
+                                firedLabel(null, 1000L, 5000L),
+                                mailLabel(null, 1000L),
+                                firedLabel(null, 2000L, 5000L),
+                                mailLabel(null, 2000L),
+                                firedLabel(null, 3000L, 5000L),
+                                mailLabel(null, 3000L),
+                                firedLabel(NAMED_TIMER, 1000L, 5000L),
+                                mailLabel(NAMED_TIMER, 1000L),
+                                watermarkLabel(5000L),
+                                recordLabel(6000L, 5000L),
+                                firedLabel(null, 6000L, 7000L),
+                                mailLabel(null, 6000L),
+                                watermarkLabel(7000L));
+            } else {
+                // Without interruptible timers, all timers in a batch fire 
synchronously (emitting
+                // their "fired" rows) before the watermark is emitted 
downstream. Only afterwards
+                // does the mailbox drain the scheduled "mail" rows in FIFO 
order.
+                assertThat(labels)
+                        .containsExactly(
+                                recordLabel(1000L, null),
+                                recordLabel(2000L, null),
+                                recordLabel(3000L, null),
+                                firedLabel(null, 1000L, 5000L),
+                                firedLabel(null, 2000L, 5000L),
+                                firedLabel(null, 3000L, 5000L),
+                                firedLabel(NAMED_TIMER, 1000L, 5000L),
+                                watermarkLabel(5000L),
+                                mailLabel(null, 1000L),
+                                mailLabel(null, 2000L),
+                                mailLabel(null, 3000L),
+                                mailLabel(NAMED_TIMER, 1000L),
+                                recordLabel(6000L, 5000L),
+                                firedLabel(null, 6000L, 7000L),
+                                watermarkLabel(7000L),
+                                mailLabel(null, 6000L));
+            }
+        }
+    }
+
+    private StreamTaskMailboxTestHarness<RowData> createHarness(boolean 
interruptibleTimers)
+            throws Exception {
+        return new 
StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, OUTPUT_TYPE)
+                .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(1))
+                .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true)
+                .addJobConfig(
+                        
CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS,
+                        interruptibleTimers)
+                .setKeyType(KEY_SELECTOR.getProducedType())
+                .addInput(INPUT_TYPE, 1, KEY_SELECTOR)
+                .setupOperatorChain(new TestOperatorFactory())
+                .name("ptf")
+                .finishForSingletonOperatorChain(
+                        OUTPUT_TYPE.createSerializer(new 
SerializerConfigImpl()))
+                .build();
+    }
+
+    private static String describe(Object obj) {
+        if (obj instanceof Watermark) {
+            return watermarkLabel(((Watermark) obj).getTimestamp());
+        } else if (obj instanceof StreamRecord) {
+            Object value = ((StreamRecord<?>) obj).getValue();
+            if (value instanceof RowData) {
+                // Field 1 is the PTF-emitted label (see OUTPUT_TYPE); fields 
0 and 2 are the
+                // pass-through partition key and rowtime.
+                return ((RowData) value).getString(1).toString();
+            }
+            return value.toString();
+        }
+        return obj.toString();
+    }
+
+    private static String recordLabel(long ts, @Nullable Long watermark) {
+        return "record-" + ts + "@wm" + (watermark == null ? "null" : 
watermark);
+    }
+
+    private static String firedLabel(@Nullable String name, long ts, long 
watermark) {
+        return "fired-" + (name == null ? "" : name + "-") + ts + "@wm" + 
watermark;
+    }
+
+    private static String mailLabel(@Nullable String name, long ts) {
+        return "mail-" + (name == null ? "" : name + "-") + ts;
+    }
+
+    private static String watermarkLabel(long ts) {
+        return "Watermark@" + ts;
+    }
+
+    private static RuntimeTableSemantics tableSemantics() {
+        return new RuntimeTableSemantics(
+                "r",
+                0,
+                DataTypes.ROW(
+                        DataTypes.FIELD("k", DataTypes.BIGINT()),
+                        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))),

Review Comment:
   Define this in a variable and derive `INPUT_TYPE` from it via 
`InternalTypeInfo.of(dataType.getLogicalType)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to