[jira] [Commented] (FLINK-33188) PyFlink MapState with Types.ROW() throws exception

2023-10-05 Thread Elkhan Dadashov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772336#comment-17772336
 ] 

Elkhan Dadashov commented on FLINK-33188:
-

After digging into the flink-python code, it seems that if the 
`PYFLINK_GATEWAY_DISABLED` environment variable is set to false, then using 
`Types.LIST(Types.ROW([...]))` works without any issue once the Java gateway 
is launched.
 
It was unexpected that a local Flink run requires setting this flag to false 
explicitly.
 
This is a workaround for the issue:

 
def open(self, runtime_context: RuntimeContext):
    state_ttl_config = (
        StateTtlConfig.new_builder(Time.seconds(1))
        .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
        .disable_cleanup_in_background()
        .build()
    )
    # Workaround: allow the Java gateway to be launched inside the Python
    # worker so that Types.ROW(...) can resolve its Java type information.
    import os
    os.environ["PYFLINK_GATEWAY_DISABLED"] = "0"

> PyFlink MapState with Types.ROW() throws exception
> --
>
> Key: FLINK-33188
> URL: https://issues.apache.org/jira/browse/FLINK-33188
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, API / Type Serialization System
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Priority: Major
>
> I'm trying to use MapState, where the value will be a list of <class 'pyflink.common.types.Row'> type elements.
>  
> I wanted to check whether anyone else has faced the same issue while trying 
> to use MapState in PyFlink with complex types.
>  
> Here is the code:
>  
> from pyflink.common import Time
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import (
>     KeyedCoProcessFunction,
>     KeySelector,
>     RuntimeContext,
> )
> from pyflink.datastream.state import (
>     MapStateDescriptor,
>     StateTtlConfig,
>     ValueStateDescriptor,
>     ListStateDescriptor
> )
> from pyflink.table import DataTypes, StreamTableEnvironment
> class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
>     def __init__(self):
>         self.my_map_state = None
>     def open(self, runtime_context: RuntimeContext):
>         state_ttl_config = (
>             StateTtlConfig.new_builder(Time.seconds(1))
>             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
>             .disable_cleanup_in_background()
>             .build()
>         )
>         my_map_state_descriptor = MapStateDescriptor(
>             "my_map_state",
>             Types.SQL_TIMESTAMP(),
>             Types.LIST(Types.ROW([
>                 Types.STRING(), 
>                 Types.STRING(), 
>                 Types.STRING(), 
>                 Types.STRING(), 
>                 Types.STRING(), 
>                 Types.STRING(), 
>                 Types.STRING(), 
>                 Types.STRING(), 
>                 Types.SQL_TIMESTAMP(), 
>                 Types.SQL_TIMESTAMP(), 
>                 Types.SQL_TIMESTAMP(), 
>                 Types.BIG_INT() 
>             ]))
>         )
>         my_map_state_descriptor.enable_time_to_live(state_ttl_config)
>         self.my_map_state = 
> runtime_context.get_map_state(my_map_state_descriptor)
>  
> But while running this code, it fails with this exception at job startup (at 
> runtime_context.get_map_state(my_map_state_descriptor)), even without trying 
> to add anything to the state.
>  
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in 
> pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
> File"pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in 
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
> File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
>  line 127, in open
> self.open_func()
> File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
>  line 296, in open_func
> process_function.open(runtime_context)
> File"/tmp/ipykernel_83481/1603226134.py", line 57, in open
> File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py",
>  line 125, in get_map_state
> map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
> File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
>  line 812, in from_type_info
> from_type_info(type_info._key_type_info), 
> from_type_info(type_info._value_type_info))
> File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
>  line 809, in from_type_info
> returnGenericArrayCoder(from_type_info(type_info.elem_type))
> File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
>  line 819, in from_type_info
> [f for f in type_info.get_field_names()])
> 

[jira] [Created] (FLINK-33188) PyFlink MapState with Types.ROW() throws exception

2023-10-04 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-33188:
---

 Summary: PyFlink MapState with Types.ROW() throws exception
 Key: FLINK-33188
 URL: https://issues.apache.org/jira/browse/FLINK-33188
 Project: Flink
  Issue Type: Bug
  Components: API / Python, API / Type Serialization System
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov


I'm trying to use MapState, where the value will be a list of <class 'pyflink.common.types.Row'> type elements.
 
I wanted to check whether anyone else has faced the same issue while trying to 
use MapState in PyFlink with complex types.
 
Here is the code:
 
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import (
    KeyedCoProcessFunction,
    KeySelector,
    RuntimeContext,
)
from pyflink.datastream.state import (
    MapStateDescriptor,
    StateTtlConfig,
    ValueStateDescriptor,
    ListStateDescriptor
)
from pyflink.table import DataTypes, StreamTableEnvironment


class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
    def __init__(self):
        self.my_map_state = None

    def open(self, runtime_context: RuntimeContext):
        state_ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )

        my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.ROW([
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.SQL_TIMESTAMP(), 
                Types.SQL_TIMESTAMP(), 
                Types.SQL_TIMESTAMP(), 
                Types.BIG_INT() 
            ]))
        )
        my_map_state_descriptor.enable_time_to_live(state_ttl_config)
        self.my_map_state = 
runtime_context.get_map_state(my_map_state_descriptor)
 
But while running this code, it fails with this exception at job startup (at 
runtime_context.get_map_state(my_map_state_descriptor)), even without trying to 
add anything to the state.
 
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in 
pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
File"pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
 line 127, in open
self.open_func()
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
 line 296, in open_func
process_function.open(runtime_context)
File"/tmp/ipykernel_83481/1603226134.py", line 57, in open
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py",
 line 125, in get_map_state
map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", 
line 812, in from_type_info
from_type_info(type_info._key_type_info), 
from_type_info(type_info._value_type_info))
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", 
line 809, in from_type_info
returnGenericArrayCoder(from_type_info(type_info.elem_type))
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", 
line 819, in from_type_info
[f for f in type_info.get_field_names()])
File"/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 
377, in get_field_names
j_field_names = self.get_java_type_info().getFieldNames()
File"/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 
391, in get_java_type_info
j_types_array = get_gateway()\
File"/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 
62, in get_gateway
_gateway = launch_gateway()
File"/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 
86, in launch_gateway
raise Exception("It's launching the PythonGatewayServer during Python UDF 
execution "
Exception: It's launching the PythonGatewayServer during Python UDF execution 
which is unexpected. It usually happens when the job codes are in the top level 
of the Python script file and are not enclosed in a `if name == 'main'` 
statement.If I switch from Tupes.ROW to Types.TUPLE() it works without any 
exception.
 
This works:
 
my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.TUPLE([
                Types.STRING(),
                Types.STRING(),
              

[jira] [Reopened] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-09-22 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov reopened FLINK-32884:
-

> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
> supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
> http://localhost:8081/`), but it does not support placing the JobManager 
> behind a proxy or using an Ingress for routing to a specific Flink cluster 
> based on the URL path. In the current scenario, it expects to access PyFlink 
> jobs at the JobManager's `http://<host>:<port>/v1/jobs` endpoint. Mapping to 
> a non-root location such as 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
> is not supported.
> This will use changes from 
> [FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885].
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (also supporting the 
> https scheme).
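
For illustration, remote submission with the proposed URL-with-path format 
might look like the following sketch (host and Ingress path are hypothetical, 
and this relies on the support added by this ticket):

import os

# Hypothetical Ingress host/path routing to a specific Flink cluster.
os.environ["SUBMIT_ARGS"] = (
    "remote -m https://flink.example.com/flink-clusters/my-namespace/my-deployment"
)

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()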





[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-09-12 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov updated FLINK-32884:

Description: 
Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
http://localhost:8081/`), but it does not support placing the JobManager behind 
a proxy or using an Ingress for routing to a specific Flink cluster based on 
the URL path. In the current scenario, it expects to access PyFlink jobs at the 
JobManager's `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root 
location such as 
`https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
is not supported.

This will use changes from 
[FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885].

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is a URL with a path (also supporting the https 
scheme).

  was:
Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
[http://localhost:8081|http://localhost:8081/]`), but it does not support 
placing the JobManager behind a proxy or using an Ingress for routing to a 
specific Flink cluster based on the URL path. In the current scenario, it 
expects to access PyFlink jobs at the JobManager's 
`http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root location such as 
`https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
is not supported.

This will use changes from 
FLINK-32885 (https://issues.apache.org/jira/browse/FLINK-32885)

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is a URL with a path (also supporting the https 
scheme).


> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
> supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
> http://localhost:8081/`), but it does not support placing the JobManager 
> behind a proxy or using an Ingress for routing to a specific Flink cluster 
> based on the URL path. In the current scenario, it expects to access PyFlink 
> jobs at the JobManager's `http://<host>:<port>/v1/jobs` endpoint. Mapping to 
> a non-root location such as 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
> is not supported.
> This will use changes from 
> [FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885].
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (also supporting the 
> https scheme).





[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-09-12 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov updated FLINK-32884:

Fix Version/s: 1.19.0

> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
> supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
> [http://localhost:8081|http://localhost:8081/]`), but it does not support 
> placing the JobManager behind a proxy or using an Ingress for routing to a 
> specific Flink cluster based on the URL path. In the current scenario, it 
> expects to access PyFlink jobs at the JobManager's 
> `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root location such 
> as 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
> is not supported.
> This will use changes from 
> FLINK-32885 (https://issues.apache.org/jira/browse/FLINK-32885)
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (also supporting the 
> https scheme).





[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-09-12 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov updated FLINK-32884:

Description: 
Currently, the `SUBMIT_ARGS=remote -m http://:` format. For 
local execution it works fine `SUBMIT_ARGS=remote -m 
[http://localhost:8081|http://localhost:8081/]`, but it does not support the 
placement of the JobManager befind a proxy or using an Ingress for routing to a 
specific Flink cluster based on the URL path. In current scenario, it expects 
JobManager access PyFlink jobs at `http://:/v1/jobs` endpoint. 
Mapping to a non-root location, 
`https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs`
 is not supported.

This will use changes from 
FLINK-32885(https://issues.apache.org/jira/browse/FLINK-32885)

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is URL with path (also support for https scheme).

  was:
Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
[http://localhost:8081|http://localhost:8081/]`), but it does not support 
placing the JobManager behind a proxy or using an Ingress for routing to a 
specific Flink cluster based on the URL path. In the current scenario, it 
expects to access PyFlink jobs at the JobManager's 
`http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root location such as 
`https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
is not supported.

This will use changes from 
[FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885]

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is a URL with a path (also supporting the https 
scheme).

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
flink-core/src/main/java/org/apache/flink/util/NetUtils.java


> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>
> Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
> supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
> [http://localhost:8081|http://localhost:8081/]`), but it does not support 
> placing the JobManager behind a proxy or using an Ingress for routing to a 
> specific Flink cluster based on the URL path. In the current scenario, it 
> expects to access PyFlink jobs at the JobManager's 
> `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root location such 
> as 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
> is not supported.
> This will use changes from 
> FLINK-32885 (https://issues.apache.org/jira/browse/FLINK-32885)
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (also supporting the 
> https scheme).





[jira] [Closed] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution

2023-09-12 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov closed FLINK-32885.
---

> Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used 
> by RestClusterClient for PyFlink remote execution
> -
>
> Key: FLINK-32885
> URL: https://issues.apache.org/jira/browse/FLINK-32885
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Table SQL / Gateway
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> UrlPrefixDecorator is introduced in the `flink-sql-gateway` module, which has 
> a dependency on the `flink-clients` module. RestClusterClient will also need 
> to use UrlPrefixDecorator to support PyFlink remote execution. I will 
> refactor the related classes to achieve this.
>  





[jira] [Resolved] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution

2023-09-12 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov resolved FLINK-32885.
-
Resolution: Fixed

> Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used 
> by RestClusterClient for PyFlink remote execution
> -
>
> Key: FLINK-32885
> URL: https://issues.apache.org/jira/browse/FLINK-32885
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Table SQL / Gateway
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> UrlPrefixDecorator is introduced in the `flink-sql-gateway` module, which has 
> a dependency on the `flink-clients` module. RestClusterClient will also need 
> to use UrlPrefixDecorator to support PyFlink remote execution. I will 
> refactor the related classes to achieve this.
>  





[jira] [Updated] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution

2023-09-12 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov updated FLINK-32885:

Fix Version/s: 1.19.0
  Description: 
UrlPrefixDecorator is introduced in the `flink-sql-gateway` module, which has a 
dependency on the `flink-clients` module. RestClusterClient will also need to 
use UrlPrefixDecorator to support PyFlink remote execution. I will refactor the 
related classes to achieve this.

 

  was:
UrlPrefixDecorator is introduced in the `flink-sql-gateway` module, which has a 
dependency on the `flink-clients` module. RestClusterClient will also need to 
use UrlPrefixDecorator to support PyFlink remote execution. I will refactor the 
related classes to achieve this.

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java


> Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used 
> by RestClusterClient for PyFlink remote execution
> -
>
> Key: FLINK-32885
> URL: https://issues.apache.org/jira/browse/FLINK-32885
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Table SQL / Gateway
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> UrlPrefixDecorator is introduced in the `flink-sql-gateway` module, which has 
> a dependency on the `flink-clients` module. RestClusterClient will also need 
> to use UrlPrefixDecorator to support PyFlink remote execution. I will 
> refactor the related classes to achieve this.
>  





[jira] [Updated] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-08-16 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov updated FLINK-32884:

Description: 
Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
[http://localhost:8081|http://localhost:8081/]`), but it does not support 
placing the JobManager behind a proxy or using an Ingress for routing to a 
specific Flink cluster based on the URL path. In the current scenario, it 
expects to access PyFlink jobs at the JobManager's 
`http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root location such as 
`https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
is not supported.

This will use changes from 
[FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885]

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is a URL with a path (also supporting the https 
scheme).

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
flink-core/src/main/java/org/apache/flink/util/NetUtils.java

  was:
Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
http://localhost:8081`), but it does not support placing the JobManager behind 
a proxy or using an Ingress for routing to a specific Flink cluster based on 
the URL path. In the current scenario, it expects to access PyFlink jobs at the 
JobManager's `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root 
location such as 
`https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
is not supported.

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is a URL with a path (also supporting the https 
scheme).

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
flink-core/src/main/java/org/apache/flink/util/NetUtils.java


> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Priority: Major
>
> Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
> supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
> [http://localhost:8081|http://localhost:8081/]`), but it does not support 
> placing the JobManager behind a proxy or using an Ingress for routing to a 
> specific Flink cluster based on the URL path. In the current scenario, it 
> expects to access PyFlink jobs at the JobManager's 
> `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root location such 
> as 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
> is not supported.
> This will use changes from 
> [FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885]
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (also supporting the 
> https scheme).
> I intend to change these classes in a backward-compatible way:
> flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
> flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
> flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
> flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
> flink-core/src/main/java/org/apache/flink/util/NetUtils.java





[jira] [Created] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution

2023-08-16 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-32885:
---

 Summary: Refactoring: Moving UrlPrefixDecorator into flink-clients 
so it can be used by RestClusterClient for PyFlink remote execution
 Key: FLINK-32885
 URL: https://issues.apache.org/jira/browse/FLINK-32885
 Project: Flink
  Issue Type: New Feature
  Components: Client / Job Submission, Table SQL / Gateway
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov


UrlPrefixDecorator is introduced in the `flink-sql-gateway` module, which has a 
dependency on the `flink-clients` module. RestClusterClient will also need to 
use UrlPrefixDecorator to support PyFlink remote execution. I will refactor the 
related classes to achieve this.

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
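
Conceptually, a URL prefix decorator prepends a proxy/Ingress path prefix to 
each REST endpoint URL before the request is sent. A toy illustration follows 
(the function name is hypothetical; this is not Flink's actual class):

def with_url_prefix(prefix: str, target_url: str) -> str:
    """Prepend a proxy/Ingress path prefix to a REST endpoint URL."""
    return prefix.rstrip("/") + "/" + target_url.lstrip("/")

# e.g. reaching the /v1/jobs endpoint behind a cluster-specific Ingress path:
assert with_url_prefix("/flink-clusters/ns/job", "/v1/jobs") == "/flink-clusters/ns/job/v1/jobs"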





[jira] [Created] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-08-16 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-32884:
---

 Summary: PyFlink remote execution should support URLs with paths 
and https scheme
 Key: FLINK-32884
 URL: https://issues.apache.org/jira/browse/FLINK-32884
 Project: Flink
  Issue Type: New Feature
  Components: Client / Job Submission, Runtime / REST
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov


Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is 
supported. For local execution it works fine (`SUBMIT_ARGS=remote -m 
http://localhost:8081`), but it does not support placing the JobManager behind 
a proxy or using an Ingress for routing to a specific Flink cluster based on 
the URL path. In the current scenario, it expects to access PyFlink jobs at the 
JobManager's `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root 
location such as 
`https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` 
is not supported.

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is a URL with a path (also supporting the https 
scheme).

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
flink-core/src/main/java/org/apache/flink/util/NetUtils.java





[jira] [Updated] (FLINK-32728) Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised

2023-08-01 Thread Elkhan Dadashov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Dadashov updated FLINK-32728:

Issue Type: Bug  (was: New Feature)

> Metrics are not reported in Python UDF (used inside FlinkSQL) when exception 
> is raised
> --
>
> Key: FLINK-32728
> URL: https://issues.apache.org/jira/browse/FLINK-32728
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Table SQL / Runtime
>Affects Versions: 1.16.2, 1.17.1
> Environment: Flink 1.16.2 and Flink 1.17.1 (Python 3.9)
>Reporter: Elkhan Dadashov
>Priority: Major
>
> When a Python UDF (used inside FlinkSQL) raises an exception, the metrics 
> get lost and are not reported. I am facing this issue in both Flink 1.16.2 
> and Flink 1.17.1 (Python 3.9).
> If an exception is not raised, the metrics show up.
>  
> The Flink documentation does not mention that UDFs should not throw 
> exceptions.
> ===
> FlinkSQL script content:
> ===
> CREATE TABLE input_table (
> price DOUBLE
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '1'
> );
> CREATE TABLE output_table WITH ('connector' = 'print')
> LIKE input_table (EXCLUDING ALL);
> CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
> LANGUAGE PYTHON;
> -- Fail scenario: ZeroDivisionError: division by zero
> INSERT into output_table (select myDivide(price, 0) from input_table);
> ===
> Python UDF content:
> ===
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
> import logging
> class DivideUDF(ScalarFunction):
>     def __init__(self):
>         self.success_counter = None
>         self.fail_counter = None
>     def open(self, function_context):
>         self.success_counter = 
> function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
>         self.fail_counter = 
> function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")
>     def eval(self, x, y):
>         logging.info('executing custom udf with 
> logging and metric example...')
>         try:
>             result = x/y
>             self.success_counter.inc()
>             return result
>         except Exception as e:
>             self.fail_counter.inc()
>             raise e
> divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
>  
> ===
> Exception stack trace:
> ===
>  
> 2023-07-26 18:17:20
> org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception 
> while processing timer.
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: TimerException{java.lang.RuntimeException: Error while waiting 
> for BeamPythonFunctionRunner flush}
> ... 15 more
> Caused by: java.lang.RuntimeException: Error while waiting for 
> BeamPythonFunctionRunner flush
> at 
> org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
> at 
> 

[jira] [Created] (FLINK-32728) Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised

2023-08-01 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-32728:
---

 Summary: Metrics are not reported in Python UDF (used inside 
FlinkSQL) when exception is raised
 Key: FLINK-32728
 URL: https://issues.apache.org/jira/browse/FLINK-32728
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Table SQL / Runtime
Affects Versions: 1.17.1, 1.16.2
 Environment: Flink 1.16.2 and Flink 1.17.1 (Python 3.9)
Reporter: Elkhan Dadashov


When a Python UDF (used inside FlinkSQL) raises an exception, the metrics get 
lost and are not reported. I am facing this issue in both Flink 1.16.2 and 
Flink 1.17.1 (Python 3.9).
If an exception is not raised, the metrics show up.
 
The Flink documentation does not mention that UDFs should not throw 
exceptions.
===
FlinkSQL script content:
===

CREATE TABLE input_table (
price DOUBLE
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);

CREATE TABLE output_table WITH ('connector' = 'print')
LIKE input_table (EXCLUDING ALL);

CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
LANGUAGE PYTHON;

-- Fail scenario: ZeroDivisionError: division by zero
INSERT into output_table (select myDivide(price, 0) from input_table);

===
Python UDF content:
===

from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

import logging

class DivideUDF(ScalarFunction):
    def __init__(self):
        self.success_counter = None
        self.fail_counter = None
    def open(self, function_context):
        self.success_counter = 
function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
        self.fail_counter = 
function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")
    def eval(self, x, y):
        logging.info('executing custom udf with logging 
and metric example...')
        try:
            result = x/y
            self.success_counter.inc()
            return result
        except Exception as e:
            self.fail_counter.inc()
            raise e

divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
 
===
Exception stack trace:
===
 
2023-07-26 18:17:20
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception 
while processing timer.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for 
BeamPythonFunctionRunner flush}
... 15 more
Caused by: java.lang.RuntimeException: Error while waiting for 
BeamPythonFunctionRunner flush
at 
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
... 14 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at 
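
Since metrics do show up when no exception is raised, one possible interim 
workaround (an assumption on my part, not something from the ticket) is to 
record the failure in the counter and return a sentinel instead of re-raising. 
A sketch:

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class SafeDivideUDF(ScalarFunction):
    """Variant of DivideUDF that never re-raises, so counters still get reported."""
    def open(self, function_context):
        group = function_context.get_metric_group()
        self.success_counter = group.counter("flinksql_custom_udf_success_metric")
        self.fail_counter = group.counter("flinksql_custom_udf_fail_metric")

    def eval(self, x, y):
        try:
            result = x / y
            self.success_counter.inc()
            return result
        except Exception:
            self.fail_counter.inc()
            return None  # sentinel instead of raising; shows up as NULL in SQL output

safe_divide_udf = udf(SafeDivideUDF(), result_type=DataTypes.DOUBLE())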

[jira] [Created] (FLINK-32195) Add SQL Gateway custom headers support

2023-05-25 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-32195:
---

 Summary: Add SQL Gateway custom headers support
 Key: FLINK-32195
 URL: https://issues.apache.org/jira/browse/FLINK-32195
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Gateway
Affects Versions: 1.17.0
Reporter: Elkhan Dadashov


For some use cases, it might be necessary to set a few extra HTTP headers on 
requests to the FlinkSQL Gateway, for example a cookie for auth/session handling.
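
For illustration, this is the kind of request the feature would enable (host, 
port, and cookie value are hypothetical; the endpoint is assumed to be the SQL 
Gateway's session-open REST call):

import requests

# Open a SQL Gateway session with an extra auth/session cookie attached.
resp = requests.post(
    "http://sql-gateway.example.com:8083/v1/sessions",
    json={},
    headers={"Cookie": "SESSION=abc123"},
)
print(resp.json())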





[jira] [Created] (FLINK-32126) When program arg contains two single quotes org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes fails

2023-05-18 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-32126:
---

 Summary: When program arg contains two single quotes 
org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes fails
 Key: FLINK-32126
 URL: https://issues.apache.org/jira/browse/FLINK-32126
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.17.0
 Environment: Flink 1.17
Reporter: Elkhan Dadashov


When the program argument contains two consecutive single quotes, it fails:

```

for key '$internal.application.program-args'.
	at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)
	at org.apache.flink.configuration.Configuration.get(Configuration.java:704)
	at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:126)
	at org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)
	at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)
	at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)

Caused by: java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly.
	at org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)
	at org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)
	at org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)
	at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:347)
	at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)
	at java.base/java.util.Optional.map(Optional.java:265)
	at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)

```

Doubled single quotes are used extensively with the faker connector, and if a 
FlinkSQL script is passed as the main argument, it fails with the above 
exception. This illustrates how two consecutive single quotes occur in practice.

```

CREATE TEMPORARY TABLE IF NOT EXISTS xyz_table (
 env STRING,
 dt DATE
)
WITH (
'connector' = 'faker' ,
'rows-per-second' = '20' ,
'fields.env.expression' = '#{Options.option ''VAL'',''LIVE'')}' ,
'fields.dt.expression' = '#{date.past ''48'',''HOURS''}',
'source.parallelism' = '3'
);

```





[jira] [Commented] (FLINK-28991) Add documentation for lookup table caching feature

2023-03-07 Thread Elkhan Dadashov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697611#comment-17697611
 ] 

Elkhan Dadashov commented on FLINK-28991:
-

[~renqs], +1 on the documentation plan describing how to benefit from caching 
in FlinkSQL.

> Add documentation for lookup table caching feature
> --
>
> Key: FLINK-28991
> URL: https://issues.apache.org/jira/browse/FLINK-28991
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.16.0
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>
> We need documentation describing how to implement a lookup table based on 
> the new caching framework.





[jira] [Commented] (FLINK-19935) Supports configure heap memory of sql-client to avoid OOM

2022-12-20 Thread Elkhan Dadashov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17650132#comment-17650132
 ] 

Elkhan Dadashov commented on FLINK-19935:
-

This also happens in Flink 1.16.0 when the SQL script is very large (>=164 KB).

It is very easy to reproduce with the `./bin/sql-client -f 
script_larger_than164kb.sql` command.
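
For instance, a large enough script can be generated along these lines (a 
sketch; the statement content and file name are illustrative):

# Generate a synthetic FlinkSQL script above the ~164 KB threshold noted above.
stmt_template = "CREATE TABLE t{} (a INT) WITH ('connector' = 'datagen');\n"

with open("script_larger_than164kb.sql", "w") as f:
    i = 0
    while f.tell() < 164 * 1024:
        f.write(stmt_template.format(i))
        i += 1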

> Supports configure heap memory of sql-client to avoid OOM
> -
>
> Key: FLINK-19935
> URL: https://issues.apache.org/jira/browse/FLINK-19935
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Affects Versions: 1.11.2
>Reporter: harold.miao
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.17.0
>
> Attachments: image-2020-11-03-10-31-08-294.png
>
>
> hi,
> when using sql-client to submit a job, the command below does not set JVM 
> heap parameters, which causes an OOM error in my production environment:
> exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList 
> "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
> org.apache.flink.table.client.SqlClient "$@"
>  
> !image-2020-11-03-10-31-08-294.png!





[jira] [Created] (FLINK-29785) Upgrade Flink Elasticsearch-7 connector elasticsearch.version to 7.17.0

2022-10-27 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-29785:
---

 Summary: Upgrade Flink Elasticsearch-7 connector 
elasticsearch.version to 7.17.0
 Key: FLINK-29785
 URL: https://issues.apache.org/jira/browse/FLINK-29785
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: Elkhan Dadashov


The `elasticsearch-7` connector uses ElasticSearch version `7.10.2`:

[https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch7/pom.xml#L39]

 

When the ElasticSearch server side is on `8.X.X`, the `elasticsearch-7` 
connector does not work. For example, for ElasticSearch `8.2.2` on the server 
side, the minimum required version on the `elasticsearch-7` Flink connector 
side is `"minimum_wire_compatibility_version" : "7.17.0"`.

As of today, `elasticsearch-8` does not exist yet; see the [Add Elasticsearch 
8.0 support Jira ticket|https://issues.apache.org/jira/browse/FLINK-26088].

This intermediate step of upgrading the `elasticsearch-7` connector's 
`elasticsearch.version` to 7.17.0 can help Flink users still ingest into 
ElasticSearch deployments running `8.X.X` on the server side.





[jira] [Commented] (FLINK-25117) NoSuchMethodError getCatalog()

2022-02-01 Thread Elkhan Dadashov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17485407#comment-17485407
 ] 

Elkhan Dadashov commented on FLINK-25117:
-

[~zzt] I faced the same issue; it also happens in Flink 1.14.3. It is caused 
by `/flink/lib/flink-table_2.12-*.jar`, which also brings in a CatalogManager 
that has neither `TableLookupResult` nor the `getCatalog()` method. Remove 
that jar from your `flink/lib` directory; that will resolve the issue.

> NoSuchMethodError getCatalog()
> --
>
> Key: FLINK-25117
> URL: https://issues.apache.org/jira/browse/FLINK-25117
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.3
> Environment: offical docker image,  flink:1.13.3-scala_2.12
>Reporter: zzt
>Priority: Major
>
> {code:java}
> Flink SQL> insert into `wide_order` (`user_id`, `row_num`, `sum`)
> > select `t`.`receiver_user_id`, `t`.`rowNum`, `t`.`total`
> > from (select `t`.`receiver_user_id`, `t`.`total`, ROW_NUMBER() OVER (ORDER 
> > BY total desc) as `rowNum`
> >       from (select `order_view`.`receiver_user_id`, 
> > sum(`order_view`.`total`) as `total`
> >             from `order_view` where create_time > '2021-11-01 00:24:55.453'
> >             group by `order_view`.`receiver_user_id`) `t`) `t`
> > where `rowNum` <= 1;
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.flink.table.catalog.CatalogManager$TableLookupResult.getCatalog()Ljava/util/Optional;
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:106)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:90)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:79)
>     at java.util.Optional.map(Optional.java:215)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:74)
>     at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>     at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntryFrom(SqlValidatorUtil.java:1059)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntry(SqlValidatorUtil.java:1016)
>     at 
> org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:119)
>     at 
> org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:86)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:116)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:56)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:47)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:385)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:326)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
>     ... 1 more
> Shutting down the