[jira] [Commented] (FLINK-33188) PyFlink MapState with Types.ROW() throws exception
[ https://issues.apache.org/jira/browse/FLINK-33188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772336#comment-17772336 ] Elkhan Dadashov commented on FLINK-33188: - After digging into the flink-python code, It seems if `PYFLINK_GATEWAY_DISABLED` is set to false in an environment variable, then using Types.LIST(Types.ROW([...])) does not have any issue, once Java Gateway is launched. It was unexpected for Flink local run to set this flag to false explicitly. This is a workaround for this issue: def open(self, runtime_context: RuntimeContext): state_ttl_config = ( StateTtlConfig.new_builder(Time.seconds(1)) .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) .disable_cleanup_in_background() .build() ) import os os.environ["PYFLINK_GATEWAY_DISABLED"] = "0" > PyFlink MapState with Types.ROW() throws exception > -- > > Key: FLINK-33188 > URL: https://issues.apache.org/jira/browse/FLINK-33188 > Project: Flink > Issue Type: Bug > Components: API / Python, API / Type Serialization System >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Priority: Major > > I'm trying to use MapState, where the value will be a list of 'pyflink.common.types.Row'> type elements. > > Wanted to check if anyone else faced the same issue while trying to use > MapState in PyFlink with complex types. > > Here is the code: > > from pyflink.common import Time > from pyflink.common.typeinfo import Types > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.datastream.functions import ( > KeyedCoProcessFunction, > KeySelector, > RuntimeContext, > ) > from pyflink.datastream.state import ( > MapStateDescriptor, > StateTtlConfig, > ValueStateDescriptor, > ListStateDescriptor > ) > from pyflink.table import DataTypes, StreamTableEnvironment > class MyKeyedCoProcessFunction(KeyedCoProcessFunction): > def __init__(self): > self.my_map_state = None > def open(self, runtime_context: RuntimeContext): > state_ttl_config = ( > StateTtlConfig.new_builder(Time.seconds(1)) > .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) > .disable_cleanup_in_background() > .build() > ) > my_map_state_descriptor = MapStateDescriptor( > "my_map_state", > Types.SQL_TIMESTAMP(), > Types.LIST(Types.ROW([ > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.SQL_TIMESTAMP(), > Types.SQL_TIMESTAMP(), > Types.SQL_TIMESTAMP(), > Types.BIG_INT() > ])) > ) > my_map_state_descriptor.enable_time_to_live(state_ttl_config) > self.my_map_state = > runtime_context.get_map_state(my_map_state_descriptor) > > But while running this code, it fails with this exception at job startup (at > runtime_context.get_map_state(my_map_state_descriptor)), even without trying > to add anything to the state. > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in > pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__ > File"pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in > pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ > File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", > line 127, in open > self.open_func() > File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", > line 296, in open_func > process_function.open(runtime_context) > File"/tmp/ipykernel_83481/1603226134.py", line 57, in open > File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", > line 125, in get_map_state > map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder > File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", > line 812, in from_type_info > from_type_info(type_info._key_type_info), > from_type_info(type_info._value_type_info)) > File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", > line 809, in from_type_info > returnGenericArrayCoder(from_type_info(type_info.elem_type)) > File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", > line 819, in from_type_info > [f for f in type_info.get_field_names()]) >
[jira] [Created] (FLINK-33188) PyFlink MapState with Types.ROW() throws exception
Elkhan Dadashov created FLINK-33188: --- Summary: PyFlink MapState with Types.ROW() throws exception Key: FLINK-33188 URL: https://issues.apache.org/jira/browse/FLINK-33188 Project: Flink Issue Type: Bug Components: API / Python, API / Type Serialization System Affects Versions: 1.17.1 Reporter: Elkhan Dadashov I'm trying to use MapState, where the value will be a list of type elements. Wanted to check if anyone else faced the same issue while trying to use MapState in PyFlink with complex types. Here is the code: from pyflink.common import Time from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import ( KeyedCoProcessFunction, KeySelector, RuntimeContext, ) from pyflink.datastream.state import ( MapStateDescriptor, StateTtlConfig, ValueStateDescriptor, ListStateDescriptor ) from pyflink.table import DataTypes, StreamTableEnvironment class MyKeyedCoProcessFunction(KeyedCoProcessFunction): def __init__(self): self.my_map_state = None def open(self, runtime_context: RuntimeContext): state_ttl_config = ( StateTtlConfig.new_builder(Time.seconds(1)) .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) .disable_cleanup_in_background() .build() ) my_map_state_descriptor = MapStateDescriptor( "my_map_state", Types.SQL_TIMESTAMP(), Types.LIST(Types.ROW([ Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.BIG_INT() ])) ) my_map_state_descriptor.enable_time_to_live(state_ttl_config) self.my_map_state = runtime_context.get_map_state(my_map_state_descriptor) But while running this code, it fails with this exception at job startup (at runtime_context.get_map_state(my_map_state_descriptor)), even without trying to add anything to the state. File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__ File"pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 127, in open self.open_func() File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 296, in open_func process_function.open(runtime_context) File"/tmp/ipykernel_83481/1603226134.py", line 57, in open File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", line 125, in get_map_state map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 812, in from_type_info from_type_info(type_info._key_type_info), from_type_info(type_info._value_type_info)) File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 809, in from_type_info returnGenericArrayCoder(from_type_info(type_info.elem_type)) File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 819, in from_type_info [f for f in type_info.get_field_names()]) File"/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 377, in get_field_names j_field_names = self.get_java_type_info().getFieldNames() File"/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 391, in get_java_type_info j_types_array = get_gateway()\ File"/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 62, in get_gateway _gateway = launch_gateway() File"/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 86, in launch_gateway raise Exception("It's launching the PythonGatewayServer during Python UDF execution " Exception: It's launching the PythonGatewayServer during Python UDF execution which is unexpected. It usually happens when the job codes are in the top level of the Python script file and are not enclosed in a `if name == 'main'` statement.If I switch from Tupes.ROW to Types.TUPLE() it works without any exception. This works: my_map_state_descriptor = MapStateDescriptor( "my_map_state", Types.SQL_TIMESTAMP(), Types.LIST(Types.TUPLE([ Types.STRING(), Types.STRING(),
[jira] [Reopened] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov reopened FLINK-32884: - > PyFlink remote execution should support URLs with paths and https scheme > > > Key: FLINK-32884 > URL: https://issues.apache.org/jira/browse/FLINK-32884 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Runtime / REST >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Currently, the `SUBMIT_ARGS=remote -m http://:` format. For > local execution it works fine `SUBMIT_ARGS=remote -m http://localhost:8081/`, > but it does not support the placement of the JobManager behind a proxy or > using an Ingress for routing to a specific Flink cluster based on the URL > path. In the current scenario, it expects JobManager to access PyFlink jobs > at `http://:/v1/jobs` endpoint. Mapping to a non-root > location, > `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` > is not supported. > This will use changes from > [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885)(https://issues.apache.org/jira/browse/FLINK-32885) > Since RestClusterClient talks to the JobManager via its REST endpoint, the > right format for `SUBMIT_ARGS` is a URL with a path (also support for https > scheme). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov updated FLINK-32884: Description: Currently, the `SUBMIT_ARGS=remote -m http://:` format. For local execution it works fine `SUBMIT_ARGS=remote -m http://localhost:8081/`, but it does not support the placement of the JobManager behind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In the current scenario, it expects JobManager to access PyFlink jobs at `http://:/v1/jobs` endpoint. Mapping to a non-root location, `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` is not supported. This will use changes from [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885)(https://issues.apache.org/jira/browse/FLINK-32885) Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is a URL with a path (also support for https scheme). was: Currently, the `SUBMIT_ARGS=remote -m http://:` format. For local execution it works fine `SUBMIT_ARGS=remote -m [http://localhost:8081|http://localhost:8081/]`, but it does not support the placement of the JobManager befind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In current scenario, it expects JobManager access PyFlink jobs at `http://:/v1/jobs` endpoint. Mapping to a non-root location, `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` is not supported. This will use changes from FLINK-32885(https://issues.apache.org/jira/browse/FLINK-32885) Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is URL with path (also support for https scheme). > PyFlink remote execution should support URLs with paths and https scheme > > > Key: FLINK-32884 > URL: https://issues.apache.org/jira/browse/FLINK-32884 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Runtime / REST >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > Fix For: 1.19.0 > > > Currently, the `SUBMIT_ARGS=remote -m http://:` format. For > local execution it works fine `SUBMIT_ARGS=remote -m http://localhost:8081/`, > but it does not support the placement of the JobManager behind a proxy or > using an Ingress for routing to a specific Flink cluster based on the URL > path. In the current scenario, it expects JobManager to access PyFlink jobs > at `http://:/v1/jobs` endpoint. Mapping to a non-root > location, > `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` > is not supported. > This will use changes from > [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885)(https://issues.apache.org/jira/browse/FLINK-32885) > Since RestClusterClient talks to the JobManager via its REST endpoint, the > right format for `SUBMIT_ARGS` is a URL with a path (also support for https > scheme). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov updated FLINK-32884: Fix Version/s: 1.19.0 > PyFlink remote execution should support URLs with paths and https scheme > > > Key: FLINK-32884 > URL: https://issues.apache.org/jira/browse/FLINK-32884 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Runtime / REST >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > Fix For: 1.19.0 > > > Currently, the `SUBMIT_ARGS=remote -m http://:` format. For > local execution it works fine `SUBMIT_ARGS=remote -m > [http://localhost:8081|http://localhost:8081/]`, but it does not support the > placement of the JobManager befind a proxy or using an Ingress for routing to > a specific Flink cluster based on the URL path. In current scenario, it > expects JobManager access PyFlink jobs at `http://:/v1/jobs` > endpoint. Mapping to a non-root location, > `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` > is not supported. > This will use changes from > FLINK-32885(https://issues.apache.org/jira/browse/FLINK-32885) > Since RestClusterClient talks to the JobManager via its REST endpoint, the > right format for `SUBMIT_ARGS` is URL with path (also support for https > scheme). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov updated FLINK-32884: Description: Currently, the `SUBMIT_ARGS=remote -m http://:` format. For local execution it works fine `SUBMIT_ARGS=remote -m [http://localhost:8081|http://localhost:8081/]`, but it does not support the placement of the JobManager befind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In current scenario, it expects JobManager access PyFlink jobs at `http://:/v1/jobs` endpoint. Mapping to a non-root location, `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` is not supported. This will use changes from FLINK-32885(https://issues.apache.org/jira/browse/FLINK-32885) Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is URL with path (also support for https scheme). was: Currently, the `SUBMIT_ARGS=remote -m http://:` format. For local execution it works fine `SUBMIT_ARGS=remote -m [http://localhost:8081|http://localhost:8081/]`, but it does not support the placement of the JobManager befind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In current scenario, it expects JobManager access PyFlink jobs at `http://:/v1/jobs` endpoint. Mapping to a non-root location, `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` is not supported. This will use changes from [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885) Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is URL with path (also support for https scheme). I intend to change these classes in a backward compatible way flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java flink-core/src/main/java/org/apache/flink/util/NetUtils.java > PyFlink remote execution should support URLs with paths and https scheme > > > Key: FLINK-32884 > URL: https://issues.apache.org/jira/browse/FLINK-32884 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Runtime / REST >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > > Currently, the `SUBMIT_ARGS=remote -m http://:` format. For > local execution it works fine `SUBMIT_ARGS=remote -m > [http://localhost:8081|http://localhost:8081/]`, but it does not support the > placement of the JobManager befind a proxy or using an Ingress for routing to > a specific Flink cluster based on the URL path. In current scenario, it > expects JobManager access PyFlink jobs at `http://:/v1/jobs` > endpoint. Mapping to a non-root location, > `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` > is not supported. > This will use changes from > FLINK-32885(https://issues.apache.org/jira/browse/FLINK-32885) > Since RestClusterClient talks to the JobManager via its REST endpoint, the > right format for `SUBMIT_ARGS` is URL with path (also support for https > scheme). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
[ https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov closed FLINK-32885. --- > Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used > by RestClusterClient for PyFlink remote execution > - > > Key: FLINK-32885 > URL: https://issues.apache.org/jira/browse/FLINK-32885 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Table SQL / Gateway >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > UrlPrefixDecorator is introduced in `flink-sql-gateway` module, which has > dependency on `flink-clients` module. RestClusterClient will also need to use > UrlPrefixDecorator for supporting PyFlink remote execution. Will refactor > related classes to achieve this. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
[ https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov resolved FLINK-32885. - Resolution: Fixed > Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used > by RestClusterClient for PyFlink remote execution > - > > Key: FLINK-32885 > URL: https://issues.apache.org/jira/browse/FLINK-32885 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Table SQL / Gateway >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > UrlPrefixDecorator is introduced in `flink-sql-gateway` module, which has > dependency on `flink-clients` module. RestClusterClient will also need to use > UrlPrefixDecorator for supporting PyFlink remote execution. Will refactor > related classes to achieve this. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
[ https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov updated FLINK-32885: Fix Version/s: 1.19.0 Description: UrlPrefixDecorator is introduced in `flink-sql-gateway` module, which has dependency on `flink-clients` module. RestClusterClient will also need to use UrlPrefixDecorator for supporting PyFlink remote execution. Will refactor related classes to achieve this. was: UrlPrefixDecorator is introduced in `flink-sql-gateway` module, which has dependency on `flink-clients` module. RestClusterClient will also need to use UrlPrefixDecorator for supporting PyFlink remote execution. Will refactor related classes to achieve this. I intend to change these classes in a backward compatible way flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java > Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used > by RestClusterClient for PyFlink remote execution > - > > Key: FLINK-32885 > URL: https://issues.apache.org/jira/browse/FLINK-32885 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Table SQL / Gateway >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > UrlPrefixDecorator is introduced in `flink-sql-gateway` module, which has > dependency on `flink-clients` module. RestClusterClient will also need to use > UrlPrefixDecorator for supporting PyFlink remote execution. Will refactor > related classes to achieve this. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov updated FLINK-32884: Description: Currently, the `SUBMIT_ARGS=remote -m http://:` format. For local execution it works fine `SUBMIT_ARGS=remote -m [http://localhost:8081|http://localhost:8081/]`, but it does not support the placement of the JobManager befind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In current scenario, it expects JobManager access PyFlink jobs at `http://:/v1/jobs` endpoint. Mapping to a non-root location, `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` is not supported. This will use changes from [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885) Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is URL with path (also support for https scheme). I intend to change these classes in a backward compatible way flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java flink-core/src/main/java/org/apache/flink/util/NetUtils.java was: Currently, the `SUBMIT_ARGS=remote -m http://:` format. For local execution it works fine `SUBMIT_ARGS=remote -m http://localhost:8081`, but it does not support the placement of the JobManager befind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In current scenario, it expects JobManager access PyFlink jobs at `http://:/v1/jobs` endpoint. Mapping to a non-root location, `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` is not supported. Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is URL with path (also support for https scheme). I intend to change these classes in a backward compatible way flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java flink-core/src/main/java/org/apache/flink/util/NetUtils.java > PyFlink remote execution should support URLs with paths and https scheme > > > Key: FLINK-32884 > URL: https://issues.apache.org/jira/browse/FLINK-32884 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Runtime / REST >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Priority: Major > > Currently, the `SUBMIT_ARGS=remote -m http://:` format. For > local execution it works fine `SUBMIT_ARGS=remote -m > [http://localhost:8081|http://localhost:8081/]`, but it does not support the > placement of the JobManager befind a proxy or using an Ingress for routing to > a specific Flink cluster based on the URL path. In current scenario, it > expects JobManager access PyFlink jobs at `http://:/v1/jobs` > endpoint. Mapping to a non-root location, > `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` > is not supported. > This will use changes from > [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885) > Since RestClusterClient talks to the JobManager via its REST endpoint, the > right format for `SUBMIT_ARGS` is URL with path (also support for https > scheme). > I intend to change these classes in a backward compatible way > flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java > flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java > flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java > flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java > flink-core/src/main/java/org/apache/flink/util/NetUtils.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
Elkhan Dadashov created FLINK-32885: --- Summary: Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution Key: FLINK-32885 URL: https://issues.apache.org/jira/browse/FLINK-32885 Project: Flink Issue Type: New Feature Components: Client / Job Submission, Table SQL / Gateway Affects Versions: 1.17.1 Reporter: Elkhan Dadashov UrlPrefixDecorator is introduced in `flink-sql-gateway` module, which has dependency on `flink-clients` module. RestClusterClient will also need to use UrlPrefixDecorator for supporting PyFlink remote execution. Will refactor related classes to achieve this. I intend to change these classes in a backward compatible way flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
Elkhan Dadashov created FLINK-32884: --- Summary: PyFlink remote execution should support URLs with paths and https scheme Key: FLINK-32884 URL: https://issues.apache.org/jira/browse/FLINK-32884 Project: Flink Issue Type: New Feature Components: Client / Job Submission, Runtime / REST Affects Versions: 1.17.1 Reporter: Elkhan Dadashov Currently, the `SUBMIT_ARGS=remote -m http://:` format. For local execution it works fine `SUBMIT_ARGS=remote -m http://localhost:8081`, but it does not support the placement of the JobManager befind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In current scenario, it expects JobManager access PyFlink jobs at `http://:/v1/jobs` endpoint. Mapping to a non-root location, `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` is not supported. Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is URL with path (also support for https scheme). I intend to change these classes in a backward compatible way flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java flink-core/src/main/java/org/apache/flink/util/NetUtils.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32728) Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised
[ https://issues.apache.org/jira/browse/FLINK-32728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Dadashov updated FLINK-32728: Issue Type: Bug (was: New Feature) > Metrics are not reported in Python UDF (used inside FlinkSQL) when exception > is raised > -- > > Key: FLINK-32728 > URL: https://issues.apache.org/jira/browse/FLINK-32728 > Project: Flink > Issue Type: Bug > Components: API / Python, Table SQL / Runtime >Affects Versions: 1.16.2, 1.17.1 > Environment: Flink 1.16.2 and Flink 1.17.1 (Python 3.9) >Reporter: Elkhan Dadashov >Priority: Major > > When Python UDF (which is used inside FlinkSQL) raises an exception, then > metrics get lost and not reported. Facing this issue both in Flink 1.16.2 > and Flink 1.17.1 (Python 3.9). > If an exception is not raised, then metrics show up. > > It is not mentioned on Flink documentation that UDFs should not throw an > exception. > === > FlinkSQL script content: > === > CREATE TABLE input_table ( > price DOUBLE > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second' = '1' > ); > CREATE TABLE output_table WITH ('connector' = 'print') > LIKE input_table (EXCLUDING ALL); > CREATE FUNCTION myDivide AS 'custom_udf.divide_udf' > LANGUAGE PYTHON; > -- Fail scenario: ZeroDivisionError: division by zero > INSERT into output_table (select myDivide(value, 0) from input_table); > === > Python UDF content: > === > from pyflink.table.udf import ScalarFunction, udf > from pyflink.table import DataTypes > import logging > class DivideUDF(ScalarFunction): > def __init__(self): > self.success_counter = None > self.fail_counter = None > def open(self, function_context): > self.success_counter = > function_context.get_metric_group().counter("flinksql_custom_udf_success_metric") > self.fail_counter = > function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric") > def eval(self, x, y): > [logging.info|http://logging.info/]('executing custom udf with > logging and metric example...') > try: > result = x/y > self.success_counter.inc() > return result > except Exception as e: > self.fail_counter.inc() > raise e > divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE()) > > === > Exception stack trace: > === > > 2023-07-26 18:17:20 > org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception > while processing timer. > at > org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: TimerException\{java.lang.RuntimeException: Error while waiting > for BeamPythonFunctionRunner flush} > ... 15 more > Caused by: java.lang.RuntimeException: Error while waiting for > BeamPythonFunctionRunner flush > at > org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107) > at >
[jira] [Created] (FLINK-32728) Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised
Elkhan Dadashov created FLINK-32728: --- Summary: Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised Key: FLINK-32728 URL: https://issues.apache.org/jira/browse/FLINK-32728 Project: Flink Issue Type: New Feature Components: API / Python, Table SQL / Runtime Affects Versions: 1.17.1, 1.16.2 Environment: Flink 1.16.2 and Flink 1.17.1 (Python 3.9) Reporter: Elkhan Dadashov When Python UDF (which is used inside FlinkSQL) raises an exception, then metrics get lost and not reported. Facing this issue both in Flink 1.16.2 and Flink 1.17.1 (Python 3.9). If an exception is not raised, then metrics show up. It is not mentioned on Flink documentation that UDFs should not throw an exception. === FlinkSQL script content: === CREATE TABLE input_table ( price DOUBLE ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); CREATE TABLE output_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING ALL); CREATE FUNCTION myDivide AS 'custom_udf.divide_udf' LANGUAGE PYTHON; -- Fail scenario: ZeroDivisionError: division by zero INSERT into output_table (select myDivide(value, 0) from input_table); === Python UDF content: === from pyflink.table.udf import ScalarFunction, udf from pyflink.table import DataTypes import logging class DivideUDF(ScalarFunction): def __init__(self): self.success_counter = None self.fail_counter = None def open(self, function_context): self.success_counter = function_context.get_metric_group().counter("flinksql_custom_udf_success_metric") self.fail_counter = function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric") def eval(self, x, y): [logging.info|http://logging.info/]('executing custom udf with logging and metric example...') try: result = x/y self.success_counter.inc() return result except Exception as e: self.fail_counter.inc() raise e divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE()) === Exception stack trace: === 2023-07-26 18:17:20 org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer. at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575) at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: TimerException\{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush} ... 15 more Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702) ... 14 more Caused by: java.lang.RuntimeException: Failed to close remote bundle at
[jira] [Created] (FLINK-32195) Add SQL Gateway custom headers support
Elkhan Dadashov created FLINK-32195: --- Summary: Add SQL Gateway custom headers support Key: FLINK-32195 URL: https://issues.apache.org/jira/browse/FLINK-32195 Project: Flink Issue Type: New Feature Components: Table SQL / Gateway Affects Versions: 1.17.0 Reporter: Elkhan Dadashov For some use cases, it might be needed setting a few extra HTTP headers with a request to FlinkSQL Gateway, for example, a cookie for Auth/session. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32126) When program arg contains two single quotes org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes fails
Elkhan Dadashov created FLINK-32126: --- Summary: When program arg contains two single quotes org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes fails Key: FLINK-32126 URL: https://issues.apache.org/jira/browse/FLINK-32126 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.17.0 Environment: Flink 1.17 Reporter: Elkhan Dadashov When the program argument contains two single quotes, then it fails : ``` for key '$internal.application.program-args'.\n\tat org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:126)\n\tat org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\n Caused by: java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly.\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:347)\n\tat org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat java.base/java.util.Optional.map(Optional.java:265)\n\tat org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714) ``` Double single quotes are used in faker connector extensively, and if FlinkSQL script is passed as main argument, then it fails with the above exception. This is just to show the use case of two single quotes usage in practice. ``` CREATE TEMPORARY TABLE IF NOT EXISTS xyz_table ( env STRING, dt DATE ) WITH ( 'connector' = 'faker' , 'rows-per-second' = '20' , 'fields.env.expression' = '#\{Options.option ''VAL'',''LIVE'')}' , 'fields.dt.expression' = '#\{date.past ''48'',''HOURS''}', 'source.parallelism' = '3' ); ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28991) Add documentation for lookup table caching feature
[ https://issues.apache.org/jira/browse/FLINK-28991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697611#comment-17697611 ] Elkhan Dadashov commented on FLINK-28991: - [~renqs] , +1 on documentation plan which describes how to benefit from caching in FlinkSQL. > Add documentation for lookup table caching feature > -- > > Key: FLINK-28991 > URL: https://issues.apache.org/jira/browse/FLINK-28991 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Affects Versions: 1.16.0 >Reporter: Qingsheng Ren >Assignee: Qingsheng Ren >Priority: Major > > We need a documentation to describe how to implement a lookup table based on > the new caching framework -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19935) Supports configure heap memory of sql-client to avoid OOM
[ https://issues.apache.org/jira/browse/FLINK-19935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650132#comment-17650132 ] Elkhan Dadashov commented on FLINK-19935: - This happens in Flink 1.16.0 version also when SQL script is very large (>=164kb). This is very easy to reproduce with `./bin/sql-client -f script_larger_than164kb.sql` command. > Supports configure heap memory of sql-client to avoid OOM > - > > Key: FLINK-19935 > URL: https://issues.apache.org/jira/browse/FLINK-19935 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Client >Affects Versions: 1.11.2 >Reporter: harold.miao >Priority: Minor > Labels: auto-deprioritized-major > Fix For: 1.17.0 > > Attachments: image-2020-11-03-10-31-08-294.png > > > hi > when use sql-client submit job, the command below donot set JVM heap > pramameters. And cause OOM error in my production environment. > exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList > "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" > org.apache.flink.table.client.SqlClient "$@" > > !image-2020-11-03-10-31-08-294.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29785) Upgrade Flink Elasticsearch-7 connector elasticsearch.version to 7.17.0
Elkhan Dadashov created FLINK-29785: --- Summary: Upgrade Flink Elasticsearch-7 connector elasticsearch.version to 7.17.0 Key: FLINK-29785 URL: https://issues.apache.org/jira/browse/FLINK-29785 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: Elkhan Dadashov `elasticsearch-7` connector uses ElasticSearch `7.10.2` version. [https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch7/pom.xml#L39] When ElasticSearch server side is `8.X.X` then, `elasticsearch-7` connector does not work. For Example for ElasticSearch `8.2.2` version on the server side, the minimum required version on the `elasticsearch-7` Flink connector side is `"minimum_wire_compatibility_version" : "7.17.0"`. As of today `elasticsearch-8` does not exist yet. [ Add Elasticsearch 8.0 support Jira ticket |https://issues.apache.org/jira/browse/FLINK-26088 ] . With this intermediare step - upgrading `elasticsearch-7` connector `elasticsearch.version` to 7.17.0 can help Flink users still ingest into ElasticSearch which has `8.X.X` version deployed on the server side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25117) NoSuchMethodError getCatalog()
[ https://issues.apache.org/jira/browse/FLINK-25117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485407#comment-17485407 ] Elkhan Dadashov commented on FLINK-25117: - [~zzt] I faced the same issue. It happens in Flink 1.14.3 version also. This is due to`/flink/lib/flink-table_2.12-*.jar` which also brings in CatalogManager and that does not have `TableLookupResult` in it, or `getCatalog()` method. Remove that jar from your `flink/lib` directory. That will resolve this issue. > NoSuchMethodError getCatalog() > -- > > Key: FLINK-25117 > URL: https://issues.apache.org/jira/browse/FLINK-25117 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.3 > Environment: offical docker image, flink:1.13.3-scala_2.12 >Reporter: zzt >Priority: Major > > {code:java} > Flink SQL> insert into `wide_order` (`user_id`, `row_num`, `sum`) > > select `t`.`receiver_user_id`, `t`.`rowNum`, `t`.`total` > > from (select `t`.`receiver_user_id`, `t`.`total`, ROW_NUMBER() OVER (ORDER > > BY total desc) as `rowNum` > > from (select `order_view`.`receiver_user_id`, > > sum(`order_view`.`total`) as `total` > > from `order_view` where create_time > '2021-11-01 00:24:55.453' > > group by `order_view`.`receiver_user_id`) `t`) `t` > > where `rowNum` <= 1; > Exception in thread "main" org.apache.flink.table.client.SqlClientException: > Unexpected exception. This is a bug. Please consider filing an issue. > at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) > Caused by: java.lang.NoSuchMethodError: > org.apache.flink.table.catalog.CatalogManager$TableLookupResult.getCatalog()Ljava/util/Optional; > at > org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:106) > at > org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:90) > at > org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:79) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:74) > at > org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) > at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) > at > org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntryFrom(SqlValidatorUtil.java:1059) > at > org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntry(SqlValidatorUtil.java:1016) > at > org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:119) > at > org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:86) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:116) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:56) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:47) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:113) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:176) > at > org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:176) > at > org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:385) > at > org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:326) > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297) > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) > at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) > ... 1 more > Shutting down the