[ 
https://issues.apache.org/jira/browse/FLINK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801073#comment-17801073
 ] 

KarlManong edited comment on FLINK-33933 at 12/29/23 1:06 AM:
--------------------------------------------------------------

[~martijnvisser] Yes. When an exception occurs, we get an exceptionally completed future:
{code:java}
@Override
public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
    try {
        // Perform the lookup and create a collection of RowData objects
        Collection<RowData> result = new ArrayList<>();
        // TODO: Add your lookup logic here to populate the result collection

        // Return the result as a completed future
        return CompletableFuture.completedFuture(result);
    } catch (Exception e) {
        // If an exception occurs during the lookup, complete the future
        // exceptionally with the exception.

        /*
         * Execution reaches this point.
         */

        CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
    }
}
{code}

Flink then fails with a fatal StackOverflowError.
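
For reference, here is a minimal plain-JDK sketch of what "completed exceptionally" means for the returned future. It uses no Flink classes: `String` stands in for `RowData`, and the class and method names (`ExceptionalFutureDemo`, `failedLookup`, `failureOf`) are hypothetical, chosen only for illustration.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionalFutureDemo {

    // Stand-in for asyncLookup: returns a future that has already failed.
    static CompletableFuture<Collection<String>> failedLookup() {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        future.completeExceptionally(new IOException("request failed"));
        return future;
    }

    // Returns the original failure carried by the future, or null if it succeeded.
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> future = failedLookup();
        System.out.println("completed exceptionally: " + future.isCompletedExceptionally());
        System.out.println("failure: " + failureOf(future));
    }
}
```

The method itself never throws. The failure only travels inside the future, so whatever consumes the future is what ends up handling the `IOException`.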



> SerializedThrowable will be java.lang.StackOverflowError when 
> AsyncLookupFunction throw an exception
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33933
>                 URL: https://issues.apache.org/jira/browse/FLINK-33933
>             Project: Flink
>          Issue Type: Bug
>         Environment: tested on 1.16 through 1.18, same behavior
>            Reporter: KarlManong
>            Priority: Minor
>
> Here is a simple example
> {code:java}
> // Minimal reproduction
> public class TableA implements LookupTableSource {
>     @Nullable
>     private final LookupCache cache;
>
>     public TableA(@Nullable LookupCache cache) {
>         this.cache = cache;
>     }
>
>     @Override
>     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
>         LookupFunctionA lookupFunction = new LookupFunctionA();
>         if (cache != null) {
>             return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
>         } else {
>             return AsyncLookupFunctionProvider.of(lookupFunction);
>         }
>     }
>
>     @Override
>     public DynamicTableSource copy() {
>         return new TableA(cache);
>     }
>
>     @Override
>     public String asSummaryString() {
>         return "Async Table";
>     }
> }
>
> public class LookupFunctionA extends AsyncLookupFunction {
>     @Override
>     public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
>         // Always return an exceptionally completed future
>         CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
>         future.completeExceptionally(new IOException("request failed"));
>         return future;
>     }
> }
> {code}
> When TableA is used, a StackOverflowError occurs:
>  
> {code:java}
> java.lang.StackOverflowError
>     at java.base/java.lang.Exception.<init>(Exception.java:66)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:66)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>     at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>     at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> {code}
> {code}
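
The trace above keeps alternating between the `SerializedThrowable` constructor and `addAllSuppressed`. The report does not state the root cause, so the following is only a sketch of one way such a pattern can arise, under the assumption (not confirmed here) that the suppressed-exception graph contains a cycle. It is plain JDK code, not Flink's implementation, and all class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

class SuppressedCycleDemo {

    // Walks suppressed throwables recursively with no record of what was visited.
    static int countNaive(Throwable t) {
        int count = 1;
        for (Throwable suppressed : t.getSuppressed()) {
            count += countNaive(suppressed);
        }
        return count;
    }

    // Same walk, but each throwable instance is counted at most once.
    static int countWithVisitedSet(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        return count(t, seen);
    }

    private static int count(Throwable t, Set<Throwable> seen) {
        if (!seen.add(t)) {
            return 0; // already visited: stop here to break the cycle
        }
        int count = 1;
        for (Throwable suppressed : t.getSuppressed()) {
            count += count(suppressed, seen);
        }
        return count;
    }

    // Builds two exceptions that suppress each other (assumed scenario).
    static Throwable cyclicPair() {
        Exception a = new Exception("a");
        Exception b = new Exception("b");
        a.addSuppressed(b);
        b.addSuppressed(a);
        return a;
    }

    // Reports whether the naive walk runs out of stack on the given throwable.
    static boolean overflowsNaively(Throwable t) {
        try {
            countNaive(t);
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Throwable cyclic = cyclicPair();
        System.out.println("naive walk overflows: " + overflowsNaively(cyclic));
        System.out.println("visited-set walk counts: " + countWithVisitedSet(cyclic));
    }
}
```

Tracking visited instances by identity is what lets the second walk terminate. Whether this matches the actual defect in `SerializedThrowable` would need to be checked against the Flink source.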



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
