Re: [PR] Show correct float metric value on flink UI. [flink]

2024-04-20 Thread via GitHub


luocan17 commented on PR #24693:
URL: https://github.com/apache/flink/pull/24693#issuecomment-2067912716

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Show correct float metric value on flink UI. [flink]

2024-04-20 Thread via GitHub


flinkbot commented on PR #24693:
URL: https://github.com/apache/flink/pull/24693#issuecomment-2067879789

   
   ## CI report:
   
   * 0c152dc813def8df2ae89d56c4f4900be0325b63 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Show correct float metric value on flink UI. [flink]

2024-04-20 Thread via GitHub


luocan17 opened a new pull request, #24693:
URL: https://github.com/apache/flink/pull/24693

   
   
   ## What is the purpose of the change
   
   This PR makes the Flink UI show float metric values correctly.
   Before this PR (`outPoolUsage` always shows 0 or 1):
   https://github.com/apache/flink/assets/20294680/2b78892d-b820-42df-85dc-3cd271e44f4d
   
   After this PR:
   https://github.com/apache/flink/assets/20294680/0d365d02-6e3e-4b54-9a1c-6e9898e25f09
   
   
   
   ## Brief change log
   
   
   
   
   ## Verifying this change
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35183) Expose FlinkMinorVersion as metric for applications in operator

2024-04-20 Thread Jira
Márton Balassi created FLINK-35183:
--

 Summary: Expose FlinkMinorVersion as metric for applications in 
operator
 Key: FLINK-35183
 URL: https://issues.apache.org/jira/browse/FLINK-35183
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Márton Balassi
Assignee: Márton Balassi
 Fix For: kubernetes-operator-1.9.0


This is a convenience feature on top of the existing Flink version grouping. 
It comes in handy when implementing platform overview dashboards that 
aggregate metrics from multiple operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32315) Support local file upload in K8s mode

2024-04-20 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-32315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-32315:
---
Fix Version/s: 1.20.0

> Support local file upload in K8s mode
> -
>
> Key: FLINK-32315
> URL: https://issues.apache.org/jira/browse/FLINK-32315
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Deployment / Kubernetes
>Reporter: Paul Lin
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, Flink assumes all resources are locally accessible in the pods, 
> which requires users to prepare the resources by mounting storage, 
> downloading resources with init containers, or rebuilding images for each 
> execution.
> We could make things much easier by introducing a built-in file distribution 
> mechanism based on Flink-supported filesystems. It's implemented in two steps:
>  
> 1. KubernetesClusterDescriptor uploads all local resources to remote storage 
> via the Flink filesystem (skipped if the resources are already remote).
> 2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner 
> download the resources and put them on the classpath during startup.
>  
> The 2nd step is mostly done by 
> [FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], so this 
> issue focuses on the upload part.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
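
For illustration, a minimal sketch in Java of what step 1 could look like, 
assuming a hypothetical ArtifactUploader helper (not the actual 
KubernetesClusterDescriptor code): a local artifact is copied to remote 
storage through Flink's FileSystem abstraction, while already-remote URIs 
pass through unchanged.

{code:java}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper illustrating the upload step described above.
public final class ArtifactUploader {

    public static URI uploadIfLocal(URI artifact, URI remoteDir) throws IOException {
        String scheme = artifact.getScheme();
        if (scheme != null && !"local".equals(scheme) && !"file".equals(scheme)) {
            return artifact; // already remote: skip the upload
        }
        String fileName = Paths.get(artifact.getPath()).getFileName().toString();
        Path target = new Path(new Path(remoteDir), fileName);
        // The FileSystem is resolved from the target scheme (hdfs://, s3://, ...).
        FileSystem fs = target.getFileSystem();
        try (InputStream in = Files.newInputStream(Paths.get(artifact.getPath()));
             FSDataOutputStream out = fs.create(target, FileSystem.WriteMode.OVERWRITE)) {
            in.transferTo(out);
        }
        return target.toUri();
    }
}
{code}

Resolving the target FileSystem through Path#getFileSystem keeps the sketch 
storage-agnostic: any Flink-supported filesystem can serve as the remote 
directory.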


[jira] [Closed] (FLINK-32315) Support local file upload in K8s mode

2024-04-20 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-32315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-32315.
--
Resolution: Fixed

[{{b3fdb07}}|https://github.com/apache/flink/commit/b3fdb07c04114c515cfc5893e89528bbfb4384ed]
 in master

> Support local file upload in K8s mode
> -
>
> Key: FLINK-32315
> URL: https://issues.apache.org/jira/browse/FLINK-32315
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Deployment / Kubernetes
>Reporter: Paul Lin
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, Flink assumes all resources are locally accessible in the pods, 
> which requires users to prepare the resources by mounting storage, 
> downloading resources with init containers, or rebuilding images for each 
> execution.
> We could make things much easier by introducing a built-in file distribution 
> mechanism based on Flink-supported filesystems. It's implemented in two steps:
>  
> 1. KubernetesClusterDescriptor uploads all local resources to remote storage 
> via the Flink filesystem (skipped if the resources are already remote).
> 2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner 
> download the resources and put them on the classpath during startup.
>  
> The 2nd step is mostly done by 
> [FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], so this 
> issue focuses on the upload part.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-04-20 Thread via GitHub


mbalassi merged PR #24303:
URL: https://github.com/apache/flink/pull/24303


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-28048) Introduce Source API alternative to FiniteTestSource

2024-04-20 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-28048.
--
Fix Version/s: 1.20.0
   Resolution: Implemented

> Introduce Source API alternative to FiniteTestSource
> 
>
> Key: FLINK-28048
> URL: https://issues.apache.org/jira/browse/FLINK-28048
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Tests
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This also has to verify that the Iceberg connector tests mentioned in 
> FLINK-28054 get covered by the solution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source [flink-connector-jdbc]

2024-04-20 Thread via GitHub


RocMarshal closed pull request #117: [FLINK-33463][Connector/JDBC] Support the 
implementation of dynamic source tables based on the new source
URL: https://github.com/apache/flink-connector-jdbc/pull/117


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35182) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector

2024-04-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35182:
---
Labels: pull-request-available  (was: )

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> Pulsar connector
> -
>
> Key: FLINK-35182
> URL: https://issues.apache.org/jira/browse/FLINK-35182
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Pulsar
>Reporter: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35182) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector

2024-04-20 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35182:
---

 Summary: Bump org.apache.commons:commons-compress from 1.24.0 to 
1.26.1 for Flink Pulsar connector
 Key: FLINK-35182
 URL: https://issues.apache.org/jira/browse/FLINK-35182
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Pulsar
Reporter: Zhongqiang Gong






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Polish ci concurrency group [flink-cdc]

2024-04-20 Thread via GitHub


GOODBOY008 commented on PR #3241:
URL: https://github.com/apache/flink-cdc/pull/3241#issuecomment-2067678410

   @leonardBang @Jiabao-Sun PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] Polish ci concurrency group [flink-cdc]

2024-04-20 Thread via GitHub


GOODBOY008 opened a new pull request, #3241:
URL: https://github.com/apache/flink-cdc/pull/3241

   Changes:
   - Avoid canceling CI runs on branches.
   
![1713620429122](https://github.com/apache/flink-cdc/assets/13617900/a3def071-5ac1-47ac-9ff0-453ac68f68eb)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35181) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong closed FLINK-35181.
---
Resolution: Fixed

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> Elasticsearch connector
> 
>
> Key: FLINK-35181
> URL: https://issues.apache.org/jira/browse/FLINK-35181
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Zhongqiang Gong
>Assignee: Danny Cranmer
>Priority: Major
>
> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> Elasticsearch connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35181) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong updated FLINK-35181:

Summary: Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for 
Flink Elasticsearch connector  (was: CLONE - Bump 
org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
Elasticsearch connector)

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> Elasticsearch connector
> 
>
> Key: FLINK-35181
> URL: https://issues.apache.org/jira/browse/FLINK-35181
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Zhongqiang Gong
>Assignee: Danny Cranmer
>Priority: Major
>
> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> Elasticsearch connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35181) CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong updated FLINK-35181:

Fix Version/s: (was: rabbitmq-3.1.0)

> CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for 
> Flink Elasticsearch connector
> 
>
> Key: FLINK-35181
> URL: https://issues.apache.org/jira/browse/FLINK-35181
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / RabbitMQ
>Reporter: Zhongqiang Gong
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
>
> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> RabbitMQ connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35181) CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong updated FLINK-35181:

Labels:   (was: pull-request-available)

> CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for 
> Flink Elasticsearch connector
> 
>
> Key: FLINK-35181
> URL: https://issues.apache.org/jira/browse/FLINK-35181
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Zhongqiang Gong
>Assignee: Danny Cranmer
>Priority: Major
>
> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> RabbitMQ connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35181) CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong updated FLINK-35181:

Description: 
Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
Elasticsearch connector

 

  was:
Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
RabbitMQ connector

 


> CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for 
> Flink Elasticsearch connector
> 
>
> Key: FLINK-35181
> URL: https://issues.apache.org/jira/browse/FLINK-35181
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Zhongqiang Gong
>Assignee: Danny Cranmer
>Priority: Major
>
> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> Elasticsearch connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35181) CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong updated FLINK-35181:

Component/s: Connectors / ElasticSearch
 (was: Connectors / RabbitMQ)

> CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for 
> Flink Elasticsearch connector
> 
>
> Key: FLINK-35181
> URL: https://issues.apache.org/jira/browse/FLINK-35181
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Zhongqiang Gong
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
>
> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> RabbitMQ connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35181) CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35181:
---

 Summary: CLONE - Bump org.apache.commons:commons-compress from 
1.24.0 to 1.26.1 for Flink Elasticsearch connector
 Key: FLINK-35181
 URL: https://issues.apache.org/jira/browse/FLINK-35181
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / RabbitMQ
Reporter: Zhongqiang Gong
Assignee: Danny Cranmer
 Fix For: rabbitmq-3.1.0


Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
RabbitMQ connector

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35173) Debezium for Mysql connector Custom Time Serializer

2024-04-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35173:
---
Labels: CDC pull-request-available  (was: CDC)

> Debezium for Mysql connector Custom Time Serializer 
> 
>
> Key: FLINK-35173
> URL: https://issues.apache.org/jira/browse/FLINK-35173
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: ZhengYu Chen
>Priority: Major
>  Labels: CDC, pull-request-available
> Fix For: 3.1.0
>
>
> Currently, Flink CDC produces wrong values for time types (including 
> DateTime, Time, Date, and Timestamp) when the MySQL connector uses 
> JsonDebeziumDeserializationSchema for deserialization. The root cause is 
> that the timestamps returned by the underlying Debezium layer are in UTC 
> (such as io.debezium.time.Timestamp). The community already has a 
> [PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4],
> but it does not work.
> This issue provides a solution based on Debezium's custom converter 
> interface 
> (https://debezium.io/documentation/reference/1.9/development/converters.html):
> users can convert the above four time types into STRING according to a 
> specified time format, so that users can correctly interpret the JSON when 
> using the Flink DataStream API.
> When the user enables this converter, it is configured through properties. 
> Here is a DataStream use case:
> {code:java}
> Properties debeziumProperties = new Properties();
> debeziumProperties.setProperty("converters", "datetime");
> debeziumProperties.setProperty("datetime.database.type", 
> DataBaseType.MYSQL.getType());
> debeziumProperties.setProperty("datetime.type", 
> "cn.xxx.sources.cdc.MysqlDebeziumConverter");
> debeziumProperties.setProperty("datetime.format.date", "-MM-dd");
> debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.datetime", "-MM-dd 
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp", "-MM-dd 
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
> MySqlSourceBuilder builder = MySqlSource.builder()
>         .hostname(url[0])
>         .port(Integer.parseInt(url[1]))
>         .databaseList(table.getDatabase())
>         .tableList(getTablePattern(table))
>         .username(table.getUserName())
>         .password(table.getPassword())
>         .debeziumProperties(debeziumProperties); {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
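
For illustration, a minimal sketch of such a converter against Debezium's 
CustomConverter SPI. The class name and the handled type below are 
assumptions (the ticket's config references cn.xxx.sources.cdc.MysqlDebeziumConverter), 
so treat this as a sketch of the mechanism rather than the actual 
implementation. Note that Debezium hands configure() the properties with the 
converter-name prefix ("datetime.") already stripped.

{code:java}
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

// Hypothetical sketch: register a STRING schema for DATE columns and format
// incoming values with the configured pattern instead of emitting raw values.
public class MysqlDateToStringConverter
        implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter;

    @Override
    public void configure(Properties props) {
        // "datetime.format.date" arrives here as "format.date"
        dateFormatter =
                DateTimeFormatter.ofPattern(props.getProperty("format.date", "yyyy-MM-dd"));
    }

    @Override
    public void converterFor(RelationalColumn column,
                             ConverterRegistration<SchemaBuilder> registration) {
        if ("DATE".equalsIgnoreCase(column.typeName())) {
            registration.register(SchemaBuilder.string().optional(), value -> {
                if (value == null) {
                    return null;
                }
                if (value instanceof LocalDate) {
                    return dateFormatter.format((LocalDate) value);
                }
                return value.toString();
            });
        }
    }
}
{code}

The other three time types would follow the same pattern, each matching its 
column type name and its configured format property.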


[PR] Draft : [FLINK-35173] Debezium for Mysql connector Custom Time Serializer [flink-cdc]

2024-04-20 Thread via GitHub


czy006 opened a new pull request, #3240:
URL: https://github.com/apache/flink-cdc/pull/3240

   Currently, Flink CDC produces wrong values for time types (including 
DateTime, Time, Date, and Timestamp) when the MySQL connector uses 
JsonDebeziumDeserializationSchema for deserialization. The root cause is that 
the timestamps returned by the underlying Debezium layer are in UTC (such as 
io.debezium.time.Timestamp). The community already has a 
[PR](https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4),
 but it does not work.
   
   This PR provides a solution based on Debezium's custom converter interface 
(https://debezium.io/documentation/reference/1.9/development/converters.html): 
users can convert the above four time types into STRING according to a 
specified time format, so that users can correctly interpret the JSON when 
using the Flink DataStream API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35173) Debezium for Mysql connector Custom Time Serializer

2024-04-20 Thread ZhengYu Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhengYu Chen updated FLINK-35173:
-
Summary: Debezium for Mysql connector Custom Time Serializer   (was: 
Debezium Custom Time Serializer )

> Debezium for Mysql connector Custom Time Serializer 
> 
>
> Key: FLINK-35173
> URL: https://issues.apache.org/jira/browse/FLINK-35173
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: ZhengYu Chen
>Priority: Major
>  Labels: CDC
> Fix For: 3.1.0
>
>
> Currently, Flink CDC produces wrong values for time types (including 
> DateTime, Time, Date, and Timestamp) when the MySQL connector uses 
> JsonDebeziumDeserializationSchema for deserialization. The root cause is 
> that the timestamps returned by the underlying Debezium layer are in UTC 
> (such as io.debezium.time.Timestamp). The community already has a 
> [PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4],
> but it does not work.
> This issue provides a solution based on Debezium's custom converter 
> interface 
> (https://debezium.io/documentation/reference/1.9/development/converters.html):
> users can convert the above four time types into STRING according to a 
> specified time format, so that users can correctly interpret the JSON when 
> using the Flink DataStream API.
> When the user enables this converter, it is configured through properties. 
> Here is a DataStream use case:
> {code:java}
> Properties debeziumProperties = new Properties();
> debeziumProperties.setProperty("converters", "datetime");
> debeziumProperties.setProperty("datetime.database.type", 
> DataBaseType.MYSQL.getType());
> debeziumProperties.setProperty("datetime.type", 
> "cn.xxx.sources.cdc.MysqlDebeziumConverter");
> debeziumProperties.setProperty("datetime.format.date", "-MM-dd");
> debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.datetime", "-MM-dd 
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp", "-MM-dd 
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
> MySqlSourceBuilder builder = MySqlSource.builder()
>         .hostname(url[0])
>         .port(Integer.parseInt(url[1]))
>         .databaseList(table.getDatabase())
>         .tableList(getTablePattern(table))
>         .username(table.getUserName())
>         .password(table.getPassword())
>         .debeziumProperties(debeziumProperties); {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35180) Instant in row doesn't convert to correct type in python thread mode

2024-04-20 Thread Wei Yuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Yuan updated FLINK-35180:
-
Summary: Instant in row doesn't convert to correct type in python thread 
mode  (was: Instant in row doesn't convert to correct type in python process 
mode)

> Instant in row doesn't convert to correct type in python thread mode
> 
>
> Key: FLINK-35180
> URL: https://issues.apache.org/jira/browse/FLINK-35180
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Wei Yuan
>Priority: Major
>
> {code:java}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, WatermarkStrategy, Configuration
> from pyflink.table import EnvironmentSettings, TableEnvironment
> from pyflink.table import StreamTableEnvironment, Schema
> from pyflink.datastream.functions import ProcessFunction, MapFunction
> # init task env
> config = Configuration()
> # config.set_string("python.execution-mode", "thread")
> config.set_string("python.execution-mode", "process")
> config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
> config.set_string("python.executable", "/root/miniconda3/bin/python3")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> table_env = StreamTableEnvironment.create(env)
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
> "content")
> table_env.create_temporary_view("test_table", table)
> result_table = table_env.sql_query("select *, NOW() as dt from test_table")
> result_ds = table_env.to_data_stream(result_table)
> def test_func(row):
>     print(row)
>     return row
> result_ds.map(test_func)
> env.execute(){code}
> output in process mode:
> {code:java}
> Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
> Row(id=2, content='Hello', dt=Instant<1713609386, 58000>) {code}
> output in thread mode:
> {code:java}
>  
> Row(id=1, content='Hi', dt=)
> Traceback (most recent call last):
>   File "/home/disk1/yuanwei/bug.py", line 31, in 
>     env.execute()
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
>  line 773, in execute
>     return 
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
> line 1322, in __call__
>     return_value = get_return_value(
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
> line 146, in deco
>     return f(*a, **kw)
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
> 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>         at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
>         at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>         at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>         at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>         at 
> org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>         at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>         at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
>         at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
>         at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>         at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>         at 
> 

[jira] [Updated] (FLINK-35180) Instant in row doesn't convert to correct type in python process mode

2024-04-20 Thread Wei Yuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Yuan updated FLINK-35180:
-
Description: 
{code:java}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, WatermarkStrategy, Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import StreamTableEnvironment, Schema
from pyflink.datastream.functions import ProcessFunction, MapFunction

# init task env
config = Configuration()
# config.set_string("python.execution-mode", "thread")
config.set_string("python.execution-mode", "process")
config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
config.set_string("python.executable", "/root/miniconda3/bin/python3")
env = StreamExecutionEnvironment.get_execution_environment(config)
table_env = StreamTableEnvironment.create(env)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
"content")
table_env.create_temporary_view("test_table", table)
result_table = table_env.sql_query("select *, NOW() as dt from test_table")

result_ds = table_env.to_data_stream(result_table)
def test_func(row):
    print(row)
    return row

result_ds.map(test_func)
env.execute(){code}
output in process mode:
{code:java}
Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
Row(id=2, content='Hello', dt=Instant<1713609386, 58000>) {code}
output in thread mode:
{code:java}
 
Row(id=1, content='Hi', dt=)
Traceback (most recent call last):
  File "/home/disk1/yuanwei/bug.py", line 31, in 
    env.execute()
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
 line 773, in execute
    return 
JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
line 1322, in __call__
    return_value = get_return_value(
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
line 146, in deco
    return f(*a, **kw)
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at 
org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
        at 

[jira] [Updated] (FLINK-35180) Instant in row doesn't convert to correct type in python process mode

2024-04-20 Thread Wei Yuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Yuan updated FLINK-35180:
-
Description: 
{code:java}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, WatermarkStrategy, Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import StreamTableEnvironment, Schema
from pyflink.datastream.functions import ProcessFunction, MapFunction

# init task env
config = Configuration()
# config.set_string("python.execution-mode", "thread")
config.set_string("python.execution-mode", "process")
config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
config.set_string("python.executable", "/root/miniconda3/bin/python3")
env = StreamExecutionEnvironment.get_execution_environment(config)
table_env = StreamTableEnvironment.create(env)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
"content")
table_env.create_temporary_view("test_table", table)
result_table = table_env.sql_query("select *, NOW() as dt from test_table")

result_ds = table_env.to_data_stream(result_table)
def test_func(row):
    print(row)
    return row

result_ds.map(test_func)
env.execute(){code}
output in process mode:
{code:java}
Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
Row(id=2, content='Hello', dt=Instant<1713609386, 58000>) {code}
output in thread mode:
{code:java}
 
Row(id=1, content='Hi', dt=)
Traceback (most recent call last):
  File "/home/disk1/yuanwei/bug.py", line 31, in 
    env.execute()
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
 line 773, in execute
    return 
JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
line 1322, in __call__
    return_value = get_return_value(
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
line 146, in deco
    return f(*a, **kw)
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at 
org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
        at 

[jira] [Updated] (FLINK-35180) Instant in row doesn't convert to correct type in python process mode

2024-04-20 Thread Wei Yuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Yuan updated FLINK-35180:
-
Description: 
use 

```

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, WatermarkStrategy, Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import StreamTableEnvironment, Schema
from pyflink.datastream.functions import ProcessFunction, MapFunction
 # init task env
config = Configuration()
 # config.set_string("python.execution-mode", "thread")
config.set_string("python.execution-mode", "process")
config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
config.set_string("python.executable", "/root/miniconda3/bin/python3")

env = StreamExecutionEnvironment.get_execution_environment(config)
table_env = StreamTableEnvironment.create(env)
 # create a batch TableEnvironment
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
"content")
table_env.create_temporary_view("test_table", table)

result_table = table_env.sql_query("select *, NOW() as dt from test_table")
result_ds = table_env.to_data_stream(result_table)

def test_func(row):
    print(row)
    return row

result_ds.map(test_func)

env.execute()

```

output in process mode:

```

Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
Row(id=2, content='Hello', dt=Instant<1713609386, 58000>)

```

output in thread mode:

```

Row(id=1, content='Hi', dt=)
Traceback (most recent call last):
  File "/home/disk1/yuanwei/bug.py", line 31, in 
    env.execute()
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
 line 773, in execute
    return 
JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
line 1322, in __call__
    return_value = get_return_value(
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
line 146, in deco
    return f(*a, **kw)
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at 
org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
        

[jira] [Created] (FLINK-35180) Instant in row doesn't convert to correct type in python process mode

2024-04-20 Thread Wei Yuan (Jira)
Wei Yuan created FLINK-35180:


 Summary: Instant in row doesn't convert to correct type in python 
process mode
 Key: FLINK-35180
 URL: https://issues.apache.org/jira/browse/FLINK-35180
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Wei Yuan


use 

```

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, WatermarkStrategy, Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import StreamTableEnvironment, Schema
from pyflink.datastream.functions import ProcessFunction, MapFunction

# init task env
config = Configuration()
# config.set_string("python.execution-mode", "thread")
config.set_string("python.execution-mode", "process")
config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
config.set_string("python.executable", "/root/miniconda3/bin/python3")

env = StreamExecutionEnvironment.get_execution_environment(config)
table_env = StreamTableEnvironment.create(env)

# create a batch TableEnvironment
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
"content")
table_env.create_temporary_view("test_table", table)

result_table = table_env.sql_query("select *, NOW() as dt from test_table")
result_ds = table_env.to_data_stream(result_table)

def test_func(row):
    print(row)
    return row

result_ds.map(test_func)

env.execute()

```

output in process mode:

```

Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
Row(id=2, content='Hello', dt=Instant<1713609386, 58000>)

```

output in thread mode:

```

Row(id=1, content='Hi', dt=)
Traceback (most recent call last):
  File "/home/disk1/yuanwei/bug.py", line 31, in 
    env.execute()
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
 line 773, in execute
    return 
JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
line 1322, in __call__
    return_value = get_return_value(
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
line 146, in deco
    return f(*a, **kw)
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at 

[jira] (FLINK-33463) Support the implementation of dynamic source tables based on the new source

2024-04-20 Thread RocMarshal (Jira)


[ https://issues.apache.org/jira/browse/FLINK-33463 ]


RocMarshal deleted comment on FLINK-33463:


was (Author: rocmarshal):
The ticket mainly covers three items:
1. Support the implementation of dynamic source tables/factories based on the 
new source.
2. Mark the old APIs for dynamic table sources or factories as deprecated.
3. Supplement the docs about the usage of stream-semantic tables or other 
extended features if needed.

> Support the implementation of dynamic source tables based on the new source
> ---
>
> Key: FLINK-33463
> URL: https://issues.apache.org/jira/browse/FLINK-33463
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Reporter: RocMarshal
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35179) add postgres pipeline data sink connector

2024-04-20 Thread melin (Jira)
melin created FLINK-35179:
-

 Summary: add postgres pipeline data sink connector
 Key: FLINK-35179
 URL: https://issues.apache.org/jira/browse/FLINK-35179
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: melin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership

2024-04-20 Thread elon_X (Jira)
elon_X created FLINK-35178:
--

 Summary: Checkpoint CLAIM mode does not fully control snapshot 
ownership
 Key: FLINK-35178
 URL: https://issues.apache.org/jira/browse/FLINK-35178
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.18.0
Reporter: elon_X
 Attachments: image-2024-04-20-14-51-21-062.png

When I enable incremental checkpointing and the task fails or is canceled for 
some reason, restarting the task from {{-s checkpoint_path}} with {{restoreMode 
CLAIM}} allows the Flink job to recover from the last checkpoint; it simply 
discards the previous checkpoint.

Then I found that this leads to the following two cases:

1. If the new checkpoint_x meta file does not reference files in the shared 
directory under the previous jobID:         

the shared and taskowned directories from the previous job will be left as 
empty directories, and these two directories will persist without being deleted 
by Flink.

!image-2024-04-20-14-51-21-062.png!

2. If the new checkpoint_x meta file references files in the shared directory 
under the previous jobID:

the chk-(x-1) from the previous job will be discarded, but there will still be 
state data in the shared directory under that job, which might persist for a 
relatively long time. Here arises the question: the previous job is no longer 
running, and it's unclear whether users should delete the state data. Deleting 
it could lead to errors when the task is restarted, as the meta might reference 
files that can no longer be found; this could be confusing for users.

 

A potential solution might be to reuse the previous job's jobID when 
restoring from {{-s checkpoint_path}}, or to add a new parameter that 
allows users to specify the jobID they want to recover from.

 

Please correct me if there's anything I've misunderstood.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
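
For reference, the restore mode described above can also be set through 
configuration; a minimal sketch, assuming an existing retained checkpoint 
(the configuration keys are Flink's execution.savepoint.path and 
execution.savepoint-restore-mode; the checkpoint path and jobId are 
placeholders):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: restore from a retained checkpoint in CLAIM mode, after which Flink
// claims ownership of the snapshot and may discard it once it is subsumed.
public class ClaimRestoreExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Equivalent to: flink run -s <checkpoint_path> -restoreMode CLAIM
        conf.setString("execution.savepoint.path",
                "hdfs:///flink/checkpoints/<jobId>/chk-42"); // placeholder path
        conf.setString("execution.savepoint-restore-mode", "CLAIM");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromSequence(0, 9).print(); // placeholder topology
        env.execute("claim-restore-example");
    }
}
{code}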


[jira] [Updated] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks

2024-04-20 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35160:
---
 Attachment: image-2024-04-20-14-43-36-939.png
Description: 
After receiving feedback from the business side about performance issues in 
their tasks, we attempted to troubleshoot and discovered that their tasks had 
thread deadlocks. However, the Thread Dump entry on the Flink page only shows 
thread stacks. Since the users are not very familiar with Java stacks, they 
couldn't clearly identify that the deadlocks were due to issues in the 
business logic code and mistakenly thought they were problems with the Flink 
framework.

!image-2024-04-18-20-57-52-440.png!

!image-2024-04-18-20-58-09-872.png!

The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
the business team does not have permission to log into the machines. Here is 
the jstack log:

!image-2024-04-20-14-43-36-939.png!

Flame graphs are excellent for visualizing performance bottlenecks and 
hotspots in application profiling, but they are not designed to pinpoint the 
exact lines of code where thread deadlocks occur.

!image-2024-04-18-21-01-22-881.png!

Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
similar to what the {{jstack}} command provides.

 

!image-2024-04-18-21-34-41-014.png!

  was:
After receiving feedback from the business side about performance issues in 
their tasks, we attempted to troubleshoot and discovered that their tasks had 
thread deadlocks. However, the Thread Dump entry on the Flink page only shows 
thread stacks. Since the users are not very familiar with Java stacks, they 
couldn't clearly identify that the deadlocks were due to issues in the 
business logic code and mistakenly thought they were problems with the Flink 
framework.

!image-2024-04-18-20-57-52-440.png!

!image-2024-04-18-20-58-09-872.png!

The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
the business team does not have permission to log into the machines. Here is 
the jstack log:

!image-2024-04-18-21-00-04-532.png!

Flame graphs are excellent for visualizing performance bottlenecks and 
hotspots in application profiling, but they are not designed to pinpoint the 
exact lines of code where thread deadlocks occur.

!image-2024-04-18-21-01-22-881.png!

Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
similar to what the {{jstack}} command provides.

 

!image-2024-04-18-21-34-41-014.png!


> Support for Thread Dump provides a convenient way to display issues of thread 
> deadlocks in tasks
> 
>
> Key: FLINK-35160
> URL: https://issues.apache.org/jira/browse/FLINK-35160
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.16.0, 1.17.1, 1.19.0, 1.18.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-18-20-57-52-440.png, 
> image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-01-22-881.png, 
> image-2024-04-18-21-34-41-014.png, image-2024-04-20-14-43-36-939.png
>
>
> After receiving feedback from the business side about performance issues in 
> their tasks, we attempted to troubleshoot and discovered that their tasks had 
> thread deadlocks. However, the Thread Dump entry on the Flink page only shows 
> thread stacks. Since the users are not very familiar with Java stacks, they 
> couldn't clearly identify that the deadlocks were due to issues in the 
> business logic code and mistakenly thought they were problems with the Flink 
> framework.
> !image-2024-04-18-20-57-52-440.png!
> !image-2024-04-18-20-58-09-872.png!
> The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
> the business team does not have permission to log into the machines. Here is 
> the jstack log:
> !image-2024-04-20-14-43-36-939.png!
> Flame graphs are excellent for visualizing performance bottlenecks and 
> hotspots in application profiling, but they are not designed to pinpoint the 
> exact lines of code where thread deadlocks occur.
> !image-2024-04-18-21-01-22-881.png!
> Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
> similar to what the {{jstack}} command provides.
>  
> !image-2024-04-18-21-34-41-014.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
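
One plausible building block for such an enhancement is the deadlock 
detection that the JDK itself exposes (and which jstack relies on); a minimal 
standalone sketch using ThreadMXBean, not actual Flink REST-handler code:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Sketch: report deadlocked threads the way jstack does, via the JMX bean.
public class DeadlockProbe {
    public static void main(String[] args) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        long[] deadlocked = bean.findDeadlockedThreads(); // null if none found
        if (deadlocked == null) {
            System.out.println("No deadlocks detected.");
            return;
        }
        // Integer.MAX_VALUE requests the full stack trace for each thread.
        for (ThreadInfo info : bean.getThreadInfo(deadlocked, Integer.MAX_VALUE)) {
            System.out.printf("\"%s\" is waiting on %s held by \"%s\"%n",
                    info.getThreadName(), info.getLockName(), info.getLockOwnerName());
        }
    }
}
{code}

A Thread Dump REST handler could run the same probe and prepend a 
jstack-style "Found N deadlocks" section to the existing stack output.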


[jira] [Updated] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks

2024-04-20 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35160:
---
Attachment: (was: image-2024-04-18-21-00-04-532.png)

> Support for Thread Dump provides a convenient way to display issues of thread 
> deadlocks in tasks
> 
>
> Key: FLINK-35160
> URL: https://issues.apache.org/jira/browse/FLINK-35160
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.16.0, 1.17.1, 1.19.0, 1.18.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-18-20-57-52-440.png, 
> image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-01-22-881.png, 
> image-2024-04-18-21-34-41-014.png, image-2024-04-20-14-43-36-939.png
>
>
> After receiving feedback from the business side about performance issues in 
> their tasks, we attempted to troubleshoot and discovered that their tasks had 
> thread deadlocks. However, the Thread Dump entry on the Flink page only shows 
> thread stacks. Since the users are not very familiar with Java stacks, they 
> couldn't clearly identify that the deadlocks were due to issues in the 
> business logic code and mistakenly thought they were problems with the Flink 
> framework.
> !image-2024-04-18-20-57-52-440.png!
> !image-2024-04-18-20-58-09-872.png!
> The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
> the business team does not have permission to log into the machines. Here is 
> the jstack log:
> !image-2024-04-18-21-00-04-532.png!
> Flame graphs are excellent for visualizing performance bottlenecks and 
> hotspots in application profiling, but they are not designed to pinpoint the 
> exact lines of code where thread deadlocks occur.
> !image-2024-04-18-21-01-22-881.png!
> Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
> similar to what the {{jstack}} command provides.
>  
> !image-2024-04-18-21-34-41-014.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35144) Support various sources sync for FlinkCDC in one pipeline

2024-04-20 Thread melin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839198#comment-17839198
 ] 

melin commented on FLINK-35144:
---

Yes, the business system database consists of multiple database instances, up 
to 16 service instances, merged and synchronized to Kafka.

In database/table sharding business scenarios, the databases are distributed 
across multiple server instances.

> Support various sources sync for FlinkCDC in one pipeline
> -
>
> Key: FLINK-35144
> URL: https://issues.apache.org/jira/browse/FLINK-35144
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Congxian Qiu
>Priority: Major
>
> Currently, a FlinkCDC pipeline can only support a single source, so we need 
> to start multiple pipelines when there are several sources. For upstreams 
> that use sharding, we need to sync multiple sources in one pipeline; the 
> current pipeline can't do this because it only supports a single source.
> This issue proposes supporting the sync of multiple sources in one pipeline.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)