[ https://issues.apache.org/jira/browse/FLINK-33251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849961#comment-17849961 ]

Keith Lee edited comment on FLINK-33251 at 5/28/24 10:21 AM:
-------------------------------------------------------------

The timeout could come from any of:

1. SQL Gateway not responding to the connection, or not responding in time

2. Bug in ExecutorImpl

3. Bug in Flink's RestClient

4. Bug in Netty's Bootstrap (unlikely as that is).

I've run the following cURL commands, which poll the results in a loop, to rule out the SQL Gateway, and can confirm that connection timeouts are NOT seen.
{code:bash}
SESSION_HANDLE=$(curl --request POST http://localhost:8083/v1/sessions | jq -r .sessionHandle)
OPERATION_HANDLE=$(curl --request POST \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/statements/ \
  --data '{"statement": "CREATE TABLE Orders (order_number BIGINT, order_time TIMESTAMP(3)) WITH ('"'connector'"' = '"'datagen'"');"}' \
  | jq -r .operationHandle)
curl --request GET \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/operations/${OPERATION_HANDLE}/result/0 | jq .

OPERATION_HANDLE=$(curl --request POST \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/statements/ \
  --data '{"statement": "SELECT * FROM Orders;"}' | jq -r .operationHandle)
NEXT_RESULT_URI=$(curl --request GET \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/operations/${OPERATION_HANDLE}/result/0 \
  | jq -r .nextResultUri)

# Follow nextResultUri forever, with no sleep between requests
while true; do
  echo "$NEXT_RESULT_URI"
  NEXT_RESULT_URI=$(curl --request GET http://localhost:8083/${NEXT_RESULT_URI} | jq -r .nextResultUri)
done
{code}
So I think the bug is in ExecutorImpl, in Flink's RestClient, or in Netty's Bootstrap class.
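One way to narrow this further, sketched here as a hypothetical diagnostic rather than anything from the Flink code base: open many short-lived TCP connections back to back from a plain JVM client, with no RestClient and no Netty involved, and count how many connects time out. The class name ConnectLoopCheck, the attempt count and the timeout are made up for illustration. main() starts its own local ServerSocket so the sketch runs standalone; to probe a real setup, the target would have to be the port the SQL Client actually connects to (the reported errors show high-numbered ports such as 52596). Zero timeouts here would not prove anything about RestClient or Netty; it would only make the host's TCP stack a less likely culprit.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical diagnostic: back-to-back TCP connects using plain JDK sockets only.
class ConnectLoopCheck {

    /** Attempts {@code attempts} connects in a tight loop and returns how many timed out. */
    static int countConnectTimeouts(InetSocketAddress target, int attempts, int timeoutMs)
            throws IOException {
        int timeouts = 0;
        for (int i = 0; i < attempts; i++) {
            try (Socket socket = new Socket()) {
                // Throws SocketTimeoutException if the handshake does not finish in time;
                // other failures (e.g. connection refused) propagate to the caller.
                socket.connect(target, timeoutMs);
            } catch (SocketTimeoutException e) {
                timeouts++;
            }
        }
        return timeouts;
    }

    /** Local stand-in server that accepts every connection and closes it immediately. */
    static ServerSocket startAcceptingServer() throws IOException {
        ServerSocket server = new ServerSocket(0); // port 0 = pick any free port
        Thread acceptor = new Thread(() -> {
            while (!server.isClosed()) {
                try (Socket accepted = server.accept()) {
                    // Closed right away by try-with-resources; only the handshake matters.
                } catch (IOException e) {
                    // Server closed or transient accept failure; the loop re-checks isClosed().
                }
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = startAcceptingServer()) {
            InetSocketAddress target = new InetSocketAddress("127.0.0.1", server.getLocalPort());
            int timeouts = countConnectTimeouts(target, 500, 1_000);
            System.out.println("connect timeouts: " + timeouts + " / 500");
        }
    }
}
```

Accepting on a separate thread matters: if nothing calls accept(), the listen backlog fills up and later connects can stall, which would produce timeouts that say nothing about the client.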

One interesting observation concerns the {{fetchResultWithInterval}} flag, which controls whether a 100 ms sleep is added before fetching results: [https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java#L469-L471]

 
{code:java}
    private FetchResultsResponseBody getFetchResultResponse(
            OperationHandle operationHandle,
            long token,
            boolean fetchResultWithInterval,
            Function<InterruptedException, SqlExecutionException> interruptedExceptionHandler) {
        try {
            if (fetchResultWithInterval) {
                Thread.sleep(100);
            }
            // ... (rest of the method not quoted here)
{code}
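To make the effect of the flag concrete, here is a minimal, self-contained sketch of a token-based polling loop that optionally sleeps a fixed interval before each fetch. It is not the ExecutorImpl code: IntervalPoller, Page, Sleeper and pollAll are hypothetical names, the fetch function stands in for the REST call, and sleeping before every fetch is an assumption. With the interval enabled, requests go out roughly intervalMs apart; with it disabled, the next request is sent as soon as the previous one returns, which is the fast-call pattern that the timeouts appear to correlate with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical sketch (not the ExecutorImpl code): a token-based polling loop that
// can optionally sleep a fixed interval before each fetch.
class IntervalPoller {

    /** One page of results: its rows and the next token, or null when there are no more pages. */
    static final class Page {
        final List<String> rows;
        final Long nextToken;

        Page(List<String> rows, Long nextToken) {
            this.rows = rows;
            this.nextToken = nextToken;
        }
    }

    /** Injected sleep, so callers (and tests) control how the interval is spent. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /** Fetches pages starting at token 0 until nextToken is null and returns all rows. */
    static List<String> pollAll(
            LongFunction<Page> fetchPage,
            boolean fetchWithInterval,
            long intervalMs,
            Sleeper sleeper)
            throws InterruptedException {
        List<String> rows = new ArrayList<>();
        Long token = 0L;
        while (token != null) {
            if (fetchWithInterval) {
                // With Thread::sleep, consecutive requests start roughly intervalMs apart.
                sleeper.sleep(intervalMs);
            }
            // Without the interval, this request follows the previous one immediately.
            Page page = fetchPage.apply(token);
            rows.addAll(page.rows);
            token = page.nextToken;
        }
        return rows;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the REST call: pages for tokens 0, 1 and 2, then no next token.
        LongFunction<Page> fakeFetch =
                token -> new Page(List.of("row-" + token), token < 2 ? Long.valueOf(token + 1) : null);

        long start = System.nanoTime();
        List<String> rows = pollAll(fakeFetch, true, 100, Thread::sleep);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(rows + " fetched in " + elapsedMs + " ms");
    }
}
```

Injecting the Sleeper keeps the loop testable: a counting lambda can replace Thread::sleep, so the number of intervals taken can be checked without actually waiting.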
 

[~fsk119] can you elaborate on why the interval is necessary? My other observation is that the ConnectTimeoutException seems to correlate with how quickly we call {{getFetchResultResponse}}.


was (Author: JIRAUSER305392):
The timeout could come from any of:

1. SQL Gateway not responding to the connection, or not responding in time

2. Bug in ExecutorImpl

3. Bug in Flink's RestClient

4. Bug in Netty's Bootstrap (unlikely as that is).

I've run the following cURL commands, which poll the results in a loop, to rule out the SQL Gateway, and can confirm that connection timeouts are NOT seen.
{code:bash}
SESSION_HANDLE=$(curl --request POST http://localhost:8083/v1/sessions | jq -r .sessionHandle)
OPERATION_HANDLE=$(curl --request POST \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/statements/ \
  --data '{"statement": "CREATE TABLE Orders (order_number BIGINT, order_time TIMESTAMP(3)) WITH ('"'connector'"' = '"'datagen'"');"}' \
  | jq -r .operationHandle)
curl --request GET \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/operations/${OPERATION_HANDLE}/result/0 | jq .

OPERATION_HANDLE=$(curl --request POST \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/statements/ \
  --data '{"statement": "SELECT * FROM Orders;"}' | jq -r .operationHandle)
NEXT_RESULT_URI=$(curl --request GET \
  http://localhost:8083/v1/sessions/${SESSION_HANDLE}/operations/${OPERATION_HANDLE}/result/0 \
  | jq -r .nextResultUri)

# Follow nextResultUri forever, with no sleep between requests
while true; do
  echo "$NEXT_RESULT_URI"
  NEXT_RESULT_URI=$(curl --request GET http://localhost:8083/${NEXT_RESULT_URI} | jq -r .nextResultUri)
done
{code}
So I think the bug is in ExecutorImpl, in Flink's RestClient, or in Netty's Bootstrap class.

One interesting observation I have concerns the {{fetchResultWithInterval}} flag: [https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java#L469-L471]

 
{code:java}
    private FetchResultsResponseBody getFetchResultResponse(
            OperationHandle operationHandle,
            long token,
            boolean fetchResultWithInterval,
            Function<InterruptedException, SqlExecutionException> interruptedExceptionHandler) {
        try {
            if (fetchResultWithInterval) {
                Thread.sleep(100);
            }
            // ... (rest of the method not quoted here)
{code}
 

[~fsk119] can you elaborate on why the interval is necessary? My other observation is that the ConnectTimeoutException seems to correlate with how quickly we call {{getFetchResultResponse}}.

> SQL Client query execution aborts after a few seconds: ConnectTimeoutException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-33251
>                 URL: https://issues.apache.org/jira/browse/FLINK-33251
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: MacBook Pro
> Apple M1 Max
>  
> {code:java}
> $ uname -a
> Darwin asgard08 23.0.0 Darwin Kernel Version 23.0.0: Fri Sep 15 14:41:43 PDT 
> 2023; root:xnu-10002.1.13~1/RELEASE_ARM64_T6000 arm64
> {code}
> {code:bash}
> $ java --version
> openjdk 11.0.20.1 2023-08-24
> OpenJDK Runtime Environment Homebrew (build 11.0.20.1+0)
> OpenJDK 64-Bit Server VM Homebrew (build 11.0.20.1+0, mixed mode)
> $ mvn --version
> Apache Maven 3.9.5 (57804ffe001d7215b5e7bcb531cf83df38f93546)
> Maven home: /opt/homebrew/Cellar/maven/3.9.5/libexec
> Java version: 11.0.20.1, vendor: Homebrew, runtime: 
> /opt/homebrew/Cellar/openjdk@11/11.0.20.1/libexec/openjdk.jdk/Contents/Home
> Default locale: en_GB, platform encoding: UTF-8
> OS name: "mac os x", version: "14.0", arch: "aarch64", family: "mac"
> {code}
>            Reporter: Robin Moffatt
>            Priority: Major
>         Attachments: log.zip, reproduce_FLINK-33251.java
>
>
> If I run a streaming query from an unbounded connector from the SQL Client, 
> it bombs out after ~15 seconds.
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: localhost/127.0.0.1:52596
> {code}
> This *doesn't* happen on 1.16.2. It *does* happen on *1.17.1* and on *1.18*, which 
> I have just built locally (git repo hash {{9b837727b6d}}). 
> The corresponding task's status in the Web UI shows as {{CANCELED}}. 
> ----
> h2. To reproduce
> Launch a local cluster and the SQL Client
> {code}
> ➜  flink-1.18-SNAPSHOT ./bin/start-cluster.sh 
> Starting cluster.
> Starting standalonesession daemon on host asgard08.
> Starting taskexecutor daemon on host asgard08.
> ➜  flink-1.18-SNAPSHOT ./bin/sql-client.sh
> […]
> Flink SQL>
> {code}
> Set the runtime mode to streaming and the result mode to changelog
> {code:sql}
> Flink SQL> SET 'execution.runtime-mode' = 'STREAMING';
> [INFO] Execute statement succeed.
> Flink SQL> SET 'sql-client.execution.result-mode' = 'changelog';
> [INFO] Execute statement succeed.
> {code}
> Define a table to read data from CSV files in a folder
> {code:sql}
> CREATE TABLE firewall (
>   event_time STRING,
>   source_ip  STRING,
>   dest_ip    STRING,
>   source_prt INT,
>   dest_prt   INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'file:///tmp/firewall/',
>   'format' = 'csv',
>   'source.monitor-interval' = '1' -- unclear from the docs what the unit is here
> );
> {code}
> Create a CSV file to read in
> {code:bash}
> $ mkdir /tmp/firewall
> $ cat > /tmp/firewall/data.csv <<EOF
> 2018-05-11 00:19:34,151.35.34.162,125.26.20.222,2014,68
> 2018-05-11 22:20:43,114.24.126.190,21.68.21.69,379,1619
> EOF
> {code}
> Run a streaming query 
> {code:sql}
> SELECT * FROM firewall;
> {code}
> You will see results (and if you add another data file, its rows show up too), but 
> after ~30 seconds the query aborts and throws an error back to the user at the 
> SQL Client prompt:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: localhost/127.0.0.1:58470
> Flink SQL>
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
