This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62b37ed22086b5bbaf26307fa51261aca442d8a2
Author: Fabian Hueske <[email protected]>
AuthorDate: Wed Apr 22 15:49:37 2026 +0200

    [FLINK-39437][table] Support interruptible timers in PTFs (#27962)
    
    Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to
    return `true`, activating the `MailboxWatermarkProcessor` for PTF operators.
    This allows timer firing to be interrupted between mailbox iterations,
    improving throughput by not blocking mailbox processing during large timer
    batches.
    
    Also removes `currentWatermark` from `ProcessTableRunner` and redirect 
reads to
    `AbstractStreamOperatorV2.combinedWatermark`.
    
    Add a test (`ProcessSetTableOperatorInterruptibleTimersTest`) to assert 
correct behavior
    of PTF framework when timer processing is interrupted.
---
 .../runtime/generated/ProcessTableRunner.java      |   9 -
 .../process/AbstractProcessTableOperator.java      |  10 +-
 .../operators/process/ProcessRowTableOperator.java |  13 +-
 ...essSetTableOperatorInterruptibleTimersTest.java | 352 +++++++++++++++++++++
 4 files changed, 371 insertions(+), 13 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index b90a201649e..5477fc4523f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -65,7 +65,6 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
 
     // Current time
     private long tableWatermark = Long.MIN_VALUE;
-    private long currentWatermark = Long.MIN_VALUE;
     private @Nullable Long rowtime;
     private @Nullable StringData timerName;
 
@@ -131,10 +130,6 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
         tableWatermark = Long.MIN_VALUE;
     }
 
-    public void ingestCurrentWatermarkEvent(long watermark) {
-        currentWatermark = watermark;
-    }
-
     public void clearAllState() {
         Arrays.fill(stateCleared, true);
     }
@@ -143,10 +138,6 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
         stateCleared[statePos] = true;
     }
 
-    public long getCurrentWatermark() {
-        return currentWatermark;
-    }
-
     public long getTableWatermark() {
         return tableWatermark;
     }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index cf3a6149af6..d3b58b5c886 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
@@ -145,11 +146,14 @@ public abstract class AbstractProcessTableOperator 
extends AbstractStreamOperato
         FunctionUtils.openFunction(processTableRunner, 
DefaultOpenContext.INSTANCE);
     }
 
+    @Override
+    public final boolean useInterruptibleTimers(ReadableConfig config) {
+        return true;
+    }
+
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         super.processWatermark(mark);
-        // TODO this line has issues with interruptible timers, see FLINK-39437
-        processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp());
     }
 
     @Override
@@ -220,7 +224,7 @@ public abstract class AbstractProcessTableOperator extends 
AbstractStreamOperato
 
             internalTimeContext.setTime(
                     processTableRunner.getTableWatermark(),
-                    processTableRunner.getCurrentWatermark(),
+                    combinedWatermark.getCombinedWatermark(),
                     processTableRunner.getTime());
 
             return (TimeContext<TimeType>)
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
index 1964b2d2376..80528c1e63f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.operators.process;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -82,11 +83,21 @@ public class ProcessRowTableOperator extends 
AbstractProcessTableOperator
                     0,
                     element.getValue(),
                     inputSemantics.timeColumn(),
-                    processTableRunner.getCurrentWatermark());
+                    combinedWatermark.getCombinedWatermark());
         }
         processTableRunner.processEval();
     }
 
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        // Advance combinedWatermark before super.processWatermark(mark): 
super.processWatermark()
+        // fires due timers, which read the watermark via timeContext(). V2's 
OneInputStreamOperator
+        // watermark path doesn't update combinedWatermark itself — only 
reportWatermark()
+        // (multi-input) does — so we do it here.
+        combinedWatermark.updateWatermark(0, mark.getTimestamp());
+        super.processWatermark(mark);
+    }
+
     @Override
     public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws 
Exception {
         super.processWatermarkStatus(watermarkStatus, 1);
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
new file mode 100644
index 00000000000..be390ab5f55
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.process;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import 
org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.ProcessTableFunction.TimeContext;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.ProcessTableRunner;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that {@link ProcessSetTableOperator} drives {@link 
ProcessTableRunner} correctly when user
+ * timers are fired under unaligned checkpoints, both with and without 
interruptible timers.
+ *
+ * <p>A minimal {@link ProcessTableRunner} is used: {@link 
ProcessTableRunner#callEval()} emits a
+ * "record-$ts@wm$wm" row via {@code evalCollector} and registers an unnamed 
event-time timer at the
+ * row's event time (plus an additional named timer for the row at ts=1000); 
{@link
+ * ProcessTableRunner#callOnTimer()} emits a "fired-[name-]$ts@wm$wm" row via 
{@code
+ * onTimerCollector}. The operator is wrapped so that after each timer firing 
a non-deferrable
+ * "mail-[name-]$ts" mailbox mail is scheduled — the PTF user API does not 
expose the mailbox, so
+ * this is the only way to observe externally whether the mailbox is drained 
between timer firings.
+ *
+ * <p>With interruptible timers enabled, the mailbox is drained between 
successive timer firings, so
+ * each "fired" row is immediately followed by its "mail" row. With 
interruptible timers disabled,
+ * all timers in a batch fire first (and the watermark is emitted downstream) 
before any of the
+ * scheduled mails are processed. The watermark values on "record" and "fired" 
rows also validate
+ * that {@link TimeContext#currentWatermark()} advances consistently across 
input processing and
+ * timer firing.
+ */
+class ProcessSetTableOperatorInterruptibleTimersTest {
+
+    private static final DataType INPUT_TYPE =
+            DataTypes.ROW(
+                    DataTypes.FIELD("k", DataTypes.BIGINT()),
+                    DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));
+
+    // Output layout is driven by PassPartitionKeysCollector / 
PassAllCollector in
+    // AbstractProcessTableOperator: [partition-key, PTF-emitted label, 
rowtime].
+    private static final DataType OUTPUT_TYPE =
+            DataTypes.ROW(
+                    DataTypes.FIELD("k", DataTypes.BIGINT()),
+                    DataTypes.FIELD("label", DataTypes.STRING()),
+                    DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));
+
+    private static final InternalTypeInfo<RowData> INPUT_TYPE_INFO =
+            InternalTypeInfo.of(INPUT_TYPE.getLogicalType());
+
+    private static final InternalTypeInfo<RowData> OUTPUT_TYPE_INFO =
+            InternalTypeInfo.of(OUTPUT_TYPE.getLogicalType());
+
+    private static final RowDataKeySelector KEY_SELECTOR =
+            HandwrittenSelectorUtil.getRowDataSelector(
+                    new int[] {0}, INPUT_TYPE_INFO.toRowFieldTypes());
+
+    private static final long KEY = 42L;
+
+    private static final String NAMED_TIMER = "n";
+
+    @ParameterizedTest(name = "interruptibleTimers={0}")
+    @ValueSource(booleans = {true, false})
+    void testTimersThroughProcessTableRunner(boolean interruptibleTimers) 
throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> harness = 
createHarness(interruptibleTimers)) {
+            // Each input row registers one event-time timer at its own 
timestamp. The row at
+            // ts=1000 additionally registers a named timer at the same 
timestamp, so the first
+            // batch contains three unnamed firings and one named firing at 
identical timestamps
+            // (ts=1000) — exercising both timer services advancing together.
+            for (long ts = 1000L; ts <= 3000L; ts += 1000L) {
+                harness.processElement(
+                        new StreamRecord<>(
+                                GenericRowData.of(KEY, 
TimestampData.fromEpochMillis(ts)), ts));
+            }
+            // Advance watermark past registered timers (first batch: three 
unnamed + one named).
+            harness.processElement(new Watermark(5000L));
+
+            // One more input row followed by a second watermark advance 
(second batch: one
+            // unnamed timer only).
+            harness.processElement(
+                    new StreamRecord<>(
+                            GenericRowData.of(KEY, 
TimestampData.fromEpochMillis(6000L)), 6000L));
+            harness.processElement(new Watermark(7000L));
+
+            List<String> labels =
+                    harness.getOutput().stream()
+                            
.map(ProcessSetTableOperatorInterruptibleTimersTest::describe)
+                            .collect(Collectors.toList());
+
+            // Records emitted from callEval carry the watermark visible 
during eval, which is null
+            // before any watermark has arrived and 5000 for the record 
processed between the two
+            // watermarks.
+            if (interruptibleTimers) {
+                // With interruptible timers, the mailbox is drained between 
successive timer
+                // firings, so each "fired-[name-]$ts" is immediately followed 
by its matching
+                // "mail-[name-]$ts". The watermark is only emitted downstream 
once all firings in
+                // the batch have completed.
+                assertThat(labels)
+                        .containsExactly(
+                                recordLabel(1000L, null),
+                                recordLabel(2000L, null),
+                                recordLabel(3000L, null),
+                                firedLabel(null, 1000L, 5000L),
+                                mailLabel(null, 1000L),
+                                firedLabel(null, 2000L, 5000L),
+                                mailLabel(null, 2000L),
+                                firedLabel(null, 3000L, 5000L),
+                                mailLabel(null, 3000L),
+                                firedLabel(NAMED_TIMER, 1000L, 5000L),
+                                mailLabel(NAMED_TIMER, 1000L),
+                                watermarkLabel(5000L),
+                                recordLabel(6000L, 5000L),
+                                firedLabel(null, 6000L, 7000L),
+                                mailLabel(null, 6000L),
+                                watermarkLabel(7000L));
+            } else {
+                // Without interruptible timers, all timers in a batch fire 
synchronously (emitting
+                // their "fired" rows) before the watermark is emitted 
downstream. Only afterwards
+                // does the mailbox drain the scheduled "mail" rows in FIFO 
order.
+                assertThat(labels)
+                        .containsExactly(
+                                recordLabel(1000L, null),
+                                recordLabel(2000L, null),
+                                recordLabel(3000L, null),
+                                firedLabel(null, 1000L, 5000L),
+                                firedLabel(null, 2000L, 5000L),
+                                firedLabel(null, 3000L, 5000L),
+                                firedLabel(NAMED_TIMER, 1000L, 5000L),
+                                watermarkLabel(5000L),
+                                mailLabel(null, 1000L),
+                                mailLabel(null, 2000L),
+                                mailLabel(null, 3000L),
+                                mailLabel(NAMED_TIMER, 1000L),
+                                recordLabel(6000L, 5000L),
+                                firedLabel(null, 6000L, 7000L),
+                                watermarkLabel(7000L),
+                                mailLabel(null, 6000L));
+            }
+        }
+    }
+
+    private StreamTaskMailboxTestHarness<RowData> createHarness(boolean 
interruptibleTimers)
+            throws Exception {
+        return new StreamTaskMailboxTestHarnessBuilder<>(
+                        MultipleInputStreamTask::new, OUTPUT_TYPE_INFO)
+                .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(1))
+                .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true)
+                .addJobConfig(
+                        
CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS,
+                        interruptibleTimers)
+                .setKeyType(KEY_SELECTOR.getProducedType())
+                .addInput(INPUT_TYPE_INFO, 1, KEY_SELECTOR)
+                .setupOperatorChain(new TestOperatorFactory())
+                .name("ptf")
+                .finishForSingletonOperatorChain(
+                        OUTPUT_TYPE_INFO.createSerializer(new 
SerializerConfigImpl()))
+                .build();
+    }
+
+    private static String describe(Object obj) {
+        if (obj instanceof Watermark) {
+            return watermarkLabel(((Watermark) obj).getTimestamp());
+        } else if (obj instanceof StreamRecord) {
+            Object value = ((StreamRecord<?>) obj).getValue();
+            if (value instanceof RowData) {
+                // Field 1 is the PTF-emitted label (see OUTPUT_TYPE); fields 
0 and 2 are the
+                // pass-through partition key and rowtime.
+                return ((RowData) value).getString(1).toString();
+            }
+            return value.toString();
+        }
+        return obj.toString();
+    }
+
+    private static String recordLabel(long ts, @Nullable Long watermark) {
+        return "record-" + ts + "@wm" + (watermark == null ? "null" : 
watermark);
+    }
+
+    private static String firedLabel(@Nullable String name, long ts, long 
watermark) {
+        return "fired-" + (name == null ? "" : name + "-") + ts + "@wm" + 
watermark;
+    }
+
+    private static String mailLabel(@Nullable String name, long ts) {
+        return "mail-" + (name == null ? "" : name + "-") + ts;
+    }
+
+    private static String watermarkLabel(long ts) {
+        return "Watermark@" + ts;
+    }
+
+    private static RuntimeTableSemantics tableSemantics() {
+        return new RuntimeTableSemantics(
+                "r",
+                0,
+                INPUT_TYPE,
+                new int[] {0},
+                new int[0],
+                new RuntimeTableSemantics.SortDirection[0],
+                RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
+                /* passColumnsThrough */ false,
+                /* hasSetSemantics */ true,
+                /* timeColumn */ 1);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Factory and operator
+    // 
--------------------------------------------------------------------------------------------
+
+    private static class TestOperatorFactory extends 
AbstractStreamOperatorFactory<RowData> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<RowData>> T createStreamOperator(
+                StreamOperatorParameters<RowData> parameters) {
+            return (T) new TestProcessSetTableOperator(parameters);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return TestProcessSetTableOperator.class;
+        }
+    }
+
+    /**
+     * {@link ProcessSetTableOperator} with a test-only override of {@link 
#onEventTime} that
+     * schedules a non-deferrable "mail-[name-]$ts" emission after each timer 
firing. The PTF user
+     * API does not expose the mailbox; this test-only shortcut is the only 
way to observe whether
+     * interruption happens between timer firings driven by {@link
+     * ProcessTableRunner#callOnTimer()}.
+     */
+    private static class TestProcessSetTableOperator extends 
ProcessSetTableOperator {
+
+        private final MailboxExecutor mailboxExecutor;
+
+        TestProcessSetTableOperator(StreamOperatorParameters<RowData> 
parameters) {
+            super(
+                    parameters,
+                    List.of(tableSemantics()),
+                    List.of(),
+                    // No ORDER BY on the input, so the comparator slot is 
null (the operator
+                    // short-circuits sort-buffer setup when orderByColumns is 
empty). The array
+                    // size must still match the number of inputs.
+                    new RecordComparator[] {null},
+                    new TestProcessTableRunner(),
+                    new HashFunction[0],
+                    new RecordEqualiser[0],
+                    RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
+                    List.of());
+            this.mailboxExecutor = parameters.getMailboxExecutor();
+        }
+
+        @Override
+        public void onEventTime(InternalTimer<RowData, Object> timer) throws 
Exception {
+            // Runs the full PTF path: ingestTimerEvent -> processOnTimer -> 
callOnTimer, emitting
+            // "fired-[name-]$ts" via onTimerCollector.
+            super.onEventTime(timer);
+            long ts = timer.getTimestamp();
+            long keyValue = timer.getKey().getLong(0);
+            final Object ns = timer.getNamespace();
+            final String name = (ns instanceof StringData) ? ns.toString() : 
null;
+            mailboxExecutor.execute(
+                    () ->
+                            output.collect(
+                                    new StreamRecord<>(
+                                            GenericRowData.of(
+                                                    keyValue,
+                                                    
StringData.fromString(mailLabel(name, ts)),
+                                                    
TimestampData.fromEpochMillis(ts)))),
+                    mailLabel(name, ts));
+        }
+    }
+
+    /**
+     * Minimal {@link ProcessTableRunner}: {@link #callEval} emits a 
"record-$ts@wm$wm" row and
+     * registers an unnamed event-time timer at the input row's event time 
(plus a named timer for
+     * the row at ts=1000); {@link #callOnTimer} emits a 
"fired-[name-]$ts@wm$wm" row.
+     */
+    private static class TestProcessTableRunner extends ProcessTableRunner {
+
+        @Override
+        public void callEval() {
+            final TimeContext<Long> tc = runnerContext.timeContext(Long.class);
+            final long ts = Objects.requireNonNull(tc.time(), "input row event 
time");
+            final Long wm = tc.currentWatermark();
+            tc.registerOnTime(ts);
+            if (ts == 1000L) {
+                tc.registerOnTime(NAMED_TIMER, ts);
+            }
+            
evalCollector.collect(GenericRowData.of(StringData.fromString(recordLabel(ts, 
wm))));
+        }
+
+        @Override
+        public void callOnTimer() {
+            final TimeContext<Long> tc = 
runnerOnTimerContext.timeContext(Long.class);
+            final long ts = Objects.requireNonNull(tc.time(), "timer fire 
time");
+            final long wm =
+                    Objects.requireNonNull(tc.currentWatermark(), "watermark 
at timer fire");
+            final String name = runnerOnTimerContext.currentTimer();
+            onTimerCollector.collect(
+                    GenericRowData.of(StringData.fromString(firedLabel(name, 
ts, wm))));
+        }
+    }
+}

Reply via email to