pnowojski commented on code in PR #26122:
URL: https://github.com/apache/flink/pull/26122#discussion_r1984611394
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
@@ -1428,4 +1551,36 @@ public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
resultFuture.complete(Collections.singletonList(-1));
}
}
+
+ private static class CallThreadAsyncFunction extends MyAbstractAsyncFunction<Integer> {
+ private static final long serialVersionUID = -1504699677704123889L;
+
+ @Override
+ public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture)
+ throws Exception {
+ final Thread callThread = Thread.currentThread();
+ executorService.submit(
+ () ->
+ resultFuture.complete(
+ () -> {
+ assertEquals(callThread, Thread.currentThread());
+ return Collections.singletonList(input * 2);
+ }));
+ }
+ }
+
+ private static class CallThreadAsyncFunctionError extends MyAbstractAsyncFunction<Integer> {
+ private static final long serialVersionUID = -1504699677704123889L;
+
+ @Override
+ public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture)
+ throws Exception {
+ executorService.submit(
+ () ->
+ resultFuture.complete(
+ () -> {
Review Comment:
nit: `assertEquals(callThread, Thread.currentThread());`? But feel free to
completely ignore this.
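To illustrate what the extra assertion would check, here is a minimal sketch using plain executors as stand-ins. The `mailbox` and `worker` executors and the class name are hypothetical; this is not Flink's mailbox or the test's `executorService`, just the same thread hand-off in miniature:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CallThreadCheckSketch {

    /** Runs the error case and returns the Throwable raised inside the "supplier". */
    public static Throwable runErrorCase() {
        // Stand-ins: "mailbox" plays the thread that calls asyncInvoke,
        // "worker" plays the async executor used by the test function.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CompletableFuture<Throwable> observed = new CompletableFuture<>();
        try {
            mailbox.execute(() -> {
                final Thread callThread = Thread.currentThread();
                // Hop to the worker, then hand the "supplier" back to the mailbox thread.
                worker.execute(() -> mailbox.execute(() -> {
                    try {
                        // The check suggested in the nit: we must be back on the call thread.
                        if (callThread != Thread.currentThread()) {
                            throw new AssertionError("supplier ran on the wrong thread");
                        }
                        throw new IllegalStateException("expected test failure");
                    } catch (Throwable t) {
                        observed.complete(t);
                    }
                }));
            });
            return observed.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            mailbox.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runErrorCase());
    }
}
```

If the hand-off ever landed on a different thread, the observed throwable would be the `AssertionError` rather than the intended failure, which is the extra signal the assertion would add to the error variant.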
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
@@ -1321,6 +1327,123 @@ private void testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.Ou
}
}
+ @Test
+ public void testProcessingTimeWithMailboxThreadOrdered() throws Exception {
+ testProcessingTimeWithCallThread(AsyncDataStream.OutputMode.ORDERED, NO_RETRY_STRATEGY);
+ }
+
+ @Test
+ public void testProcessingTimeWithMailboxThreadUnordered() throws Exception {
+ testProcessingTimeWithCallThread(AsyncDataStream.OutputMode.UNORDERED, NO_RETRY_STRATEGY);
+ }
+
+ @Test
+ public void testProcessingTimeWithMailboxThreadOrderedWithRetry() throws Exception {
+ testProcessingTimeWithCallThread(
+ AsyncDataStream.OutputMode.ORDERED, exceptionRetryStrategy);
+ }
+
+ @Test
+ public void testProcessingTimeWithMailboxThreadUnorderedWithRetry() throws Exception {
+ testProcessingTimeWithCallThread(
+ AsyncDataStream.OutputMode.UNORDERED, exceptionRetryStrategy);
+ }
+
+ @Test
+ public void testProcessingTimeWithMailboxThreadError() throws Exception {
+ testProcessingTimeWithMailboxThreadError(NO_RETRY_STRATEGY);
+ }
+
+ @Test
+ public void testProcessingTimeWithMailboxThreadErrorWithRetry() throws Exception {
+ testProcessingTimeWithMailboxThreadError(exceptionRetryStrategy);
+ }
+
+ public void testProcessingTimeWithMailboxThreadError(
+ @Nullable AsyncRetryStrategy<Integer> asyncRetryStrategy) throws Exception {
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ try (StreamTaskMailboxTestHarness<Integer> testHarness =
+ builder.setupOutputForSingletonOperatorChain(
+ new AsyncWaitOperatorFactory<>(
+ new CallThreadAsyncFunctionError(),
+ TIMEOUT,
+ 1,
+ AsyncDataStream.OutputMode.UNORDERED,
+ asyncRetryStrategy))
+ .build()) {
+ final long initialTime = 0L;
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ testHarness.getStreamMockEnvironment().setExternalExceptionHandler(error::set);
+
+ // Sometimes, processElement invoke the async function immediately, so we should catch
+ // any exception.
+ try {
+ testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+ while (error.get() == null) {
+ testHarness.processAll();
+ }
+ } catch (Exception e) {
+ // This simulates a mailbox failure failing the job
+ error.set(e);
+ }
+
+ ExceptionUtils.assertThrowable(error.get(), ExpectedTestException.class);
+
+ testHarness.endInput();
+ }
+ }
+
+ private void testProcessingTimeWithCallThread(
Review Comment:
The `WithCallThread` name seems incorrect/misleading? 🤔
`testProcessingTimeWithMailboxThreadError` vs
`testProcessingTimeWithCallThread`? Especially since the callers of
`testProcessingTimeWithCallThread` are themselves named differently, e.g.
`testProcessingTimeWithMailboxThreadUnorderedWithRetry`.
The only difference between `testProcessingTimeWithMailboxThreadError` and
`testProcessingTimeWithCallThread` is that the former throws an error, while
the latter processes the records? Maybe rename both:
`testProcessingTimeWithCallThread` ->
`testProcessingTimeWithCollectFromMailboxThread`
`testProcessingTimeWithMailboxThreadError` ->
`testProcessingTimeWithErrorFromMailboxThread`
?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java:
##########
@@ -47,4 +48,13 @@ public interface ResultFuture<OUT> {
* @param error A Throwable object.
*/
void completeExceptionally(Throwable error);
+
+ /**
+ * The same as complete, but will execute the supplier on the Mailbox thread which initiated the
+ * asynchronous process.
+ *
+ * <p>Note that if an exception is thrown while executing the supplier, the result should be the
+ * same as calling {@link ResultFuture#completeExceptionally(Throwable)}.
+ */
+ void complete(SupplierWithException<Collection<OUT>, Exception> supplier);
Review Comment:
I would define
```
public interface CollectionSupplier<OUT> {
Collection<OUT> get() throws Exception;
}
```
and use that here. This would make navigating the code in an IDE much easier
(e.g. "find implementations" wouldn't return tons of false positives).
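A rough sketch of how that could look, assuming a simplified stand-in for `ResultFuture`. The `RecordingResultFuture` class and the inline execution of the supplier are illustrative only; the real operator would run the supplier on the mailbox thread:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class CollectionSupplierSketch {

    /** Dedicated supplier type, so "find implementations" only hits ResultFuture callers. */
    @FunctionalInterface
    public interface CollectionSupplier<OUT> {
        Collection<OUT> get() throws Exception;
    }

    /** Simplified stand-in for ResultFuture (assumption: not the real Flink interface). */
    public interface ResultFuture<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);

        void complete(CollectionSupplier<OUT> supplier);
    }

    /** Hypothetical implementation that records the outcome and runs the supplier inline. */
    public static class RecordingResultFuture<OUT> implements ResultFuture<OUT> {
        public final List<OUT> results = new ArrayList<>();
        public Throwable error;

        @Override
        public void complete(Collection<OUT> result) {
            results.addAll(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            this.error = error;
        }

        @Override
        public void complete(CollectionSupplier<OUT> supplier) {
            try {
                complete(supplier.get());
            } catch (Exception e) {
                // Per the Javadoc: a failing supplier behaves like completeExceptionally.
                completeExceptionally(e);
            }
        }
    }

    public static void main(String[] args) {
        RecordingResultFuture<Integer> future = new RecordingResultFuture<>();
        future.complete(() -> Collections.singletonList(21 * 2));
        System.out.println(future.results);
    }
}
```

Call sites would stay unchanged, since a lambda such as `() -> Collections.singletonList(input * 2)` still resolves to the supplier overload.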
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/TableFunctionResultFuture.java:
##########
@@ -60,4 +63,9 @@ public ResultFuture<?> getResultFuture() {
public void completeExceptionally(Throwable error) {
this.resultFuture.completeExceptionally(error);
}
+
+ @Override
+ public void complete(SupplierWithException<Collection<T>, Exception> supplier) {
+ throw new UnsupportedOperationException();
Review Comment:
Why is this unsupported?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinRunner.java:
##########
@@ -274,6 +275,11 @@ public void completeExceptionally(Throwable error) {
realOutput.completeExceptionally(error);
}
+ @Override
+ public void complete(SupplierWithException<Collection<Object>, Exception> supplier) {
+ throw new UnsupportedOperationException();
Review Comment:
Same here and below? Maybe add a comment explaining why it is unsupported?