This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4a0c2e40506 [FLINK-37521][runtime] Implement async state version of
KeyedCoProcessOperator (#26328)
4a0c2e40506 is described below
commit 4a0c2e40506558e877b6b4188ab2ea5218c908a8
Author: Yanfei Lei <[email protected]>
AuthorDate: Thu Apr 3 10:27:45 2025 +0800
[FLINK-37521][runtime] Implement async state version of
KeyedCoProcessOperator (#26328)
---
.../DeclaringAsyncKeyedCoProcessFunction.java | 130 ++++
.../AbstractAsyncStateStreamOperator.java | 18 +-
.../{ => co}/AsyncIntervalJoinOperator.java | 8 +-
.../operators/co/AsyncKeyedCoProcessOperator.java | 238 +++++++
...ncKeyedCoProcessOperatorWithWatermarkDelay.java | 48 ++
.../streaming/api/datastream/ConnectedStreams.java | 38 +-
.../streaming/api/datastream/KeyedStream.java | 2 +-
.../transformations/TwoInputTransformation.java | 11 +
.../AbstractAsyncStateStreamOperatorTest.java | 7 +-
.../operators/AsyncIntervalJoinOperatorTest.java | 1 +
.../operators/AsyncKeyedCoProcessOperatorTest.java | 746 +++++++++++++++++++++
.../nodes/exec/stream/StreamExecIntervalJoin.java | 4 +-
12 files changed, 1236 insertions(+), 15 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
new file mode 100644
index 00000000000..ab27c0095d3
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A function that processes elements of two keyed streams and produces a
single output stream.
+ *
+ * <p>The function will be called for every element in the input streams and
can produce zero or
+ * more output elements. Contrary to the {@link CoFlatMapFunction}, this
function can also query the
+ * time (both event and processing) and set timers, through the provided
{@link Context}. When
+ * reacting to the firing of timers the function can emit yet more elements.
+ *
+ * <p>An example use case for connected streams is the application of a set of
rules that change
+ * over time ({@code stream A}) to the elements contained in another stream
(stream {@code B}). The
+ * rules contained in {@code stream A} can be stored in the state and wait for
new elements to
+ * arrive on {@code stream B}. Upon reception of a new element on {@code
stream B}, the function can
+ * apply the previously stored rules to the element and emit a result, and/or
register a timer that
+ * will trigger an action in the future.
+ *
+ * @param <K> Type of the key.
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@Internal
+public abstract class DeclaringAsyncKeyedCoProcessFunction<K, IN1, IN2, OUT>
+ extends KeyedCoProcessFunction<K, IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Override and finalize this method. Please use {@link #declareProcess1}
instead. */
+ @Override
+ public final void processElement1(IN1 value, Context ctx, Collector<OUT>
out) throws Exception {
+ throw new IllegalAccessException("This method is replaced by
declareProcess1.");
+ }
+
+ /** Override and finalize this method. Please use {@link #declareProcess2}
instead. */
+ @Override
+ public final void processElement2(IN2 value, Context ctx, Collector<OUT>
out) throws Exception {
+ throw new IllegalAccessException("This method is replaced by
declareProcess2.");
+ }
+
+ /** Override and finalize this method. Please use {@link #declareOnTimer}
instead. */
+ public final void onTimer(long timestamp, OnTimerContext ctx,
Collector<OUT> out)
+ throws Exception {
+ throw new IllegalAccessException("This method is replaced by
declareOnTimer.");
+ }
+
+ /**
+ * Declares variables before {@link #declareProcess1}, {@link
#declareProcess2} and {@link
+ * #declareOnTimer} are called.
+ */
+ public void declareVariables(DeclarationContext context) {}
+
+ /**
+ * Declare a process for one element from the first of the connected
streams.
+ *
+ * <p>This function can output zero or more elements using the {@link
Collector} parameter and
+ * also update internal state or set timers using the {@link Context}
parameter.
+ *
+ * @param context the context that provides useful methods to define named
callbacks.
+ * @param ctx A {@link Context} that allows querying the timestamp of the
element and getting a
+ * {@link TimerService} for registering timers and querying the time.
The context is only
+ * valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ * @return the whole processing logic just like {@code processElement}.
+ */
+ public abstract ThrowingConsumer<IN1, Exception> declareProcess1(
+ DeclarationContext context, Context ctx, Collector<OUT> out)
+ throws DeclarationException;
+
+ /**
+ * Declare a process for one element from the second of the connected
streams.
+ *
+ * <p>This function can output zero or more elements using the {@link
Collector} parameter and
+ * also update internal state or set timers using the {@link Context}
parameter.
+ *
+ * @param context the context that provides useful methods to define named
callbacks.
+ * @param ctx A {@link Context} that allows querying the timestamp of the
element and getting a
+ * {@link TimerService} for registering timers and querying the time.
The context is only
+ * valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ * @return the whole processing logic just like {@code processElement}.
+ */
+ public abstract ThrowingConsumer<IN2, Exception> declareProcess2(
+ DeclarationContext context, Context ctx, Collector<OUT> out)
+ throws DeclarationException;
+
+ /**
+ * Declare a procedure which is called when a timer set using {@link
TimerService} fires.
+ *
+ * @param context the context that provides useful methods to define named
callbacks.
+ * @param ctx An {@link OnTimerContext} that allows querying the
timestamp, the {@link
+ * TimeDomain}, and the key of the firing timer and getting a {@link
TimerService} for
+ * registering timers and querying the time. The context is only valid
during the invocation
+ * of this method, do not store it.
+ * @param out The collector for returning result values.
+ * @return the processor for processing timestamps.
+ */
+ public ThrowingConsumer<Long, Exception> declareOnTimer(
+ DeclarationContext context, OnTimerContext ctx, Collector<OUT> out)
+ throws DeclarationException {
+ return (t) -> {};
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index df79c512a1d..0c702ed483e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -383,12 +383,18 @@ public abstract class
AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
}
/**
- * A hook that will be invoked after finishing advancing the watermark. It
is not recommended to
- * perform async state here. Only some synchronous logic is suggested.
+ * A hook that will be invoked after finishing advancing the watermark and
right before the
+ * watermark is emitted downstream. Here is a chance for customization
of the emitted
+ * watermark. It is not recommended to perform async state here. Only some
synchronous logic is
+ * suggested.
*
* @param watermark the advanced watermark.
+ * @return the watermark that should be emitted downstream, or {@code null} if
no watermark
+ * should be emitted.
*/
- public void postProcessWatermark(Watermark watermark) throws Exception {}
+ public Watermark postProcessWatermark(Watermark watermark) throws
Exception {
+ return watermark;
+ }
/**
* Process a watermark when receiving it. Do not override this method
since the async processing
@@ -425,8 +431,10 @@ public abstract class
AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
},
() -> {
if (watermarkRef.get() != null) {
- output.emitWatermark(watermarkRef.get());
- postProcessWatermark(watermarkRef.get());
+ Watermark postProcessWatermark =
postProcessWatermark(watermarkRef.get());
+ if (postProcessWatermark != null) {
+ output.emitWatermark(postProcessWatermark);
+ }
}
});
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
similarity index 97%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
index 1480fc0e4cd..9eb54a23a5a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.asyncprocessing.operators;
+package org.apache.flink.runtime.asyncprocessing.operators.co;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
@@ -28,6 +28,8 @@ import
org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import
org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -417,12 +419,12 @@ public class AsyncIntervalJoinOperator<K, T1, T2, OUT>
}
@VisibleForTesting
- MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer()
{
+ public MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>>
getLeftBuffer() {
return leftBuffer;
}
@VisibleForTesting
- MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>>
getRightBuffer() {
+ public MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>>
getRightBuffer() {
return rightBuffer;
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
new file mode 100644
index 00000000000..03454e22709
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import
org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
+import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import
org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for
executing keyed {@link
+ * KeyedCoProcessFunction KeyedCoProcessFunction}.
+ */
+@Internal
+public class AsyncKeyedCoProcessOperator<K, IN1, IN2, OUT>
+ extends AbstractAsyncStateUdfStreamOperator<OUT,
KeyedCoProcessFunction<K, IN1, IN2, OUT>>
+ implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K,
VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ // Shared timestamp variable for collector, context and onTimerContext.
+ private transient DeclaredVariable<Long> sharedTimestamp;
+
+ private transient TimestampedCollectorWithDeclaredVariable<OUT> collector;
+
+ private transient ContextImpl<K, IN1, IN2, OUT> context;
+
+ private transient OnTimerContextImpl<K, IN1, IN2, OUT> onTimerContext;
+
+ private transient ThrowingConsumer<IN1, Exception> processor1;
+ private transient ThrowingConsumer<IN2, Exception> processor2;
+ private transient ThrowingConsumer<Long, Exception> timerProcessor;
+
+ public AsyncKeyedCoProcessOperator(
+ KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction) {
+ super(keyedCoProcessFunction);
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void open() throws Exception {
+ super.open();
+ sharedTimestamp =
+ declarationContext.declareVariable(
+ LongSerializer.INSTANCE,
+ "_AsyncCoKeyedProcessOperator$sharedTimestamp",
+ null);
+
+ collector = new TimestampedCollectorWithDeclaredVariable<>(output,
sharedTimestamp);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers",
VoidNamespaceSerializer.INSTANCE, this);
+
+ TimerService timerService = new
SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl<>(userFunction, timerService,
sharedTimestamp);
+ onTimerContext = new OnTimerContextImpl<>(userFunction, timerService,
declarationContext);
+ if (userFunction instanceof DeclaringAsyncKeyedCoProcessFunction) {
+ DeclaringAsyncKeyedCoProcessFunction declaringFunction =
+ (DeclaringAsyncKeyedCoProcessFunction) userFunction;
+ declaringFunction.declareVariables(declarationContext);
+ processor1 = declaringFunction.declareProcess1(declarationContext,
context, collector);
+ processor2 = declaringFunction.declareProcess2(declarationContext,
context, collector);
+ timerProcessor =
+ declaringFunction.declareOnTimer(declarationContext,
onTimerContext, collector);
+ } else {
+ processor1 = (in) -> userFunction.processElement1(in, context,
collector);
+ processor2 = (in) -> userFunction.processElement2(in, context,
collector);
+ timerProcessor = (in) -> userFunction.onTimer(in, onTimerContext,
collector);
+ }
+ }
+
+ @Override
+ public void processElement1(StreamRecord<IN1> element) throws Exception {
+ collector.setTimestamp(element);
+ processor1.accept(element.getValue());
+ }
+
+ @Override
+ public void processElement2(StreamRecord<IN2> element) throws Exception {
+ collector.setTimestamp(element);
+ processor2.accept(element.getValue());
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws
Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ invokeUserFunction(TimeDomain.EVENT_TIME, timer);
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws
Exception {
+ collector.eraseTimestamp();
+ invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
+ }
+
+ private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K,
VoidNamespace> timer)
+ throws Exception {
+ onTimerContext.setTime(timer.getTimestamp(), timeDomain);
+ timerProcessor.accept(timer.getTimestamp());
+ }
+
+ public class ContextImpl<K, IN1, IN2, OUT>
+ extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.Context {
+
+ private final TimerService timerService;
+
+ private final DeclaredVariable<Long> timestamp;
+
+ ContextImpl(
+ KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
+ TimerService timerService,
+ DeclaredVariable<Long> timestamp) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public Long timestamp() {
+ return timestamp.get();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be
null.");
+ }
+
+ output.collect(outputTag, new StreamRecord<>(value,
timestamp.get()));
+ }
+
+ @Override
+ public K getCurrentKey() {
+ return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
+ }
+ }
+
+ private class OnTimerContextImpl<K, IN1, IN2, OUT>
+ extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.OnTimerContext {
+
+ private final TimerService timerService;
+
+ private final DeclaredVariable<String> timeDomain;
+
+ private final DeclaredVariable<Long> timestamp;
+
+ OnTimerContextImpl(
+ KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
+ TimerService timerService,
+ DeclarationContext declarationContext) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ this.timeDomain =
+ declarationContext.declareVariable(
+ StringSerializer.INSTANCE,
"_OnTimerContextImpl$timeDomain", null);
+ this.timestamp =
+ declarationContext.declareVariable(
+ LongSerializer.INSTANCE,
"_OnTimerContextImpl$timestamp", null);
+ }
+
+ public void setTime(long time, TimeDomain one) {
+ timestamp.set(time);
+ timeDomain.set(one.name());
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timestamp.get() != null);
+ return timestamp.get();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be
null.");
+ }
+
+ output.collect(outputTag, new StreamRecord<>(value, timestamp()));
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain.get() != null);
+ return TimeDomain.valueOf(timeDomain.get());
+ }
+
+ @Override
+ public K getCurrentKey() {
+ return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
new file mode 100644
index 00000000000..ace1297a0a9
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.operators.co;
+
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Preconditions;
+
+/** An async state variant of {@link KeyedCoProcessOperator} that supports holding back watermarks
with a static delay. */
+public class AsyncKeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
+ extends AsyncKeyedCoProcessOperator<K, IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ private final long watermarkDelay;
+
+ public AsyncKeyedCoProcessOperatorWithWatermarkDelay(
+ KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction,
long watermarkDelay) {
+ super(keyedCoProcessFunction);
+ Preconditions.checkArgument(
+ watermarkDelay >= 0, "The watermark delay should be
non-negative.");
+ this.watermarkDelay = watermarkDelay;
+ }
+
+ @Override
+ public Watermark postProcessWatermark(Watermark watermark) throws
Exception {
+ if (watermarkDelay == 0) {
+ return watermark;
+ } else {
+ return new Watermark(watermark.getTimestamp() - watermarkDelay);
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 3c9347c9a25..45296ddfad8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
@@ -26,6 +27,7 @@ import
org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -69,12 +71,20 @@ public class ConnectedStreams<IN1, IN2> {
protected final StreamExecutionEnvironment environment;
protected final DataStream<IN1> inputStream1;
protected final DataStream<IN2> inputStream2;
+ protected boolean isEnableAsyncState;
protected ConnectedStreams(
StreamExecutionEnvironment env, DataStream<IN1> input1,
DataStream<IN2> input2) {
this.environment = requireNonNull(env);
this.inputStream1 = requireNonNull(input1);
this.inputStream2 = requireNonNull(input2);
+ if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof
KeyedStream)) {
+ this.isEnableAsyncState =
+ ((KeyedStream) inputStream1).isEnableAsyncState()
+ && ((KeyedStream)
inputStream2).isEnableAsyncState();
+ } else {
+ this.isEnableAsyncState = false;
+ }
}
public StreamExecutionEnvironment getExecutionEnvironment() {
@@ -439,7 +449,12 @@ public class ConnectedStreams<IN1, IN2> {
TwoInputStreamOperator<IN1, IN2, R> operator;
if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof
KeyedStream)) {
- operator = new
KeyedCoProcessOperator<>(inputStream1.clean(keyedCoProcessFunction));
+ operator =
+ isEnableAsyncState
+ ? new AsyncKeyedCoProcessOperator<>(
+ inputStream1.clean(keyedCoProcessFunction))
+ : new KeyedCoProcessOperator<>(
+
inputStream1.clean(keyedCoProcessFunction));
} else {
throw new UnsupportedOperationException(
"KeyedCoProcessFunction can only be used "
@@ -523,4 +538,25 @@ public class ConnectedStreams<IN1, IN2> {
return returnStream;
}
+
+ /**
+ * Enables async state processing for the following keyed processing
function on the connected
+ * streams. This also requires that only State V2 APIs are used in the function.
+ *
+ * @return the configured ConnectedStreams itself.
+ */
+ @Experimental
+ public ConnectedStreams<IN1, IN2> enableAsyncState() {
+ if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof
KeyedStream)) {
+ ((KeyedStream<?, ?>) inputStream1).enableAsyncState();
+ ((KeyedStream<?, ?>) inputStream2).enableAsyncState();
+ this.isEnableAsyncState = true;
+ } else {
+ throw new UnsupportedOperationException(
+ "The connected streams do not support async state, "
+ + "please ensure that two input streams of your
connected streams are "
+ + "keyed stream(not behind a keyBy()).");
+ }
+ return this;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index dd4818fe30f..f082f11b8e6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -35,8 +35,8 @@ import
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import
org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
+import
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
index 007f908c809..43566988b1e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
@@ -27,6 +27,7 @@ import
org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import java.util.ArrayList;
import java.util.List;
@@ -239,4 +240,14 @@ public class TwoInputTransformation<IN1, IN2, OUT> extends
PhysicalTransformatio
public boolean isInternalSorterSupported() {
return
operatorFactory.getOperatorAttributes().isInternalSorterSupported();
}
+
+ @Override
+ public void enableAsyncState() {
+ TwoInputStreamOperator<IN1, IN2, OUT> operator =
+ (TwoInputStreamOperator<IN1, IN2, OUT>)
+ ((SimpleOperatorFactory<OUT>)
operatorFactory).getOperator();
+ if (!(operator instanceof AsyncStateProcessingOperator)) {
+ super.enableAsyncState();
+ }
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index bb234af0eef..e5452f51656 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -371,8 +371,8 @@ public class AbstractAsyncStateStreamOperatorTest {
expectedOutput.add(new StreamRecord<>(1002L));
expectedOutput.add(new StreamRecord<>(1L));
expectedOutput.add(new StreamRecord<>(3L));
- expectedOutput.add(new Watermark(3L));
expectedOutput.add(new StreamRecord<>(103L));
+ expectedOutput.add(new Watermark(3L));
testHarness.processWatermark1(new Watermark(4L));
testHarness.processWatermark2(new Watermark(4L));
expectedOutput.add(new StreamRecord<>(1004L));
@@ -380,8 +380,8 @@ public class AbstractAsyncStateStreamOperatorTest {
testHarness.processWatermark2(new Watermark(5L));
expectedOutput.add(new StreamRecord<>(1005L));
expectedOutput.add(new StreamRecord<>(4L));
- expectedOutput.add(new Watermark(6L));
expectedOutput.add(new StreamRecord<>(106L));
+ expectedOutput.add(new Watermark(6L));
TestHarnessUtil.assertOutputEquals(
"Output was not correct", expectedOutput,
testHarness.getOutput());
@@ -690,10 +690,11 @@ public class AbstractAsyncStateStreamOperatorTest {
}
@Override
- public void postProcessWatermark(Watermark watermark) throws Exception
{
+ public Watermark postProcessWatermark(Watermark watermark) throws
Exception {
if (postProcessFunction != null) {
postProcessFunction.accept(watermark);
}
+ return watermark;
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
index dc08ed7f76c..2c250380969 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
new file mode 100644
index 00000000000..d7113471c35
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.asyncprocessing.operators;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
+import
org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
+import
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import
org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AsyncKeyedCoProcessOperator}. */
+class AsyncKeyedCoProcessOperatorTest {
+
+ @Test
+ void testDeclareProcessor() throws Exception {
+ TestChainDeclarationFunction function = new
TestChainDeclarationFunction();
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(function);
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ ArrayList<StreamRecord<String>> expectedOutput = new ArrayList<>();
+
+ testHarness.open();
+ testHarness.processElement1(new StreamRecord<>(5));
+ expectedOutput.add(new StreamRecord<>("11"));
+ assertThat(function.value.get()).isEqualTo(11);
+ testHarness.processElement2(new StreamRecord<>("6"));
+ expectedOutput.add(new StreamRecord<>("6"));
+ assertThat(function.value.get()).isEqualTo(17);
+
assertThat(testHarness.getOutput()).containsExactly(expectedOutput.toArray());
+ }
+
+ @Test
+ void testTimestampAndWatermarkQuerying() throws Exception {
+
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(new
WatermarkQueryingProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(17));
+ testHarness.processWatermark2(new Watermark(17));
+ testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+ testHarness.processWatermark1(new Watermark(42));
+ testHarness.processWatermark2(new Watermark(42));
+ testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(17L));
+ expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
+ expectedOutput.add(new Watermark(42L));
+ expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(new
ProcessingTimeQueryingProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(17);
+ testHarness.processElement1(new StreamRecord<>(5));
+
+ testHarness.setProcessingTime(42);
+ testHarness.processElement2(new StreamRecord<>("6"));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
+ expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ void testEventTimeTimers() throws Exception {
+
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(new
EventTimeTriggeringProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(17, 42L));
+ testHarness.processElement2(new StreamRecord<>("18", 42L));
+
+ testHarness.processWatermark1(new Watermark(5));
+ testHarness.processWatermark2(new Watermark(5));
+
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
+ expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
+ expectedOutput.add(new StreamRecord<>("17:1777", 5L));
+ expectedOutput.add(new Watermark(5L));
+ expectedOutput.add(new StreamRecord<>("18:1777", 6L));
+ expectedOutput.add(new Watermark(6L));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ void testProcessingTimeTimers() throws Exception {
+
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(new
ProcessingTimeTriggeringProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(17));
+ testHarness.processElement2(new StreamRecord<>("18"));
+
+ testHarness.setProcessingTime(5);
+ testHarness.setProcessingTime(6);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT2:18"));
+ expectedOutput.add(new StreamRecord<>("1777"));
+ expectedOutput.add(new StreamRecord<>("1777"));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /** Verifies that event-time timers and keyed state do not leak between different keys. */
+ @Test
+ void testEventTimeTimerWithState() throws Exception {
+
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(new
EventTimeTriggeringStatefulProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+ testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set
timer for 6
+ testHarness.processElement1(new StreamRecord<>(13, 0L)); // should set
timer for 6
+
+ testHarness.processWatermark1(new Watermark(2));
+ testHarness.processWatermark2(new Watermark(2));
+ testHarness.processElement1(new StreamRecord<>(13, 1L)); // should
delete timer for 6
+ testHarness.processElement2(new StreamRecord<>("42", 1L)); // should
set timer for 7
+
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ testHarness.processWatermark1(new Watermark(7));
+ testHarness.processWatermark2(new Watermark(7));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(1L));
+ expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
+ expectedOutput.add(new StreamRecord<>("INPUT1:13", 0L));
+ expectedOutput.add(new Watermark(2L));
+ expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+ expectedOutput.add(new Watermark(6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+ expectedOutput.add(new Watermark(7L));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /** Verifies that processing-time timers and keyed state do not leak between different keys. */
+ @Test
+ void testProcessingTimeTimerWithState() throws Exception {
+
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(
+ new ProcessingTimeTriggeringStatefulProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(1);
+ testHarness.processElement1(new StreamRecord<>(17)); // should set
timer for 6
+ testHarness.processElement1(new StreamRecord<>(13)); // should set
timer for 6
+
+ testHarness.setProcessingTime(2);
+ testHarness.processElement1(new StreamRecord<>(13)); // should delete
timer for 6 again
+ testHarness.processElement2(new StreamRecord<>("42")); // should set
timer for 7
+
+ testHarness.setProcessingTime(6);
+ testHarness.setProcessingTime(7);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT1:13"));
+ expectedOutput.add(new StreamRecord<>("INPUT2:42"));
+ expectedOutput.add(new StreamRecord<>("STATE:17"));
+ expectedOutput.add(new StreamRecord<>("STATE:42"));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ void testSnapshotAndRestore() throws Exception {
+
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(new
BothTriggeringProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(5, 12L));
+ testHarness.processElement2(new StreamRecord<>("5", 12L));
+
+ // snapshot and restore from scratch
+ OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
+
+ testHarness.close();
+
+ operator = new AsyncKeyedCoProcessOperator<>(new
BothTriggeringProcessFunction());
+
+ testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.setProcessingTime(5);
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("PROC:1777"));
+ expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+ expectedOutput.add(new Watermark(6));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ void testGetCurrentKeyFromContext() throws Exception {
+ AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+ new AsyncKeyedCoProcessOperator<>(new
AppendCurrentKeyProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String,
String> testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(5));
+ testHarness.processElement1(new StreamRecord<>(6));
+ testHarness.processElement2(new StreamRecord<>("hello"));
+ testHarness.processElement2(new StreamRecord<>("world"));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("5,5"));
+ expectedOutput.add(new StreamRecord<>("6,6"));
+ expectedOutput.add(new StreamRecord<>("hello,hello"));
+ expectedOutput.add(new StreamRecord<>("world,world"));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ private static class IntToStringKeySelector<T> implements
KeySelector<Integer, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Integer value) throws Exception {
+ return "" + value;
+ }
+ }
+
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T getKey(T value) throws Exception {
+ return value;
+ }
+ }
+
+ private static class TestChainDeclarationFunction
+ extends DeclaringAsyncKeyedCoProcessFunction<String, Integer,
String, String> {
+
+ final AtomicInteger value = new AtomicInteger(0);
+
+ @Override
+ public ThrowingConsumer<Integer, Exception> declareProcess1(
+ DeclarationContext context,
+ KeyedCoProcessFunction<String, Integer, String,
String>.Context ctx,
+ Collector<String> out)
+ throws DeclarationException {
+ ContextVariable<Integer> inputValue =
context.declareVariable(null);
+ return context.<Integer>declareChain()
+ .thenCompose(
+ e -> {
+ if (inputValue.get() == null) {
+ inputValue.set(e);
+ }
+ value.addAndGet(e);
+ return StateFutureUtils.completedVoidFuture();
+ })
+ .thenCompose(v ->
StateFutureUtils.completedFuture(value.incrementAndGet()))
+ .withName("adder")
+ .thenAccept(
+ (v) -> {
+ value.addAndGet(inputValue.get());
+ out.collect(String.valueOf(value.get()));
+ })
+ .withName("doubler")
+ .finish();
+ }
+
+ @Override
+ public ThrowingConsumer<String, Exception> declareProcess2(
+ DeclarationContext context,
+ KeyedCoProcessFunction<String, Integer, String,
String>.Context ctx,
+ Collector<String> out)
+ throws DeclarationException {
+ return context.<String>declareChain()
+ .thenAccept(
+ v -> {
+ out.collect(v);
+ value.addAndGet(Integer.valueOf(v));
+ })
+ .withName("pass")
+ .finish();
+ }
+ }
+
+ private static class WatermarkQueryingProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect(
+ value
+ + "WM:"
+ + ctx.timerService().currentWatermark()
+ + " TS:"
+ + ctx.timestamp());
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect(
+ value
+ + "WM:"
+ + ctx.timerService().currentWatermark()
+ + " TS:"
+ + ctx.timestamp());
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out)
+ throws Exception {}
+ }
+
+ private static class EventTimeTriggeringProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect("INPUT1:" + value);
+ ctx.timerService().registerEventTimeTimer(5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect("INPUT2:" + value);
+ ctx.timerService().registerEventTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out)
+ throws Exception {
+ assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.EVENT_TIME);
+ out.collect(ctx.getCurrentKey() + ":" + 1777);
+ }
+ }
+
+ private static class EventTimeTriggeringStatefulProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<String> state =
+ new ValueStateDescriptor<>("seen-element",
StringSerializer.INSTANCE);
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ handleValue(value, out, ctx.timerService(), 1);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ handleValue(value, out, ctx.timerService(), 2);
+ }
+
+ private void handleValue(
+ Object value, Collector<String> out, TimerService
timerService, int channel)
+ throws IOException {
+ final ValueState<String> state =
getRuntimeContext().getState(this.state);
+ state.asyncValue()
+ .thenAccept(
+ v -> {
+ if (v == null) {
+ state.asyncUpdate(String.valueOf(value))
+ .thenAccept(
+ VO ->
+ out.collect(
+ "INPUT" +
channel + ":"
+ +
value));
+ timerService.registerEventTimeTimer(
+ timerService.currentWatermark() +
5);
+ } else {
+ state.asyncClear();
+ timerService.deleteEventTimeTimer(
+ timerService.currentWatermark() +
4);
+ }
+ });
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out)
+ throws Exception {
+ assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.EVENT_TIME);
+ getRuntimeContext()
+ .getState(state)
+ .asyncValue()
+ .thenAccept(
+ v -> {
+ out.collect("STATE:" + v);
+ });
+ }
+ }
+
+ private static class ProcessingTimeTriggeringProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect("INPUT1:" + value);
+ ctx.timerService().registerProcessingTimeTimer(5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect("INPUT2:" + value);
+ ctx.timerService().registerProcessingTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out)
+ throws Exception {
+ assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.PROCESSING_TIME);
+ out.collect("" + 1777);
+ }
+ }
+
+ private static class ProcessingTimeQueryingProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect(
+ value
+ + "PT:"
+ + ctx.timerService().currentProcessingTime()
+ + " TS:"
+ + ctx.timestamp());
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect(
+ value
+ + "PT:"
+ + ctx.timerService().currentProcessingTime()
+ + " TS:"
+ + ctx.timestamp());
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out)
+ throws Exception {}
+ }
+
+ private static class ProcessingTimeTriggeringStatefulProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<String> state =
+ new ValueStateDescriptor<>("seen-element",
StringSerializer.INSTANCE);
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ handleValue(value, out, ctx.timerService(), 1);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ handleValue(value, out, ctx.timerService(), 2);
+ }
+
+ private void handleValue(
+ Object value, Collector<String> out, TimerService
timerService, int channel)
+ throws IOException {
+ final ValueState<String> state =
getRuntimeContext().getState(this.state);
+ state.asyncValue()
+ .thenAccept(
+ v -> {
+ if (v == null) {
+ state.asyncUpdate(String.valueOf(value))
+ .thenAccept(
+ VO ->
+ out.collect(
+ "INPUT" +
channel + ":"
+ +
value));
+ timerService.registerProcessingTimeTimer(
+
timerService.currentProcessingTime() + 5);
+ } else {
+ state.asyncClear();
+ timerService.deleteProcessingTimeTimer(
+
timerService.currentProcessingTime() + 4);
+ }
+ });
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out)
+ throws Exception {
+ assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.PROCESSING_TIME);
+ out.collect("STATE:" +
getRuntimeContext().getState(state).value());
+ }
+ }
+
+ private static class BothTriggeringProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ ctx.timerService().registerProcessingTimeTimer(3);
+ ctx.timerService().registerEventTimeTimer(6);
+ ctx.timerService().deleteProcessingTimeTimer(3);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ ctx.timerService().registerEventTimeTimer(4);
+ ctx.timerService().registerProcessingTimeTimer(5);
+ ctx.timerService().deleteEventTimeTimer(4);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out)
+ throws Exception {
+ if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+ out.collect("EVENT:1777");
+ } else {
+ out.collect("PROC:1777");
+ }
+ }
+ }
+
+ private static class AppendCurrentKeyProcessFunction
+ extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+ @Override
+ public void processElement1(Integer value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect(value + "," + ctx.getCurrentKey());
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx,
Collector<String> out)
+ throws Exception {
+ out.collect(value + "," + ctx.getCurrentKey());
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index 3d89e414ae7..2ac0af781e0 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -367,7 +367,7 @@ public class StreamExecIntervalJoin extends
ExecNodeBase<RowData>
leftTypeInfo,
rightTypeInfo,
joinFunction);
-
+ // TODO: add an async version of procJoinFunc so that it can use
AsyncKeyedCoProcessOperator
return ExecNodeUtil.createTwoInputTransformation(
leftInputTransform,
rightInputTransform,
@@ -404,7 +404,7 @@ public class StreamExecIntervalJoin extends
ExecNodeBase<RowData>
joinFunction,
windowBounds.getLeftTimeIdx(),
windowBounds.getRightTimeIdx());
-
+ // TODO: add an async version of rowJoinFunc so that it can use
AsyncKeyedCoProcessOperator
return ExecNodeUtil.createTwoInputTransformation(
leftInputTransform,
rightInputTransform,