[PR] W.I.P [flink]
JunRuiLee opened a new pull request, #23642: URL: https://github.com/apache/flink/pull/23642 test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33235][doc] Update OLAP QuickStart doc and translate to Chinese [flink]
KarmaGYZ commented on code in PR #23523: URL: https://github.com/apache/flink/pull/23523#discussion_r1378445286 ## docs/content.zh/docs/dev/table/olap_quickstart.md: ## @@ -24,64 +24,72 @@ specific language governing permissions and limitations under the License. --> -# Introduction -Flink OLAP has already added to [Apache Flink Roadmap](https://flink.apache.org/roadmap/). It means Flink can not only support streaming and batch computing, but also support OLAP(On-Line Analytical Processing). This page will show how to quickly set up a Flink OLAP service, and will introduce some best practices. +OLAP(OnLine Analysis Processing)是数据分析领域的一项关键技术,通常被用来对较大的数据集进行秒级的复杂查询分析。Flink 作为一款流批一体的计算引擎,现在也同样支持用户将其作为一个 OLAP 计算服务来部署。本文将会帮助你在本地快速搭建起一个 Flink OLAP 集群并试用。同时,也会介绍一些在实际生产环境中使用 Flink 作为 OLAP 计算服务的实践。 -## Architecture +# 架构介绍 -The Flink OLAP service consists of three parts: Client, Flink SQL Gateway, Flink Session Cluster. +本章节将会介绍 Flink 作为一个 OLAP 服务的总体架构及其在使用上的优势。 -* **Client**: Could be any client that can interact with Flink SQL Gateway, such as SQL client, Flink JDBC driver and so on. -* **Flink SQL Gateway**: The SQL Gateway provides an easy way to submit the Flink Job, look up the metadata, and analyze table stats. -* **Flink Session Cluster**: We choose session clusters to run OLAP queries, mainly to avoid the overhead of cluster startup. +## 架构 -## Advantage +Flink OLAP 服务整体由3个部分组成,包括:客户端,Flink SQL Gateway 和 Flink Session Cluster。 -* **Massively Parallel Processing** - * Flink OLAP runs naturally as an MPP(Massively Parallel Processing) system, which supports low-latency ad-hoc queries -* **Reuse Connectors** - * Flink OLAP can reuse rich connectors in Flink ecosystem. -* **Unified Engine** - * Unified computing engine for Streaming/Batch/OLAP. +* **客户端**: 可以是任何可以和 Flink SQL Gateway 交互的客户端,包括:SQL Client,Flink JDBC Driver 等等; Review Comment: Add links for them. ## docs/content.zh/docs/dev/table/olap_quickstart.md: ## @@ -102,98 +110,98 @@ GROUP BY buyer ORDER BY total_cost LIMIT 3; ``` -And then you could find job detail information in web UI at localhost:8081. +具体的作业运行信息你可以通过访问本地的 Web UI(http://localhost:8081)来获取。 -# Deploying in Production +# 生产环境部署 -This section guides you through setting up a production ready Flink OLAP service. +这个章节会向你介绍一些在生产环境中使用 Flink OLAP 服务的建议。 -## Cluster Deployment +## 客户端 -In production, we recommend to use Flink Session Cluster, Flink SQL Gateway and Flink JDBC Driver to build an OLAP service. +### Flink JDBC Driver -### Session Cluster +Flink JDBC Driver 提供了底层的连接管理能力,方便用户使用并向 SQL Gateway 提交查询请求。在实际的生产使用中,用户需要注意如何复用 JDBC 连接,来避免 Gateway 频繁的执行 Session 相关的创建及关闭操作,从而减少端到端的作业耗时。详细信息可以参考文档 [Flink JDBC Driver]({{{}}})。 Review Comment: broken link. ## docs/content.zh/docs/dev/table/olap_quickstart.md: ## @@ -24,64 +24,72 @@ specific language governing permissions and limitations under the License. --> -# Introduction -Flink OLAP has already added to [Apache Flink Roadmap](https://flink.apache.org/roadmap/). It means Flink can not only support streaming and batch computing, but also support OLAP(On-Line Analytical Processing). This page will show how to quickly set up a Flink OLAP service, and will introduce some best practices. +OLAP(OnLine Analysis Processing)是数据分析领域的一项关键技术,通常被用来对较大的数据集进行秒级的复杂查询分析。Flink 作为一款流批一体的计算引擎,现在也同样支持用户将其作为一个 OLAP 计算服务来部署。本文将会帮助你在本地快速搭建起一个 Flink OLAP 集群并试用。同时,也会介绍一些在实际生产环境中使用 Flink 作为 OLAP 计算服务的实践。 -## Architecture +# 架构介绍 -The Flink OLAP service consists of three parts: Client, Flink SQL Gateway, Flink Session Cluster. 
+本章节将会介绍 Flink 作为一个 OLAP 服务的总体架构及其在使用上的优势。 -* **Client**: Could be any client that can interact with Flink SQL Gateway, such as SQL client, Flink JDBC driver and so on. -* **Flink SQL Gateway**: The SQL Gateway provides an easy way to submit the Flink Job, look up the metadata, and analyze table stats. -* **Flink Session Cluster**: We choose session clusters to run OLAP queries, mainly to avoid the overhead of cluster startup. +## 架构 -## Advantage +Flink OLAP 服务整体由3个部分组成,包括:客户端,Flink SQL Gateway 和 Flink Session Cluster。 -* **Massively Parallel Processing** - * Flink OLAP runs naturally as an MPP(Massively Parallel Processing) system, which supports low-latency ad-hoc queries -* **Reuse Connectors** - * Flink OLAP can reuse rich connectors in Flink ecosystem. -* **Unified Engine** - * Unified computing engine for Streaming/Batch/OLAP. +* **客户端**: 可以是任何可以和 Flink SQL Gateway 交互的客户端,包括:SQL Client,Flink JDBC Driver 等等; +* **Flink SQL Gateway**: Flink SQL Gateway 服务主要用作 SQL 解析、元数据获取、统计信息分析、Plan 优化和集群作业提交; +* **Flink Session Cluster**: OLAP 查询建议运行在 Session 集群上,主要是可以减少集群启动时的额外开销; Review Comment: ditto for session cluster. ## docs/content.zh/docs/dev/table/olap_quickstart.md: ## @@ -24,64 +24,72 @@ specific language governing permissions and limitations under the License. --> -# Intr
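Since the page under review describes submitting OLAP queries through the SQL Gateway with the Flink JDBC Driver and recommends reusing JDBC connections, a minimal client-side sketch may help illustrate the flow. The `jdbc:flink://localhost:8083` URL, the gateway port, and the `orders` table and column names are assumptions pieced together from the quoted snippets, not verified parts of the PR; check the Flink JDBC Driver documentation linked from the reviewed page for the exact connection format.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class OlapGatewayQuery {

    public static void main(String[] args) throws Exception {
        // Assumed URL of a locally running Flink SQL Gateway (REST endpoint on port 8083);
        // requires the flink-sql-jdbc-driver jar on the classpath so the driver self-registers.
        String url = "jdbc:flink://localhost:8083";

        // Reuse a single connection for several statements, as the review suggests,
        // to avoid repeated session creation/teardown on the gateway.
        try (Connection connection = DriverManager.getConnection(url);
                Statement statement = connection.createStatement()) {
            String query =
                    "SELECT buyer, SUM(price) AS total_cost "
                            + "FROM orders GROUP BY buyer ORDER BY total_cost LIMIT 3";
            try (ResultSet rs = statement.executeQuery(query)) {
                while (rs.next()) {
                    System.out.println(rs.getString("buyer") + " -> " + rs.getDouble("total_cost"));
                }
            }
        }
    }
}
```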
[jira] [Created] (FLINK-33424) Resolve the problem that yarnClient cannot load yarn configurations
zhengzhili created FLINK-33424: -- Summary: Resolve the problem that yarnClient cannot load yarn configurations Key: FLINK-33424 URL: https://issues.apache.org/jira/browse/FLINK-33424 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.17.1 Reporter: zhengzhili Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33423) Resolve the problem that yarnClient cannot load yarn configurations
zhengzhili created FLINK-33423: -- Summary: Resolve the problem that yarnClient cannot load yarn configurations Key: FLINK-33423 URL: https://issues.apache.org/jira/browse/FLINK-33423 Project: Flink Issue Type: Bug Components: Client / Job Submission Reporter: zhengzhili -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781613#comment-17781613 ] Fang Yong commented on FLINK-31275: --- Hi [~mobuchowski], thanks for your comments. In the current FLIP, `LineageVertex` is the top-level interface for vertices in the lineage graph; it will be used in both Flink SQL jobs and DataStream jobs. 1. For table connectors in SQL jobs, there will be a `TableLineageVertex`, which is generated from the Flink catalog-based table and provides the catalog context and table schema for the specified connector. The table lineage vertex and edge implementations will be created from the dynamic tables of the connectors in Flink, and they will be updated when the connectors are updated. 2. For customized sources/sinks in DataStream jobs, we can get source and sink `LineageVertex` implementations from `LineageVertexProvider`. When users implement customized lineage vertices and edges, they need to update them when their connectors are updated. IIUC, do you mean we should provide an implementation of `LineageVertex` for DataStream jobs so that users can supply source/sink information there, just like `TableLineageVertex` in SQL jobs? Then listeners could use the DataStream lineage vertex in the same way as the table lineage vertex? Because sources and sinks in `DataStream` are very flexible, we think it is hard to cover all of them, so we only provide `LineageVertex` and `LineageVertexProvider` for them and leave this flexibility to users and listeners. If a custom connector is a table in a `DataStream` job, users can return a `TableLineageVertex` from the `LineageVertexProvider`. As for the following `LineageVertex`:
```
public interface LineageVertex {
    /* Config for the lineage vertex, containing all the options for the connector. */
    Map config();
    /* List of datasets that are consumed by this job. */
    List inputs();
    /* List of datasets that are produced by this job. */
    List outputs();
}
```
we tend to provide independent edge descriptions of the connectors in `LineageEdge` for the lineage graph instead of adding datasets to `LineageVertex`. The `LineageVertex` here is the `DataSet` you mentioned. WDYT? Hope to hear from you, thanks
> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Fang Yong
> Assignee: Fang Yong
> Priority: Major
>
> FLIP-314 has been accepted
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
-- This message was sent by Atlassian Jira (v8.20.10#820010)
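To make the proposal above easier to picture, here is a minimal, self-contained sketch of how a custom DataStream connector could expose lineage through a `LineageVertexProvider`. The interface shapes below are local stand-ins inferred from the FLIP-314 discussion; the generic parameters, the `LineageDataset` type, and the `KafkaAuditSink` example are assumptions for illustration, not the final Flink API.

```java
import java.util.List;
import java.util.Map;

public class LineageSketch {

    // Local stand-ins for the FLIP-314 interfaces discussed above; real signatures may differ.
    interface LineageDataset {
        String name();
    }

    interface LineageVertex {
        /* Config for the lineage vertex, containing all options of the connector. */
        Map<String, String> config();
        /* Datasets consumed by this job. */
        List<LineageDataset> inputs();
        /* Datasets produced by this job. */
        List<LineageDataset> outputs();
    }

    interface LineageVertexProvider {
        LineageVertex getLineageVertex();
    }

    // Hypothetical custom sink that reports the Kafka topic it writes to.
    static class KafkaAuditSink implements LineageVertexProvider {
        private final String topic;

        KafkaAuditSink(String topic) {
            this.topic = topic;
        }

        @Override
        public LineageVertex getLineageVertex() {
            return new LineageVertex() {
                @Override
                public Map<String, String> config() {
                    return Map.of("connector", "kafka", "topic", topic);
                }

                @Override
                public List<LineageDataset> inputs() {
                    return List.of(); // a sink consumes nothing from the lineage point of view
                }

                @Override
                public List<LineageDataset> outputs() {
                    return List.of(() -> topic); // one produced dataset: the target topic
                }
            };
        }
    }

    public static void main(String[] args) {
        LineageVertex vertex = new KafkaAuditSink("audit-events").getLineageVertex();
        System.out.println(vertex.config() + " -> " + vertex.outputs().get(0).name());
    }
}
```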
Re: [PR] [FLINK-31242] [docs] Correct the definition of creating functions in the SQL client documentation [flink]
YesOrNo828 closed pull request #22038: [FLINK-31242] [docs] Correct the definition of creating functions in the SQL client documentation URL: https://github.com/apache/flink/pull/22038 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31242] [docs] Correct the definition of creating functions in the SQL client documentation [flink]
YesOrNo828 commented on PR #22038: URL: https://github.com/apache/flink/pull/22038#issuecomment-1788483913 Closed. This has been fixed by #22948. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378433008 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
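As a concrete illustration of the migration pattern described in the document under review (replace `ExecutionEnvironment` with `StreamExecutionEnvironment`, switch to BATCH runtime mode, and use DataStream sources and sinks), here is a minimal, self-contained word-count sketch. The sample input and the word-count logic are illustrative assumptions and are not part of the PR itself.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DataSetToDataStreamWordCount {

    public static void main(String[] args) throws Exception {
        // Replace ExecutionEnvironment with StreamExecutionEnvironment ...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ... and run the bounded pipeline in batch mode, as the reviewed document describes.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Bounded source; fromElements stands in for readFile/fromCollection/createInput.
        DataStream<String> lines =
                env.fromElements("to be or not to be", "that is the question");

        DataStream<Tuple2<String, Integer>> counts =
                lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : line.split("\\s+")) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                        .returns(Types.TUPLE(Types.STRING, Types.INT))
                        .keyBy(t -> t.f0) // replaces DataSet#groupBy
                        .sum(1);          // in batch mode, emits one final count per key

        counts.print();
        env.execute("DataSet-to-DataStream word count");
    }
}
```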
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378418615 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378417184 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378417033 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378416760 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378416526 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378416167 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378414045 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
+ + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + +{{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) +{{< /highlight >}} + + + + + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external system, while the +DataSet API uses the `DataSink`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378413855 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. + + + + +DataSet +DataStream + + + + + +{{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + +{{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); +{{< /highlight >}} + + + + + +As the source of DataSet is always bounded, the `RuntimeMode` must be set to `RuntimeMode.BATCH` to make Flink execute in batch mode. Review Comment: I have replaced the original text with this sentence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378413349 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate +each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing `ExecutionEnvironment` with `StreamExecutionEnvironment`. Review Comment: I have replaced the original text with this sentence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378413003 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. Review Comment: I have replaced the original text with this sentence. ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. Review Comment: I have replaced the original text with this sentence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378412748 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. Review Comment: I have replaced the original text with this sentence. ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. Review Comment: I have replaced the original text with this sentence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378412480 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. + +Category 1: These DataSet APIs can be migrated to DataStream APIs with same semantic and same processing behavior. + +Category 2: These DataSet APIs can be migrated to DataStream APIs with different semantic but same processing behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be migrated to DataStream APIs with different semantic and different processing behavior. This +will involve additional computation and I/O costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378412317 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +To build the same data processing application, the DataSet APIs can be divided into four categories when migrating them to +DataStream APIs. Review Comment: I have replaced the original text with these sentences. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32309][sql-gateway] Use independent resource manager for table environment [flink]
FangYongs commented on code in PR #22768: URL: https://github.com/apache/flink/pull/22768#discussion_r1378393971 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java: ## @@ -176,14 +184,17 @@ public ResultFetcher configureSession(OperationHandle handle, String statement) return callSetOperation(tableEnv, handle, (SetOperation) op); } else if (op instanceof ResetOperation) { return callResetOperation(handle, (ResetOperation) op); +} else if (op instanceof AddJarOperation) { +return callExecutableOperation(handle, (ExecutableOperation) op); Review Comment: You're right, we don't need it, I'll remove this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26907][Tests] OffHeapUnsafeMemorySegmentTest.testCallCleanerOnceOnConcurrentFree prints IllegalStateException [flink]
flinkbot commented on PR #23641: URL: https://github.com/apache/flink/pull/23641#issuecomment-1788392864 ## CI report: * ccdfdc101e0b67b72aa5d5e6dfe291987ae72cbf UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-26907][Tests] OffHeapUnsafeMemorySegmentTest.testCallCleanerOnceOnConcurrentFree prints IllegalStateException [flink]
victor9309 opened a new pull request, #23641: URL: https://github.com/apache/flink/pull/23641 ## What is the purpose of the change *RMQSourceITCase failed on Azure due to ContainerLaunchException: Container startup failed; increase the number of TestContainer startup attempts.* ## Brief change log - *Increase the number of TestContainer startup attempts from 1 to 3.* ## Verifying this change - *This change is already covered by existing tests, such as MinioTestContainerTest.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
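For context on the mechanism behind this change: Testcontainers lets a test declare how many startup attempts a container gets before the launch is reported as failed. A minimal, hedged sketch of that knob, using a RabbitMQ container as in the RMQ source tests (the class and image tag below are illustrative, not the exact code touched by this PR):

```java
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;

public class StartupAttemptsSketch {
    public static void main(String[] args) {
        // Illustrative container; the PR changes Flink's shared test utilities, not this exact class.
        try (RabbitMQContainer rabbitMq =
                new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.9-management-alpine"))
                        // Allow up to 3 startup attempts instead of failing the test run
                        // on the first ContainerLaunchException.
                        .withStartupAttempts(3)) {
            rabbitMq.start();
            System.out.println("RabbitMQ is reachable at " + rabbitMq.getAmqpUrl());
        }
    }
}
```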
Re: [PR] [FLINK-26088][Connectors/ElasticSearch] Add Elasticsearch 8.0 support [flink-connector-elasticsearch]
StefanXiepj commented on code in PR #74: URL: https://github.com/apache/flink-connector-elasticsearch/pull/74#discussion_r1378366427 ## flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8Sink.java: ## @@ -0,0 +1,85 @@ +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; + +import org.apache.http.HttpHost; + +import javax.net.ssl.SSLContext; + +import java.io.IOException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Flink Sink to insert or update data in an Elasticsearch index. The sink supports the following + * delivery guarantees. + * + * + * {@link DeliveryGuarantee#NONE} does not provide any guarantees: actions are flushed to + * Elasticsearch only depending on the configurations of the bulk processor. In case of a + * failure, it might happen that actions are lost if the bulk processor still has buffered + * actions. + * {@link DeliveryGuarantee#AT_LEAST_ONCE} on a checkpoint the sink will wait until all + * buffered actions are flushed to and acknowledged by Elasticsearch. No actions will be lost + * but actions might be sent to Elasticsearch multiple times when Flink restarts. These + * additional requests may cause inconsistent data in ElasticSearch right after the restart, + * but eventually everything will be consistent again. + * + * + * @param type of the records converted to Elasticsearch actions + * @see Elasticsearch8SinkBuilderBase on how to construct a ElasticsearchSink + */ +@PublicEvolving +public class Elasticsearch8Sink implements Sink { Review Comment: I am also implementing the table API. I stopped pushing new code because another PR is also in progress and I am not sure which solution the community thinks is better. I have made many changes, which may not be what the community wants to see. I would like to discuss the repair plan in depth with you, if you don't mind. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
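For readers following the delivery-guarantee javadoc quoted above: with the already-released Elasticsearch 7 connector, the guarantee is chosen on the sink builder, and the ES8 builder discussed in this PR is presumably meant to expose a similar switch (an assumption, since its API is still under review). A minimal sketch against the existing ES7 builder, with a made-up index name:

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

public class DeliveryGuaranteeSketch {

    // AT_LEAST_ONCE: on each checkpoint the sink flushes buffered actions and waits for
    // Elasticsearch to acknowledge them, matching the javadoc quoted above.
    public static ElasticsearchSink<String> buildSink() {
        return new Elasticsearch7SinkBuilder<String>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setBulkFlushMaxActions(1000)
                .setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(element)))
                .build();
    }

    private static IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest().index("my-index").id(element).source(json);
    }
}
```

In a job, the sink would be attached with `stream.sinkTo(buildSink())`.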
Re: [PR] [BP-1.17][FLINK-32426][table-runtime] Fix adaptive local hash can't work when auxGrouping exists [flink]
flinkbot commented on PR #23640: URL: https://github.com/apache/flink/pull/23640#issuecomment-1788357441 ## CI report: * 68d2083bb1e8be0e89408a8dc73e1c26646f4981 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-33014) flink jobmanager raise java.io.IOException: Connection reset by peer
[ https://issues.apache.org/jira/browse/FLINK-33014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu closed FLINK-33014. --- Resolution: Not A Bug Insufficient resource allocation for Tm > flink jobmanager raise java.io.IOException: Connection reset by peer > - > > Key: FLINK-33014 > URL: https://issues.apache.org/jira/browse/FLINK-33014 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.1 > Environment: |*blob.server.port*|6124| > |*classloader.resolve-order*|parent-first| > |*jobmanager.execution.failover-strategy*|region| > |*jobmanager.memory.heap.size*|2228014280b| > |*jobmanager.memory.jvm-metaspace.size*|536870912b| > |*jobmanager.memory.jvm-overhead.max*|322122552b| > |*jobmanager.memory.jvm-overhead.min*|322122552b| > |*jobmanager.memory.off-heap.size*|134217728b| > |*jobmanager.memory.process.size*|3gb| > |*jobmanager.rpc.address*|naf-flink-ms-flink-manager-1-59m7w| > |*jobmanager.rpc.port*|6123| > |*parallelism.default*|1| > |*query.server.port*|6125| > |*rest.address*|0.0.0.0| > |*rest.bind-address*|0.0.0.0| > |*rest.connection-timeout*|6| > |*rest.server.numThreads*|8| > |*slot.request.timeout*|300| > |*state.backend.rocksdb.localdir*|/home/nafplat/data/flinkStateStore| > |*state.backend.type*|rocksdb| > |*taskmanager.bind-host*|0.0.0.0| > |*taskmanager.host*|0.0.0.0| > |*taskmanager.memory.framework.off-heap.batch-shuffle.size*|256mb| > |*taskmanager.memory.framework.off-heap.size*|512mb| > |*taskmanager.memory.managed.fraction*|0.4| > |*taskmanager.memory.network.fraction*|0.2| > |*taskmanager.memory.process.size*|5gb| > |*taskmanager.memory.task.off-heap.size*|268435456bytes| > |*taskmanager.numberOfTaskSlots*|2| > |*taskmanager.runtime.large-record-handler*|true| > |*web.submit.enable*|true| > |*web.tmpdir*|/tmp/flink-web-c1b57e2b-5426-4fb8-a9ce-5acd1cceefc9| > |*web.upload.dir*|/opt/flink/nafJar| >Reporter: zhu >Priority: Major > > > The Flink cluster was deployed using the Docker image of Flink 1.17.1 java8. 
> After deployment, on k8s, in standalone form, jobmanager printed this error > at intervals, and taskmanager did not print any errors, > There are currently no jobs running > {code:java} > 2023-09-01 11:34:14,293 WARN > org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Unhandled > exception > java.io.IOException: Connection reset by peer > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_372] > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > ~[?:1.8.0_372] > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > ~[?:1.8.0_372] > at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_372] > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) > ~[?:1.8.0_372] > at > org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > [flink-dist-1.17.1.jar:1.17.1] > at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
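The resolution above points at TaskManager resources rather than a Flink bug. As a hedged illustration only (the sizes are made-up values, not a recommendation), the relevant memory knobs can be set programmatically for a local test environment; a standalone or Kubernetes deployment sets the same keys (`taskmanager.memory.process.size`, `jobmanager.memory.process.size`) in flink-conf.yaml instead:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ResourceSizingSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Give the TaskManager more headroom than the 5 GB reported above (illustrative values).
        conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("8g"));
        conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("4g"));
        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);

        // Only meaningful for a local environment; real clusters read these keys from flink-conf.yaml.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        env.fromElements(1, 2, 3).print();
        env.execute("resource-sizing-sketch");
    }
}
```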
[jira] [Closed] (FLINK-32943) run batch tasks concurrently, the tasks still in the initialization status
[ https://issues.apache.org/jira/browse/FLINK-32943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu closed FLINK-32943. --- Resolution: Not A Bug > run batch tasks concurrently, the tasks still in the initialization status > -- > > Key: FLINK-32943 > URL: https://issues.apache.org/jira/browse/FLINK-32943 > Project: Flink > Issue Type: Bug >Affects Versions: 1.15.2 > Environment: flink 1.15.2 > > |*lob.server.port*|6124| > |*classloader.resolve-order*|parent-first| > |*jobmanager.execution.failover-strategy*|region| > |*jobmanager.memory.heap.size*|2228014280b| > |*jobmanager.memory.jvm-metaspace.size*|536870912b| > |*jobmanager.memory.jvm-overhead.max*|322122552b| > |*jobmanager.memory.jvm-overhead.min*|322122552b| > |*jobmanager.memory.off-heap.size*|134217728b| > |*jobmanager.memory.process.size*|3gb| > |*jobmanager.rpc.address*|naf-flink-ms-flink-manager-1-4gcwz| > |*jobmanager.rpc.port*|6123| > |*parallelism.default*|1| > |*query.server.port*|6125| > |*rest.address*|0.0.0.0| > |*rest.bind-address*|0.0.0.0| > |*rest.connection-timeout*|6| > |*rest.server.numThreads*|8| > |*slot.request.timeout*|300| > |*state.backend.rocksdb.localdir*|/home/nafplat/data/flinkStateStore| > |*state.backend.type*|rocksdb| > |*taskmanager.bind-host*|0.0.0.0| > |*taskmanager.host*|0.0.0.0| > |*taskmanager.memory.framework.off-heap.batch-shuffle.size*|256mb| > |*taskmanager.memory.framework.off-heap.size*|512mb| > |*taskmanager.memory.managed.fraction*|0.4| > |*taskmanager.memory.network.fraction*|0.2| > |*taskmanager.memory.process.size*|16gb| > |*taskmanager.memory.task.off-heap.size*|268435456bytes| > |*taskmanager.numberOfTaskSlots*|6| > |*taskmanager.runtime.large-record-handler*|true| > |*web.submit.enable*|true| > |*web.tmpdir*|/tmp/flink-web-4be192ba-870a-4f88-8185-d07fa6303cca| > |*web.upload.dir*|/opt/flink/nafJar| >Reporter: zhu >Priority: Major > > run 1.15.2 flink session on k8s,In most cases, there is no problem. > Sometimes, tasks are initialized continuously, and subsequent tasks are also > initialized continuously,and > i find jobmanager thread dump jobmanager-io thread all blocked, > I run batch job with 6 concurrent,jobmanage with 2cpu and 3g Memory > When this situation occurs, i find this source code will still loop > public static void waitUntilJobInitializationFinished( > SupplierWithException jobStatusSupplier, > SupplierWithException jobResultSupplier, > ClassLoader userCodeClassloader) > throws JobInitializationException { > LOG.debug("Wait until job initialization is finished"); > WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000); > try { > JobStatus status = jobStatusSupplier.get(); > long attempt = 0; > while (status == JobStatus.INITIALIZING) { > Thread.sleep(waitStrategy.sleepTime(attempt++)); > status = jobStatusSupplier.get(); > } > if (status == JobStatus.FAILED) { > JobResult result = jobResultSupplier.get(); > Optional throwable = > result.getSerializedThrowable(); > if (throwable.isPresent()) { > Throwable t = > throwable.get().deserializeError(userCodeClassloader); > if (t instanceof JobInitializationException) { > throw t; > } > } > } > } catch (JobInitializationException initializationException) { > throw initializationException; > } catch (Throwable throwable) { > ExceptionUtils.checkInterrupted(throwable); > throw new RuntimeException("Error while waiting for job to be > initialized", throwable); > } > } > -- This message was sent by Atlassian Jira (v8.20.10#820010)
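The polling loop quoted in the description above (its formatting was lost in the mail archive) boils down to: poll the job status with a capped exponential backoff while it stays INITIALIZING, then inspect the failure cause. A simplified, self-contained sketch of that pattern, not the actual Flink ClientUtils code, with the backoff bounds (50 ms initial, 2000 ms cap) taken from the ExponentialWaitStrategy mentioned there:

```java
import java.util.function.Supplier;

public class InitializationPollingSketch {

    enum JobStatus { INITIALIZING, RUNNING, FAILED, FINISHED }

    // Capped exponential backoff, standing in for the ExponentialWaitStrategy(50, 2000) above.
    static long sleepTime(long attempt, long initialMillis, long capMillis) {
        long shifted = initialMillis << Math.min(attempt, 20); // clamp the shift to avoid overflow
        return Math.min(shifted, capMillis);
    }

    static JobStatus waitUntilInitialized(Supplier<JobStatus> statusSupplier) throws InterruptedException {
        long attempt = 0;
        JobStatus status = statusSupplier.get();
        while (status == JobStatus.INITIALIZING) {
            Thread.sleep(sleepTime(attempt++, 50, 2000));
            status = statusSupplier.get();
        }
        return status;
    }

    public static void main(String[] args) throws InterruptedException {
        // Toy supplier: reports INITIALIZING three times, then RUNNING.
        int[] calls = {0};
        JobStatus result =
                waitUntilInitialized(() -> calls[0]++ < 3 ? JobStatus.INITIALIZING : JobStatus.RUNNING);
        System.out.println("Final status: " + result);
    }
}
```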
[jira] [Updated] (FLINK-26907) RMQSourceITCase failed on azure due to ContainerLaunchException: Container startup failed
[ https://issues.apache.org/jira/browse/FLINK-26907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-26907: --- Labels: auto-deprioritized-major pull-request-available starter test-stability (was: auto-deprioritized-major starter test-stability) > RMQSourceITCase failed on azure due to ContainerLaunchException: Container > startup failed > - > > Key: FLINK-26907 > URL: https://issues.apache.org/jira/browse/FLINK-26907 > Project: Flink > Issue Type: Bug > Components: Build System, Connectors/ RabbitMQ >Affects Versions: 1.15.0, 1.16.0, rabbitmq-3.0.0 >Reporter: Yun Gao >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available, > starter, test-stability > > {code:java} > 2-03-28T09:41:01.3374229Z Mar 28 09:41:01 [ERROR] Tests run: 1, Failures: 0, > Errors: 1, Skipped: 0, Time elapsed: 91.834 s <<< FAILURE! - in > org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase > 2022-03-28T09:41:01.3375722Z Mar 28 09:41:01 [ERROR] > org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase Time elapsed: > 91.834 s <<< ERROR! > 2022-03-28T09:41:01.3376743Z Mar 28 09:41:01 > org.testcontainers.containers.ContainerLaunchException: Container startup > failed > 2022-03-28T09:41:01.3378470Z Mar 28 09:41:01 at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336) > 2022-03-28T09:41:01.3379355Z Mar 28 09:41:01 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317) > 2022-03-28T09:41:01.3380117Z Mar 28 09:41:01 at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066) > 2022-03-28T09:41:01.3381076Z Mar 28 09:41:01 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > 2022-03-28T09:41:01.3382198Z Mar 28 09:41:01 at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2022-03-28T09:41:01.3383575Z Mar 28 09:41:01 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-03-28T09:41:01.3384717Z Mar 28 09:41:01 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > 2022-03-28T09:41:01.3385671Z Mar 28 09:41:01 at > org.junit.runner.JUnitCore.run(JUnitCore.java:137) > 2022-03-28T09:41:01.3386611Z Mar 28 09:41:01 at > org.junit.runner.JUnitCore.run(JUnitCore.java:115) > 2022-03-28T09:41:01.3387691Z Mar 28 09:41:01 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > 2022-03-28T09:41:01.3388981Z Mar 28 09:41:01 at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > 2022-03-28T09:41:01.3390250Z Mar 28 09:41:01 at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > 2022-03-28T09:41:01.3391619Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > 2022-03-28T09:41:01.3393437Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > 2022-03-28T09:41:01.3394826Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) > 2022-03-28T09:41:01.3396333Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) > 2022-03-28T09:41:01.3397800Z Mar 28 09:41:01 at > 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) > 2022-03-28T09:41:01.3399166Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) > 2022-03-28T09:41:01.3400315Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) > 2022-03-28T09:41:01.3401636Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) > 2022-03-28T09:41:01.3403403Z Mar 28 09:41:01 at > org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) > 2022-03-28T09:41:01.3404823Z Mar 28 09:41:01 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188) > 2022-03-28T09:41:01.3406517Z Mar 28 09:41:01 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154) > 2022-03-28T09:41:01.3407936Z Mar 28 09:41:01 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124) > 2022-03-28T09:41:01.3409418Z Mar 28 09:41:01 at > org.apache.maven.suref
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1378358266 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = Review Comment: Hi @1996fanrui, thank you very much for the precise comments! I'd prefer the second option, with a few small adjustments: 1. Add `@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)` 2. Temporarily comment out the 'SLOT' mode in `TaskManagerLoadBalanceMode` 3. Add a description for the configuration item along these lines: ``` The load balance mode of taskmanager when processing scheduling tasks. 'NONE' means that the scheduler does not consider any dimensional balance when scheduling tasks. 'TASKS' means that the scheduler prioritizes ensuring that the number of tasks on each TM is balanced when scheduling tasks. Please view FLIP-370 for more details. ``` 4. After all PRs are merged, we'll add the 'SLOTS' mode back, supplement the description, and deprecate slot-spread-out. The added description would be along these lines: ``` 'SLOTS' indicates that the scheduler prioritizes and balances the use of each TM's slot when scheduling tasks. ``` Please let me know your opinion~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
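Putting the proposal above together, a sketch of what the option and its enum might look like (illustrative only; the final wording and values are whatever the PR lands on, and the surrounding class and imports here exist just to keep the fragment compilable):

```java
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class LoadBalanceModeOptionSketch {

    /** Type of {@link #TASK_MANAGER_LOAD_BALANCE_MODE}. */
    public enum TaskManagerLoadBalanceMode {
        NONE,
        TASKS
        // 'SLOTS' is to be added back (and slot-spread-out deprecated) once the remaining PRs are merged.
    }

    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
    public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
            ConfigOptions.key("taskmanager.load-balance.mode")
                    .enumType(TaskManagerLoadBalanceMode.class)
                    .defaultValue(TaskManagerLoadBalanceMode.NONE)
                    .withDescription(
                            "The load balance mode of taskmanager when processing scheduling tasks. "
                                    + "'NONE' means that the scheduler does not consider any dimensional balance "
                                    + "when scheduling tasks. 'TASKS' means that the scheduler prioritizes ensuring "
                                    + "that the number of tasks on each TM is balanced. "
                                    + "Please view FLIP-370 for more details.");
}
```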
Re: [PR] [FLINK-26907][Tests] OffHeapUnsafeMemorySegmentTest.testCallCleanerOnceOnConcurrentFree prints IllegalStateException [flink]
flinkbot commented on PR #23639: URL: https://github.com/apache/flink/pull/23639#issuecomment-1788353704 ## CI report: * b914c663f2955c007ac560a465e2799328ef3e04 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1378352765 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location + * constraints are ignored at present. + */ +class BalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy Review Comment: SGTM~ ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location + * constraints are ignored at present. Review Comment: I'll update the header comments. ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in mi
[PR] [BP-1.17][FLINK-32426][table-runtime] Fix adaptive local hash can't work when auxGrouping exists [flink]
lsyldliu opened a new pull request, #23640: URL: https://github.com/apache/flink/pull/23640 ## What is the purpose of the change *Fix adaptive local hash can't work when auxGrouping exists* ## Brief change log - *Fix adaptive local hash can't work when auxGrouping exists* ## Verifying this change This change added tests and can be verified as follows: - *Added integration tests in HashAggITCase* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-1.15][FLINK-31244][Tests] OffHeapUnsafeMemorySegmentTest.testCallCleanerOnceOnConcurrentFree prints IllegalStateException [flink]
victor9309 opened a new pull request, #23639: URL: https://github.com/apache/flink/pull/23639 ## What is the purpose of the change *RMQSourceITCase failed on Azure due to ContainerLaunchException: Container startup failed; increase the number of TestContainer startup attempts.* ## Brief change log - *Increase the number of TestContainer startup attempts from 1 to 3.* ## Verifying this change - *This change is already covered by existing tests, such as MinioTestContainerTest.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26088][Connectors/ElasticSearch] Add Elasticsearch 8.0 support [flink-connector-elasticsearch]
liyubin117 commented on code in PR #74: URL: https://github.com/apache/flink-connector-elasticsearch/pull/74#discussion_r1378357036 ## flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8Sink.java: ## @@ -0,0 +1,85 @@ +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; + +import org.apache.http.HttpHost; + +import javax.net.ssl.SSLContext; + +import java.io.IOException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Flink Sink to insert or update data in an Elasticsearch index. The sink supports the following + * delivery guarantees. + * + * + * {@link DeliveryGuarantee#NONE} does not provide any guarantees: actions are flushed to + * Elasticsearch only depending on the configurations of the bulk processor. In case of a + * failure, it might happen that actions are lost if the bulk processor still has buffered + * actions. + * {@link DeliveryGuarantee#AT_LEAST_ONCE} on a checkpoint the sink will wait until all + * buffered actions are flushed to and acknowledged by Elasticsearch. No actions will be lost + * but actions might be sent to Elasticsearch multiple times when Flink restarts. These + * additional requests may cause inconsistent data in ElasticSearch right after the restart, + * but eventually everything will be consistent again. + * + * + * @param type of the records converted to Elasticsearch actions + * @see Elasticsearch8SinkBuilderBase on how to construct a ElasticsearchSink + */ +@PublicEvolving +public class Elasticsearch8Sink implements Sink { Review Comment: I cannot find any table api related implenmentions, and es6/es7 has implemented, you can refer to them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32309][sql-gateway] Use independent resource manager for table environment [flink]
KarmaGYZ commented on code in PR #22768: URL: https://github.com/apache/flink/pull/22768#discussion_r1364801781 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java: ## @@ -59,4 +68,10 @@ public URL unregisterJarResource(String jarPath) { String.format("Failed to unregister the jar resource [%s]", jarPath), e); } } + +@Override +public ResourceManager copy() { +return new ClientResourceManager( +localResourceDir, new HashMap<>(resourceInfos), userClassLoader); Review Comment: Do we also need to deep copy the localResourceDir and userclassLoader? ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ## @@ -232,6 +241,10 @@ public void addJarConfiguration(TableConfig tableConfig) { tableConfig.set(PipelineOptions.JARS, new ArrayList<>(jarFiles)); } +public ResourceManager copy() { +return new ResourceManager(localResourceDir, new HashMap<>(resourceInfos), userClassLoader); Review Comment: ditto ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java: ## @@ -176,14 +184,17 @@ public ResultFetcher configureSession(OperationHandle handle, String statement) return callSetOperation(tableEnv, handle, (SetOperation) op); } else if (op instanceof ResetOperation) { return callResetOperation(handle, (ResetOperation) op); +} else if (op instanceof AddJarOperation) { +return callExecutableOperation(handle, (ExecutableOperation) op); Review Comment: Now, all the operation will use the origin resource manager in this method. Is it in expected? If so, this branch might be unnecessary -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33358][sql] Fix Flink SQL Client fail to start in Flink on YARN [flink]
lsyldliu commented on PR #23629: URL: https://github.com/apache/flink/pull/23629#issuecomment-1788343978 > In my humble opinion, this change is fine. But I don't know much about the details of flink sql, and I'm not sure whether this change introduces additional risks. It would be better if there are flink sql experts to help and confirm. > > Hi @lsyldliu , would you mind helping take a look this PR in your free time? thanks~ cc @fsk119 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33317][runtime] Add cleaning mechanism for initial configs to reduce the memory usage [flink]
1996fanrui commented on PR #23589: URL: https://github.com/apache/flink/pull/23589#issuecomment-1788342771 Hi @pnowojski @RocMarshal, some unit tests fail because `getStreamOperatorFactory` throws an exception once `SERIALIZED_UDF` is removed. I researched it for a while and found that it will take some time to refactor the related tests. However, I don't have enough time this week because `flink-kubernetes-operator` will be released soon, and one autoscaler feature would be nice to finish before that release; since this PR targets flink-1.19, there is enough time to develop it. So I will work on `flink-kubernetes-operator` first and come back to this PR after that. Sorry for the delay, and thanks a lot for your quick review. Have a good day. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26088][Connectors/ElasticSearch] Add Elasticsearch 8.0 support [flink-connector-elasticsearch]
liyubin117 commented on code in PR #74: URL: https://github.com/apache/flink-connector-elasticsearch/pull/74#discussion_r1377514898 ## flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8SinkBaseITCase.java: ## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.elasticsearch.ElasticsearchServerBaseITCase; +import org.apache.flink.connector.elasticsearch.RequestTest; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.junit.ClassRule; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.function.BiFunction; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Elasticsearch8Sink}. */ +@ExtendWith(TestLoggerExtension.class) +abstract class Elasticsearch8SinkBaseITCase extends ElasticsearchServerBaseITCase { Review Comment: Why consider creating a new test class instead of using the existing `ElasticsearchSinkBaseITCase`? 
There are also other redundant parts that need to be simplified; please check them as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26088][Connectors/ElasticSearch] Add Elasticsearch 8.0 support [flink-connector-elasticsearch]
liyubin117 commented on code in PR #74: URL: https://github.com/apache/flink-connector-elasticsearch/pull/74#discussion_r1377503899 ## flink-connector-elasticsearch8/.gitignore: ## @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans Review Comment: unneccessary files. ## flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8SinkBaseITCase.java: ## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.elasticsearch.ElasticsearchServerBaseITCase; +import org.apache.flink.connector.elasticsearch.RequestTest; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.junit.ClassRule; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import 
java.util.UUID; +import java.util.function.BiFunction; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Elasticsearch8Sink}. */ +@ExtendWith(TestLoggerExtension.class) +abstract class Elasticsearch8SinkBaseITCase extends ElasticsearchServerBaseITCase { Review Comment: Why consider creating a new test class instead of using the existing `ElasticsearchSinkBaseITCase`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1378332744 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = Review Comment: Like other options, shouldn't we add some comments and `@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)`? Or should we add `@Documentation.Section` and deprecate `slot-spread-out` only after all PRs are merged? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1378331133 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = +ConfigOptions.key("taskmanager.load-balance.mode") +.enumType(TaskManagerLoadBalanceMode.class) +.defaultValue(TaskManagerLoadBalanceMode.NONE) +.withDescription( +"The load balance mode of taskmanager when processing scheduling tasks."); Review Comment: It's better to describe these values in detail, it's useful for users to understand this option. ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = +ConfigOptions.key("taskmanager.load-balance.mode") +.enumType(TaskManagerLoadBalanceMode.class) +.defaultValue(TaskManagerLoadBalanceMode.NONE) +.withDescription( +"The load balance mode of taskmanager when processing scheduling tasks."); + +/** Type of taskmanager.load-balance.mode. */ Review Comment: ```suggestion /** Type of {@link #TASK_MANAGER_LOAD_BALANCE_MODE}. */ ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. 
Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location + * constraints are ignored at present. + */ +class BalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy Review Comment: ```suggestion class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy ``` How about `TaskBalancedPreferredSlotSharingStrategy`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32107] [Tests] 1.17 Kubernetes test failed because ofunable to establish ssl connection to github on AZP [flink]
victor9309 closed pull request #23636: [FLINK-32107] [Tests] 1.17 Kubernetes test failed because ofunable to establish ssl connection to github on AZP URL: https://github.com/apache/flink/pull/23636 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32107] [Tests] 1.17 Kubernetes test failed because ofunable to establish ssl connection to github on AZP [flink]
victor9309 closed pull request #23636: [FLINK-32107] [Tests] 1.17 Kubernetes test failed because ofunable to establish ssl connection to github on AZP URL: https://github.com/apache/flink/pull/23636 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32107] [Tests] Kubernetes test failed because ofunable to establish ssl connection to github on AZP [flink]
flinkbot commented on PR #23638: URL: https://github.com/apache/flink/pull/23638#issuecomment-1788261124 ## CI report: * d07d72543b49f24c788d5c52834e286f8e5350d7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.18][FLINK-32107] [Tests] Kubernetes test failed because ofunable to establish ssl connection to github on AZP [flink]
flinkbot commented on PR #23637: URL: https://github.com/apache/flink/pull/23637#issuecomment-1788260613 ## CI report: * 1c7554f97aee03b0ab87c129eb2defd63d949718 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-1.17][FLINK-32107] [Tests] Kubernetes test failed because ofunable to establish ssl connection to github on AZP [flink]
victor9309 opened a new pull request, #23638: URL: https://github.com/apache/flink/pull/23638 ## What is the purpose of the change * Backport of https://github.com/apache/flink/pull/23528 to release-1.17, so that CI fails earlier when the download fails.* ## Brief change log - * Modify common_kubernetes.sh: retry the download and make CI fail earlier.* ## Verifying this change *This change is already covered by existing tests.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-1.18][FLINK-32107] [Tests] Kubernetes test failed because of unable to establish ssl connection to github on AZP [flink]
victor9309 opened a new pull request, #23637: URL: https://github.com/apache/flink/pull/23637 ## What is the purpose of the change * Backport of https://github.com/apache/flink/pull/23528 to release-1.18. We could make CI fail earlier when download fails.* ## Brief change log - * modify common_kubernetes.sh; retry* ## Verifying this change *This change is already covered by existing tests.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32107] [Tests] 1.17 Kubernetes test failed because of unable to establish ssl connection to github on AZP [flink]
flinkbot commented on PR #23636: URL: https://github.com/apache/flink/pull/23636#issuecomment-1788253541 ## CI report: * d07d72543b49f24c788d5c52834e286f8e5350d7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-32107] [Tests] 1.17 Kubernetes test failed because of unable to establish ssl connection to github on AZP [flink]
victor9309 opened a new pull request, #23636: URL: https://github.com/apache/flink/pull/23636 ## What is the purpose of the change *(We could make CI fail earlier when download fails.)* ## Brief change log - *backports 1.17, modify common_kubernetes.sh* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33401) Kafka connector has broken version
[ https://issues.apache.org/jira/browse/FLINK-33401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781563#comment-17781563 ] Yuxin Tan edited comment on FLINK-33401 at 11/1/23 12:24 AM: - [~pavelhp] If you wish to try it in Flink 1.18, you may need to wait for the new release of the new version connector. (Currently, the connector that can adapt to Flink 1.18 has not been released yet.) was (Author: tanyuxin): [~pavelhp] If you wish to try it in Flink 1.18, you may need to wait for the new release of the new version connector. > Kafka connector has broken version > -- > > Key: FLINK-33401 > URL: https://issues.apache.org/jira/browse/FLINK-33401 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: Pavel Khokhlov >Priority: Major > Labels: pull-request-available > > Trying to run Flink 1.18 with Kafka Connector > but official documentation has a bug > [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/] > {noformat} > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka</artifactId> > <version>-1.18</version> > </dependency> > {noformat} > Basically version *-1.18* doesn't exist. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33401) Kafka connector has broken version
[ https://issues.apache.org/jira/browse/FLINK-33401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781563#comment-17781563 ] Yuxin Tan commented on FLINK-33401: --- [~pavelhp] If you wish to try it in Flink 1.18, you may need to wait for the new release of the new version connector. > Kafka connector has broken version > -- > > Key: FLINK-33401 > URL: https://issues.apache.org/jira/browse/FLINK-33401 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: Pavel Khokhlov >Priority: Major > Labels: pull-request-available > > Trying to run Flink 1.18 with Kafka Connector > but official documentation has a bug > [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/] > {noformat} > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka</artifactId> > <version>-1.18</version> > </dependency> > {noformat} > Basically version *-1.18* doesn't exist. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33401][connectors/kafka] Fix the wrong download link and version in the connector doc [flink-connector-kafka]
TanYuxin-tyx commented on PR #64: URL: https://github.com/apache/flink-connector-kafka/pull/64#issuecomment-1788206919 @MartijnVisser, Thanks for helping review. This fix aims to resolve the issue of displaying the incorrect version number (-1.18) for the Kafka connector. Currently, the download link and version are not accessible until the new connector is released. This is expected, as it is similar to other connectors such as the Pulsar connector (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pulsar/), Kinesis connector (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/), Elasticsearch connector (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/elasticsearch/), and Google PubSub dependency (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pubsub/), and so on. Upon visiting these links, you will notice that the download links and dependencies are not available, resulting in a 404 error or the inability to find the dependency. However, their displayed versions are accurate. Therefore, this fix aims to ensure that the Kafka connector also displays the correct version and link. Once the new connector is released, these links will naturally become accessible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33422) Implement restore tests for Calc node
[ https://issues.apache.org/jira/browse/FLINK-33422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bonnie Varghese updated FLINK-33422: Summary: Implement restore tests for Calc node (was: Add restore tests for Calc node) > Implement restore tests for Calc node > - > > Key: FLINK-33422 > URL: https://issues.apache.org/jira/browse/FLINK-33422 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Bonnie Varghese >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33422) Add restore tests for Calc node
Bonnie Varghese created FLINK-33422: --- Summary: Add restore tests for Calc node Key: FLINK-33422 URL: https://issues.apache.org/jira/browse/FLINK-33422 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Bonnie Varghese -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33421) Implement ExecNode Restore Tests
Bonnie Varghese created FLINK-33421: --- Summary: Implement ExecNode Restore Tests Key: FLINK-33421 URL: https://issues.apache.org/jira/browse/FLINK-33421 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Bonnie Varghese Implement Restore Tests for various exec nodes to improve coverage Related JIRA: https://issues.apache.org/jira/browse/FLINK-33375 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781516#comment-17781516 ] Varun Narayanan Chakravarthy edited comment on FLINK-33402 at 10/31/23 9:10 PM: I had previously incorporated the fixes from [Flink-33360|https://issues.apache.org/jira/browse/FLINK-33360?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Connectors%20%2F%20HybridSource%22], but that does not guarantee complete correctness at high volume. was (Author: JIRAUSER302644): I had previously incorporated the fixes from Flink-33360, but that does not guarantee complete correctness at high volume. > Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in > Data Loss > > > Key: FLINK-33402 > URL: https://issues.apache.org/jira/browse/FLINK-33402 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.1 > Environment: Apache Flink 1.16.1 > Mac OSX, Linux etc. >Reporter: Varun Narayanan Chakravarthy >Priority: Blocker > Attachments: hybridSourceEnumeratorAndReaderFixes.patch > > Original Estimate: 2h > Remaining Estimate: 2h > > Hello Team, > I noticed that there is data loss when using Hybrid Source. We are reading > from a series of concrete File Sources ~100. All these locations are chained > together using the Hybrid source. > The issue stems from a race-condition in Flink Hybrid Source code. The Hybrid > Sources switches the next source before the current source is complete. > Similarly for the Hybrid Source readers. I have also shared the patch file > that fixes the issue. > From the logs: > *Task Manager logs:* > 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding > split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, > 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 > 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek > policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - > Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished > reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher > for Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished > reading from splits [000154] 2023-10-10 17:46:24.014 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader > received NoMoreSplits event. 
2023-10-10 17:46:24.014 [Source: parquet-source > (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader > - No more splits for subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source > (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to > Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for > Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source > event: subtask=0 sourceIndex=12 > source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing > Source Reader. 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] > INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting > down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher > 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: > subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. > This is assigned to Reader with ID 000229. Now, we can see from the logs > this split is added after the no-more splits event
[jira] [Commented] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781516#comment-17781516 ] Varun Narayanan Chakravarthy commented on FLINK-33402: -- I had previously incorporated the fixes from Flink-33360, but that does not guarantee complete correctness at high volume. > Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in > Data Loss > > > Key: FLINK-33402 > URL: https://issues.apache.org/jira/browse/FLINK-33402 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.1 > Environment: Apache Flink 1.16.1 > Mac OSX, Linux etc. >Reporter: Varun Narayanan Chakravarthy >Priority: Blocker > Attachments: hybridSourceEnumeratorAndReaderFixes.patch > > Original Estimate: 2h > Remaining Estimate: 2h > > Hello Team, > I noticed that there is data loss when using Hybrid Source. We are reading > from a series of concrete File Sources ~100. All these locations are chained > together using the Hybrid source. > The issue stems from a race-condition in Flink Hybrid Source code. The Hybrid > Sources switches the next source before the current source is complete. > Similarly for the Hybrid Source readers. I have also shared the patch file > that fixes the issue. > From the logs: > *Task Manager logs:* > 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding > split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, > 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 > 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek > policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - > Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished > reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher > for Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished > reading from splits [000154] 2023-10-10 17:46:24.014 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader > received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source > (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader > - No more splits for subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source > (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to > Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for > Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source > event: subtask=0 sourceIndex=12 > source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing > Source Reader. 
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] > INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting > down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher > 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: > subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. > This is assigned to Reader with ID 000229. Now, we can see from the logs > this split is added after the no-more splits event and is NOT read. > *Job Manager logs:* > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote > split to requesting host '10': Optional[FileSourceSplit: > s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 > position=null]
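[Editor's note] To make the reported race easier to picture, here is a minimal, hypothetical guard sketch. It is not the actual HybridSourceReader code nor the attached patch; the class and method names are invented. The idea it illustrates is the one the report describes: the switch to the next source is only safe once the NoMoreSplits signal has arrived and every split already assigned to the current source has been read, otherwise a split assigned shortly before the switch (like ID 000229 above) is silently dropped.

```
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the ordering guard the fix needs; names are invented.
final class SourceSwitchGuard {
    private final Set<String> pendingSplits = new HashSet<>();
    private boolean noMoreSplits;

    synchronized void onSplitAdded(String splitId) { pendingSplits.add(splitId); }

    synchronized void onSplitFinished(String splitId) { pendingSplits.remove(splitId); }

    synchronized void onNoMoreSplits() { noMoreSplits = true; }

    // Switching before both conditions hold risks dropping splits that were
    // assigned around the time of the NoMoreSplits event, i.e. the data loss reported here.
    synchronized boolean safeToSwitchToNextSource() {
        return noMoreSplits && pendingSplits.isEmpty();
    }
}
```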
[jira] (FLINK-27758) [JUnit5 Migration] Module: flink-table-runtime
[ https://issues.apache.org/jira/browse/FLINK-27758 ] Chao Liu deleted comment on FLINK-27758: -- was (Author: JIRAUSER302840): Hi [~Sergey Nuyanzin] I'd like to work on this ticket, could I get assigned to this? > [JUnit5 Migration] Module: flink-table-runtime > -- > > Key: FLINK-27758 > URL: https://issues.apache.org/jira/browse/FLINK-27758 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime, Tests >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available, stale-assigned > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1787923597 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33420) Run e2e test fails Intermittently with ClientCoordinationHandler : Unhandled exception
[ https://issues.apache.org/jira/browse/FLINK-33420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Samrat Deb updated FLINK-33420: --- Description: ``` Oct 31 08:49:37 2023-10-31 08:49:33,348 ERROR org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler [] - Unhandled exception. Oct 31 08:49:37 org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (8528fbf0d50c0f038653f6815d56f6fd) Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:1450) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:1465) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:1088) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382] ``` log Link : [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54212&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&s=ae4f8708-9994-57d3-c2d7-b892156e7812] cc: [~prabhujoseph] was: ``` Oct 31 08:49:37 2023-10-31 08:49:33,348 ERROR org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler [] - Unhandled exception. Oct 31 08:49:37 org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (8528fbf0d50c0f038653f6815d56f6fd) Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:1450) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:1465) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:1088) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382] ``` log Link : https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54212&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&s=ae4f8708-9994-57d3-c2d7-b892156e7812 > Run e2e test fails Intermetiently with ClientCoordinationHandler : Unhandled > exception > -- > > Key: FLINK-33420 > URL: https://issues.apache.org/jira/browse/FLINK-33420 > Project: Flink > Issue Type: Bug >Reporter: Samrat Deb >Priority: Major > > > ``` > Oct 31 08:49:37 2023-10-31 08:49:33,348 ERROR > org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler > [] - Unhandled exception. 
> Oct 31 08:49:37 org.apache.flink.runtime.messages.FlinkJobNotFoundException: > Could not find Flink job (8528fbf0d50c0f038653f6815d56f6fd) > Oct 31 08:49:37 at > org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:1450) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] > Oct 31 08:49:37 at > org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:1465) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] > Oct 31 08:49:37 at > org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:1088) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] > Oct 31 08:49:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) ~[?:1.8.0_382] > ``` > log Link : > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54212&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&s=ae4f8708-9994-57d3-c2d7-b892156e7812] > > cc: [~prabhujoseph] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33420) Run e2e test fails Intermittently with ClientCoordinationHandler : Unhandled exception
Samrat Deb created FLINK-33420: -- Summary: Run e2e test fails Intermetiently with ClientCoordinationHandler : Unhandled exception Key: FLINK-33420 URL: https://issues.apache.org/jira/browse/FLINK-33420 Project: Flink Issue Type: Bug Reporter: Samrat Deb ``` Oct 31 08:49:37 2023-10-31 08:49:33,348 ERROR org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler [] - Unhandled exception. Oct 31 08:49:37 org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (8528fbf0d50c0f038653f6815d56f6fd) Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:1450) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:1465) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:1088) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] Oct 31 08:49:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382] ``` log Link : https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54212&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&s=ae4f8708-9994-57d3-c2d7-b892156e7812 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Varun Narayanan Chakravarthy updated FLINK-33402: - Description: Hello Team, I noticed that there is data loss when using Hybrid Source. We are reading from a series of concrete File Sources ~100. All these locations are chained together using the Hybrid source. The issue stems from a race-condition in Flink Hybrid Source code. The Hybrid Sources switches the next source before the current source is complete. Similarly for the Hybrid Source readers. I have also shared the patch file that fixes the issue. >From the logs: *Task Manager logs:* 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [000154] 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing Source Reader. 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. This is assigned to Reader with ID 000229. Now, we can see from the logs this split is added after the no-more splits event and is NOT read. 
*Job Manager logs:* 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host '10': Optional[FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 position=null 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received split request from parallel task 1 (#0) 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest subtask=1 sourceIndex=11 pendingSplits={} 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on host '10.4.168.40') is requesting a file source split 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO o.a.f.c
[jira] [Updated] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Varun Narayanan Chakravarthy updated FLINK-33402: - Priority: Blocker (was: Major) > Hybrid Source Concurrency Race Condition Fixes and Related Bugs > --- > > Key: FLINK-33402 > URL: https://issues.apache.org/jira/browse/FLINK-33402 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.1 > Environment: Apache Flink 1.16.1 > Mac OSX, Linux etc. >Reporter: Varun Narayanan Chakravarthy >Priority: Blocker > Attachments: hybridSourceEnumeratorAndReaderFixes.patch > > Original Estimate: 2h > Remaining Estimate: 2h > > Hello Team, > I noticed that there is data loss when using Hybrid Source. We are reading > from a series of concrete File Sources ~100. All these locations are chained > together using the Hybrid source. > The issue stems from a race-condition in Flink Hybrid Source code. The Hybrid > Sources switches the next source before the current source is complete. > Similarly for the Hybrid Source readers. I have also shared the patch file > that fixes the issue. > From the logs: > *Task Manager logs:* > 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding > split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, > 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 > 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek > policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - > Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished > reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher > for Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished > reading from splits [000154] 2023-10-10 17:46:24.014 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader > received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source > (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader > - No more splits for subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source > (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to > Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for > Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source > event: subtask=0 sourceIndex=12 > source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing > Source Reader. 
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] > INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting > down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher > 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: > subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. > This is assigned to Reader with ID 000229. Now, we can see from the logs > this split is added after the no-more splits event and is NOT read. > *Job Manager logs:* > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote > split to requesting host '10': Optional[FileSourceSplit: > s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 > position=null] > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split > to subtask 0 : FileSourceSplit:
[jira] [Commented] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781476#comment-17781476 ] Varun Narayanan Chakravarthy commented on FLINK-33402: -- Similar issues [here|https://www.mail-archive.com/issues@flink.apache.org/msg716124.html] > Hybrid Source Concurrency Race Condition Fixes and Related Bugs > --- > > Key: FLINK-33402 > URL: https://issues.apache.org/jira/browse/FLINK-33402 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.1 > Environment: Apache Flink 1.16.1 > Mac OSX, Linux etc. >Reporter: Varun Narayanan Chakravarthy >Priority: Major > Attachments: hybridSourceEnumeratorAndReaderFixes.patch > > Original Estimate: 2h > Remaining Estimate: 2h > > Hello Team, > I noticed that there is data loss when using Hybrid Source. We are reading > from a series of concrete File Sources ~100. All these locations are chained > together using the Hybrid source. > The issue stems from a race-condition in Flink Hybrid Source code. The Hybrid > Sources switches the next source before the current source is complete. > Similarly for the Hybrid Source readers. I have also shared the patch file > that fixes the issue. > From the logs: > *Task Manager logs:* > 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding > split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, > 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 > 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek > policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - > Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished > reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher > for Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished > reading from splits [000154] 2023-10-10 17:46:24.014 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader > received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source > (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader > - No more splits for subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source > (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to > Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for > Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source > event: subtask=0 sourceIndex=12 > source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing > Source Reader. 
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] > INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting > down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher > 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: > subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. > This is assigned to Reader with ID 000229. Now, we can see from the logs > this split is added after the no-more splits event and is NOT read. > *Job Manager logs:* > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote > split to requesting host '10': Optional[FileSourceSplit: > s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 > position=null] > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.
[jira] [Updated] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Varun Narayanan Chakravarthy updated FLINK-33402: - Summary: Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss (was: Hybrid Source Concurrency Race Condition Fixes and Related Bugs) > Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in > Data Loss > > > Key: FLINK-33402 > URL: https://issues.apache.org/jira/browse/FLINK-33402 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.1 > Environment: Apache Flink 1.16.1 > Mac OSX, Linux etc. >Reporter: Varun Narayanan Chakravarthy >Priority: Blocker > Attachments: hybridSourceEnumeratorAndReaderFixes.patch > > Original Estimate: 2h > Remaining Estimate: 2h > > Hello Team, > I noticed that there is data loss when using Hybrid Source. We are reading > from a series of concrete File Sources ~100. All these locations are chained > together using the Hybrid source. > The issue stems from a race-condition in Flink Hybrid Source code. The Hybrid > Sources switches the next source before the current source is complete. > Similarly for the Hybrid Source readers. I have also shared the patch file > that fixes the issue. > From the logs: > *Task Manager logs:* > 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding > split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, > 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 > 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek > policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - > Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished > reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher > for Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished > reading from splits [000154] 2023-10-10 17:46:24.014 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader > received NoMoreSplits event. 2023-10-10 17:46:24.014 [Source: parquet-source > (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader > - No more splits for subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source > (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to > Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for > Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source > event: subtask=0 sourceIndex=12 > source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing > Source Reader. 
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] > INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting > down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher > 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: > subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. > This is assigned to Reader with ID 000229. Now, we can see from the logs > this split is added after the no-more splits event and is NOT read. > *Job Manager logs:* > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote > split to requesting host '10': Optional[FileSourceSplit: > s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 > position=null] > 2023-10-
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781471#comment-17781471 ] Maciej Obuchowski commented on FLINK-31275: --- [~zjureel] Sorry, it's probably too late for this comment as FLIP is accepted. But I'll still voice a concern here from [OpenLineage|https://openlineage.io/], as we'd want to implement this interface. It seems like the intended way for the LineageVertex interface is to just provide config context to particular nodes:

```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map config();
}
```

and then, in a particular case, when the listener understands the particular implementation, provide more information:

```
// Create kafka source class with lineage vertex
public class KafkaVectorSource extends KafkaSource implements LineageVertexProvider {
    int capacity;
    String valueType;
    public LineageVertex LineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}
```

I think this is problematic because it strongly couples the listener to a particular vertex implementation. If you want to get the list of datasets that are read by a particular Flink job, you'll have to understand where the config is coming from and its structure. Additionally, sometimes config is not everything we need to get lineage - for example, for the Kafka connector we could get a regex pattern used for reading that we'd need to resolve ourselves. Or, if the connector subclasses `LineageVector`, then another option is to get additional information from the subclass - but still, the connector has to understand it. Another problem is that the configuration structure for a particular connector can have breaking changes between versions - so we're tied not only to the connector, but also to a particular version of it. But if we pushed the responsibility of understanding the datasets that a particular vertex of a graph produces to the connector itself, we'd not have this problem. First, the connector understands where it's reading from and writing to - so providing that information is easy for it. Second, the versioning problem does not exist - because the connector can update the code responsible for providing dataset information in the same PR that breaks it, which will be transparent for the listener. I would imagine the interface to be just something like this:

```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map config();
    /* List of datasets that are consumed by this job */
    List inputs();
    /* List of datasets that are produced by this job */
    List outputs();
}
```

What a dataset is in this case is debatable: from the OL perspective it would be best if this were something similar to [https://openlineage.io/apidocs/javadoc/io/openlineage/client/openlineage.dataset] - get name (ex. table name) and namespace (ex. standardized database identifier). It also provides an extensible list of facets that represent additional information about the dataset that the particular connection wants to expose together with just the dataset identifier - ex. something that represents table schema or side of the dataset. It could be something Flink-specific, but should allow particular connections to expose the additional information.
> Flink supports reporting and storage of source/sink tables relationship > --- > > Key: FLINK-31275 > URL: https://issues.apache.org/jira/browse/FLINK-31275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > FLIP-314 has been accepted > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener -- This message was sent by Atlassian Jira (v8.20.10#820010)
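[Editor's note] To make the decoupling argument in the comment above concrete, here is a minimal, hypothetical sketch. The type names LineageDataset and LoggingLineageListener are invented for illustration and are not part of FLIP-314; the interface shape simply mirrors the inputs()/outputs() proposal quoted above. The point is that a listener which only consumes the datasets each vertex reports about itself never has to parse connector-specific config:

```
import java.util.List;
import java.util.Map;

// Hypothetical dataset descriptor; the actual FLIP-314 type may differ.
interface LineageDataset {
    String namespace(); // e.g. a standardized database identifier
    String name();      // e.g. a table or topic name
}

// Sketch of the interface shape proposed in the comment above (generics added for illustration).
interface LineageVertex {
    Map<String, String> config();
    List<LineageDataset> inputs();
    List<LineageDataset> outputs();
}

// A generic listener needs no connector-specific knowledge: it only reads the
// datasets each vertex reports about itself.
final class LoggingLineageListener {
    void onVertex(LineageVertex vertex) {
        vertex.inputs().forEach(d -> System.out.println("reads  " + d.namespace() + "." + d.name()));
        vertex.outputs().forEach(d -> System.out.println("writes " + d.namespace() + "." + d.name()));
    }
}
```

Whatever shape the final API takes, the sketch shows the design point being argued: dataset resolution stays inside the connector, so config-format changes between connector versions do not break listeners.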
[jira] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275 ] Maciej Obuchowski deleted comment on FLINK-31275: --- was (Author: mobuchowski): [~zjureel] Sorry, it's probably too late for this comment as FLIP is accepted. But I'll still voice concern here: It seems like the intented way for LineageVertex interface is to just provide config context: ``` > Flink supports reporting and storage of source/sink tables relationship > --- > > Key: FLINK-31275 > URL: https://issues.apache.org/jira/browse/FLINK-31275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > FLIP-314 has been accepted > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781458#comment-17781458 ] Maciej Obuchowski commented on FLINK-31275: --- [~zjureel] Sorry, it's probably too late for this comment as FLIP is accepted. But I'll still voice concern here: It seems like the intented way for LineageVertex interface is to just provide config context: ``` > Flink supports reporting and storage of source/sink tables relationship > --- > > Key: FLINK-31275 > URL: https://issues.apache.org/jira/browse/FLINK-31275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > FLIP-314 has been accepted > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33418) SqlGatewayE2ECase failed due to ConnectException
[ https://issues.apache.org/jira/browse/FLINK-33418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781449#comment-17781449 ] Matthias Pohl commented on FLINK-33418: --- https://github.com/XComp/flink/actions/runs/6707387404/job/18226444235#step:15:12068 > SqlGatewayE2ECase failed due to ConnectException > > > Key: FLINK-33418 > URL: https://issues.apache.org/jira/browse/FLINK-33418 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client, Tests >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The container couldn't be started in [this > build|https://github.com/XComp/flink/actions/runs/6696839844/job/18195926497#step:15:11765]: > {code} > Error: 20:18:40 20:18:40.111 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 110.789 s <<< FAILURE! - in > org.apache.flink.table.gateway.SqlGatewayE2ECase > Error: 20:18:40 20:18:40.111 [ERROR] > org.apache.flink.table.gateway.SqlGatewayE2ECase Time elapsed: 110.789 s > <<< ERROR! > Oct 30 20:18:40 org.testcontainers.containers.ContainerLaunchException: > Container startup failed for image prestodb/hdp2.6-hive:10 > Oct 30 20:18:40 at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:349) > Oct 30 20:18:40 at > org.apache.flink.table.gateway.containers.HiveContainer.doStart(HiveContainer.java:69) > Oct 30 20:18:40 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322) > Oct 30 20:18:40 at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1131) > Oct 30 20:18:40 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:28) > Oct 30 20:18:40 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Oct 30 20:18:40 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Oct 30 20:18:40 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > Oct 30 20:18:40 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > Oct 30 20:18:40 at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > Oct 30 20:18:40 at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) > Oct 30 20:18:40 at > 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) > Oct 30 20:18:40 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188) > Oct 30 20:18:40 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154) > Oct 30 20:18:40 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548) > Oct 30 20:18:40 Caused by: org.rnorth.ducttape.RetryCountExceededExcept
Re: [PR] [FLINK-33358][sql] Fix Flink SQL Client fail to start in Flink on YARN [flink]
PrabhuJoseph commented on PR #23629: URL: https://github.com/apache/flink/pull/23629#issuecomment-1787531140 @Samrat002 @1996fanrui Could you review this patch when you have time? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-24379) Support AWS Glue Schema Registry Avro for Table API
[ https://issues.apache.org/jira/browse/FLINK-24379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-24379: - Assignee: Lorenzo Nicora > Support AWS Glue Schema Registry Avro for Table API > --- > > Key: FLINK-24379 > URL: https://issues.apache.org/jira/browse/FLINK-24379 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / API >Affects Versions: aws-connector-4.2.0 >Reporter: Brad Davis >Assignee: Lorenzo Nicora >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: aws-connector-4.3.0 > > > Unlike most (all?) of the other Avro formats, the AWS Glue Schema Registry > version doesn't include a > META-INF/services/org.apache.flink.table.factories.Factory resource or a > class implementing > org.apache.flink.table.factories.DeserializationFormatFactory and > org.apache.flink.table.factories.SerializationFormatFactory. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32380] Support Java Records with PojoTypeInfo/Serializer [flink]
gyfora commented on PR #23490: URL: https://github.com/apache/flink/pull/23490#issuecomment-1787514161 @XComp we have a green CI run with the Java tests enabled; I will remove that commit before merging. Do you have any further comments? I think I addressed most things. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
flinkbot commented on PR #23635: URL: https://github.com/apache/flink/pull/23635#issuecomment-1787488771 ## CI report: * 0816a1779cbf97c1e3526e18e70d3d2e09817a05 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33419] Port PROCTIME/ROWTIME functions to the new inference stack [flink]
flinkbot commented on PR #23634: URL: https://github.com/apache/flink/pull/23634#issuecomment-1787478201 ## CI report: * e204a5e3d94da5b0e4f31af5f31846c910a924f3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33386) Support tasks balancing at slot level for Default Scheduler
[ https://issues.apache.org/jira/browse/FLINK-33386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33386: --- Labels: pull-request-available (was: ) > Support tasks balancing at slot level for Default Scheduler > --- > > Key: FLINK-33386 > URL: https://issues.apache.org/jira/browse/FLINK-33386 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: RocMarshal >Assignee: RocMarshal >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal opened a new pull request, #23635: URL: https://github.com/apache/flink/pull/23635 ## What is the purpose of the change - Support tasks balancing at slot level for Default Scheduler ## Brief change log - *Introduce BalancedPreferredSlotSharingStrategy to support tasks balancing at slot level.* - *Expose the configuration item to enable tasks balancing at slot level for Default Scheduler.* ## Verifying this change This change added tests and can be verified as follows: - *`org.apache.flink.runtime.scheduler.BalancedPreferredSlotSharingStrategyTest`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / **don't know**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33242) misc module: YARN tests are flaky
[ https://issues.apache.org/jira/browse/FLINK-33242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781430#comment-17781430 ] Matthias Pohl commented on FLINK-33242: --- Interestingly, we have a build where the yarn tests didn't fail: https://github.com/XComp/flink/actions/runs/6707387404/job/18226397885 > misc module: YARN tests are flaky > - > > Key: FLINK-33242 > URL: https://issues.apache.org/jira/browse/FLINK-33242 > Project: Flink > Issue Type: Sub-task >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > https://github.com/XComp/flink/actions/runs/6473584177/job/17581942919 > {code} > 2023-10-10T23:16:09.3548634Z Oct 10 23:16:09 23:16:09.354 [ERROR] Tests run: > 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 5.664 s <<< FAILURE! - > in org.apache.flink.yarn.YarnPrioritySchedulingITCase > 2023-10-10T23:16:09.3564980Z Oct 10 23:16:09 23:16:09.354 [ERROR] > org.apache.flink.yarn.YarnPrioritySchedulingITCase.yarnApplication_submissionWithPriority_shouldRespectPriority > Time elapsed: 1.226 s <<< ERROR! > 2023-10-10T23:16:09.3565608Z Oct 10 23:16:09 java.lang.RuntimeException: > Runner failed with exception. > 2023-10-10T23:16:09.3566290Z Oct 10 23:16:09 at > org.apache.flink.yarn.YarnTestBase.startWithArgs(YarnTestBase.java:949) > 2023-10-10T23:16:09.3566954Z Oct 10 23:16:09 at > org.apache.flink.yarn.YarnPrioritySchedulingITCase.lambda$yarnApplication_submissionWithPriority_shouldRespectPriority$0(YarnPrioritySchedulingITCase.java:45) > 2023-10-10T23:16:09.3567646Z Oct 10 23:16:09 at > org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) > 2023-10-10T23:16:09.3568447Z Oct 10 23:16:09 at > org.apache.flink.yarn.YarnPrioritySchedulingITCase.yarnApplication_submissionWithPriority_shouldRespectPriority(YarnPrioritySchedulingITCase.java:41) > 2023-10-10T23:16:09.3569187Z Oct 10 23:16:09 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2023-10-10T23:16:09.3569805Z Oct 10 23:16:09 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2023-10-10T23:16:09.3570485Z Oct 10 23:16:09 at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2023-10-10T23:16:09.3571052Z Oct 10 23:16:09 at > java.base/java.lang.reflect.Method.invoke(Method.java:566) > 2023-10-10T23:16:09.3571527Z Oct 10 23:16:09 at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > 2023-10-10T23:16:09.3572075Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > 2023-10-10T23:16:09.3572716Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > 2023-10-10T23:16:09.3573350Z Oct 10 23:16:09 at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > 2023-10-10T23:16:09.3573954Z Oct 10 23:16:09 at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > 2023-10-10T23:16:09.3574665Z Oct 10 23:16:09 at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > 2023-10-10T23:16:09.3575378Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > 
2023-10-10T23:16:09.3576139Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > 2023-10-10T23:16:09.3576852Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > 2023-10-10T23:16:09.3577539Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > 2023-10-10T23:16:09.3578225Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > 2023-10-10T23:16:09.3578898Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > 2023-10-10T23:16:09.3579568Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > 2023-10-10T23:16:09.3580243Z Oct 10 23:16:09 at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > 2023-10-10T23:16:09.3580917Z Oct 10 23:16:09 at > org.jun
[jira] [Updated] (FLINK-33419) Port PROCTIME/ROWTIME functions to the new inference stack
[ https://issues.apache.org/jira/browse/FLINK-33419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33419: --- Labels: pull-request-available (was: ) > Port PROCTIME/ROWTIME functions to the new inference stack > -- > > Key: FLINK-33419 > URL: https://issues.apache.org/jira/browse/FLINK-33419 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33419] Port PROCTIME/ROWTIME functions to the new inference stack [flink]
dawidwys opened a new pull request, #23634: URL: https://github.com/apache/flink/pull/23634 ## What is the purpose of the change Ports `PROCTIME` & `ROWTIME` functions to the new inference stack. ## Verifying this change Added tests for the input type strategy ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33419) Port PROCTIME/ROWTIME functions to the new inference stack
Dawid Wysakowicz created FLINK-33419: Summary: Port PROCTIME/ROWTIME functions to the new inference stack Key: FLINK-33419 URL: https://issues.apache.org/jira/browse/FLINK-33419 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33418) SqlGatewayE2ECase failed due to ConnectException
[ https://issues.apache.org/jira/browse/FLINK-33418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781426#comment-17781426 ] Matthias Pohl commented on FLINK-33418: --- https://github.com/XComp/flink/actions/runs/6695266358/job/18192091957#step:15:12535 > SqlGatewayE2ECase failed due to ConnectException > > > Key: FLINK-33418 > URL: https://issues.apache.org/jira/browse/FLINK-33418 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client, Tests >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The container couldn't be started in [this > build|https://github.com/XComp/flink/actions/runs/6696839844/job/18195926497#step:15:11765]: > {code} > Error: 20:18:40 20:18:40.111 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 110.789 s <<< FAILURE! - in > org.apache.flink.table.gateway.SqlGatewayE2ECase > Error: 20:18:40 20:18:40.111 [ERROR] > org.apache.flink.table.gateway.SqlGatewayE2ECase Time elapsed: 110.789 s > <<< ERROR! > Oct 30 20:18:40 org.testcontainers.containers.ContainerLaunchException: > Container startup failed for image prestodb/hdp2.6-hive:10 > Oct 30 20:18:40 at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:349) > Oct 30 20:18:40 at > org.apache.flink.table.gateway.containers.HiveContainer.doStart(HiveContainer.java:69) > Oct 30 20:18:40 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322) > Oct 30 20:18:40 at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1131) > Oct 30 20:18:40 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:28) > Oct 30 20:18:40 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Oct 30 20:18:40 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Oct 30 20:18:40 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > Oct 30 20:18:40 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > Oct 30 20:18:40 at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > Oct 30 20:18:40 at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) > Oct 30 20:18:40 at > 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) > Oct 30 20:18:40 at > org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) > Oct 30 20:18:40 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188) > Oct 30 20:18:40 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154) > Oct 30 20:18:40 at > org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562) > Oct 30 20:18:40 at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548) > Oct 30 20:18:40 Caused by: org.rnorth.ducttape.RetryCountExceededExcept
[jira] [Updated] (FLINK-33418) SqlGatewayE2ECase failed due to ConnectException
[ https://issues.apache.org/jira/browse/FLINK-33418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-33418: -- Description: The container couldn't be started in [this build|https://github.com/XComp/flink/actions/runs/6696839844/job/18195926497#step:15:11765]: {code} Error: 20:18:40 20:18:40.111 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 110.789 s <<< FAILURE! - in org.apache.flink.table.gateway.SqlGatewayE2ECase Error: 20:18:40 20:18:40.111 [ERROR] org.apache.flink.table.gateway.SqlGatewayE2ECase Time elapsed: 110.789 s <<< ERROR! Oct 30 20:18:40 org.testcontainers.containers.ContainerLaunchException: Container startup failed for image prestodb/hdp2.6-hive:10 Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:349) Oct 30 20:18:40 at org.apache.flink.table.gateway.containers.HiveContainer.doStart(HiveContainer.java:69) Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322) Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1131) Oct 30 20:18:40 at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:28) Oct 30 20:18:40 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Oct 30 20:18:40 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Oct 30 20:18:40 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Oct 30 20:18:40 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Oct 30 20:18:40 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) Oct 30 20:18:40 at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54) Oct 30 20:18:40 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) Oct 30 20:18:40 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) Oct 30 20:18:40 at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) Oct 30 20:18:40 at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) Oct 30 20:18:40 at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188) Oct 30 20:18:40 at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154) Oct 30 20:18:40 at 
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548) Oct 30 20:18:40 Caused by: org.rnorth.ducttape.RetryCountExceededException: Retry limit hit with exception Oct 30 20:18:40 at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:88) Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:334) Oct 30 20:18:40 ... 29 more Oct 30 20:18:40 Caused by: org.testcontainers.containers.ContainerLaunchException: Could not create/start container Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:553) Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:344) Oct 30
[jira] [Created] (FLINK-33418) SqlGatewayE2ECase failed due to ConnectException
Matthias Pohl created FLINK-33418: - Summary: SqlGatewayE2ECase failed due to ConnectException Key: FLINK-33418 URL: https://issues.apache.org/jira/browse/FLINK-33418 Project: Flink Issue Type: Sub-task Components: Table SQL / Client, Tests Reporter: Matthias Pohl The container couldn't be started in [this build|https://github.com/XComp/flink/actions/runs/6696839844/job/18195926497#step:15:11765]: {code} Error: 20:18:40 20:18:40.111 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 110.789 s <<< FAILURE! - in org.apache.flink.table.gateway.SqlGatewayE2ECase Error: 20:18:40 20:18:40.111 [ERROR] org.apache.flink.table.gateway.SqlGatewayE2ECase Time elapsed: 110.789 s <<< ERROR! Oct 30 20:18:40 org.testcontainers.containers.ContainerLaunchException: Container startup failed for image prestodb/hdp2.6-hive:10 Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:349) Oct 30 20:18:40 at org.apache.flink.table.gateway.containers.HiveContainer.doStart(HiveContainer.java:69) Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322) Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1131) Oct 30 20:18:40 at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:28) Oct 30 20:18:40 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Oct 30 20:18:40 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Oct 30 20:18:40 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Oct 30 20:18:40 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Oct 30 20:18:40 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Oct 30 20:18:40 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) Oct 30 20:18:40 at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102) Oct 30 20:18:40 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54) Oct 30 20:18:40 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) Oct 30 20:18:40 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) Oct 30 20:18:40 at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) Oct 30 20:18:40 at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) Oct 30 20:18:40 at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188) Oct 30 20:18:40 at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154) Oct 30 
20:18:40 at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562) Oct 30 20:18:40 at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548) Oct 30 20:18:40 Caused by: org.rnorth.ducttape.RetryCountExceededException: Retry limit hit with exception Oct 30 20:18:40 at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:88) Oct 30 20:18:40 at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:334) Oct 30 20:18:40 ... 29 more Oct 30 20:18:40 Caused by: org.testcontainers.containers.ContainerLaunchException: Could not create/start container Oct 30 20:18:40 at org.testcontainers.containers.
Re: [PR] [FLINK-33408] Fixing the container vulnerability by upgrade the SnakeYaml Maven dependency in flink-kubernetes module. [flink]
ShijieHome commented on PR #23631: URL: https://github.com/apache/flink/pull/23631#issuecomment-1787444937 @tweise @mbalassi @gyfora , could you please check this PR for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Adding AWS Connectors v4.2.0 [flink-web]
dannycranmer opened a new pull request, #693: URL: https://github.com/apache/flink-web/pull/693 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33283) core stage: WebFrontendBootstrapTest.testHandlersMustBeLoaded
[ https://issues.apache.org/jira/browse/FLINK-33283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781422#comment-17781422 ] Matthias Pohl commented on FLINK-33283: --- I did a test run where I repeatedly ran {{WebFrontendBootstrapTest.testHandlersMustBeLoaded}} only (see [2a0627e|https://github.com/XComp/flink/blob/2a0627e9d6cc5f12462cfc7de2c58acb85e666fd/.github/workflows/flink-ci-template.yml#L286-L290]). I wasn't able to reproduce the error (grep over the build's log artifacts): {code:bash} $ grep -c "Tests run: 1" *module*/*_Test*txt ci Test (module connect_1)/12_Test - connect_1.txt:5536 ci Test (module connect_2)/12_Test - connect_2.txt:6136 ci Test (module core)/12_Test - core.txt:4866 ci Test (module misc)/12_Test - misc.txt:5602 ci Test (module python)/12_Test - python.txt:5922 ci Test (module table)/12_Test - table.txt:5522 ci Test (module tests)/12_Test - tests.txt:5806 $ grep -c "Tests run: 1, Failures: 0" *module*/*_Test*txt ci Test (module connect_1)/12_Test - connect_1.txt:5536 ci Test (module connect_2)/12_Test - connect_2.txt:6136 ci Test (module core)/12_Test - core.txt:4866 ci Test (module misc)/12_Test - misc.txt:5602 ci Test (module python)/12_Test - python.txt:5922 ci Test (module table)/12_Test - table.txt:5522 ci Test (module tests)/12_Test - tests.txt:5806 {code} The number of test runs matched the number of test runs w/o a failure. > core stage: WebFrontendBootstrapTest.testHandlersMustBeLoaded > - > > Key: FLINK-33283 > URL: https://issues.apache.org/jira/browse/FLINK-33283 > Project: Flink > Issue Type: Sub-task >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > [https://github.com/XComp/flink/actions/runs/6525957588/job/17719339666#step:10:12279] > {code:java} > Error: 20:06:13 20:06:13.132 [ERROR] > org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrapTest.testHandlersMustBeLoaded > Time elapsed: 2.298 s <<< FAILURE! > 12279Oct 15 20:06:13 org.opentest4j.AssertionFailedError: > 12280Oct 15 20:06:13 > 12281Oct 15 20:06:13 expected: 404 > 12282Oct 15 20:06:13 but was: 200 > 12283Oct 15 20:06:13 at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > 12284Oct 15 20:06:13 at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > 12285Oct 15 20:06:13 at > java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > 12286Oct 15 20:06:13 at > org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrapTest.testHandlersMustBeLoaded(WebFrontendBootstrapTest.java:89) > 12287Oct 15 20:06:13 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > [...]{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33259) flink-connector-aws should use/extend the common connector workflow
[ https://issues.apache.org/jira/browse/FLINK-33259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-33259: -- Fix Version/s: aws-connector-4.3.0 (was: aws-connector-4.2.0) > flink-connector-aws should use/extend the common connector workflow > --- > > Key: FLINK-33259 > URL: https://issues.apache.org/jira/browse/FLINK-33259 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / AWS >Affects Versions: aws-connector-3.0.0, aws-connector-4.1.0 >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Fix For: aws-connector-4.3.0 > > > We should use the common ci github workflow. > [https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml] > > Example used in flink-connector-elasticsearch > [https://github.com/apache/flink-connector-elasticsearch/blob/main/.github/workflows/push_pr.yml] > > This improves our operational stance because we will now inherit any > improvements/changes to the main ci workflow file -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33260) Custom Error Handling for Kinesis Consumer
[ https://issues.apache.org/jira/browse/FLINK-33260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-33260: -- Affects Version/s: aws-connector-4.2.0 > Custom Error Handling for Kinesis Consumer > -- > > Key: FLINK-33260 > URL: https://issues.apache.org/jira/browse/FLINK-33260 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Danny Cranmer >Assignee: Emre Kartoglu >Priority: Major > Labels: pull-request-available > Fix For: aws-connector-4.3.0 > > > Background > The Kinesis Consumer exposes various configuration that allows the user to > define retry and backoff strategies when dealing with errors. However, the > configuration does not allow the user to configure which errors are > retryable, or different strategies for different errors. The error handling > logic is hard coded within the connector. Over time we discover errors that > should be retryable that are not, for example KDS throwing 500 on > SubscribeToShare or transient DNS issues. When these arise the user can > either fork-fix the connector or log an issue and wait for the next version. > h3. Scope > Add the ability for the user to define retry/backoff strategy per error. This > could be achieved using flexible configuration keys, or allowing the user to > register their own retry strategies on the connector > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33260) Custom Error Handling for Kinesis Consumer
[ https://issues.apache.org/jira/browse/FLINK-33260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-33260: -- Fix Version/s: aws-connector-4.3.0 (was: aws-connector-4.2.0) > Custom Error Handling for Kinesis Consumer > -- > > Key: FLINK-33260 > URL: https://issues.apache.org/jira/browse/FLINK-33260 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Danny Cranmer >Assignee: Emre Kartoglu >Priority: Major > Labels: pull-request-available > Fix For: aws-connector-4.3.0 > > > Background > The Kinesis Consumer exposes various configuration that allows the user to > define retry and backoff strategies when dealing with errors. However, the > configuration does not allow the user to configure which errors are > retryable, or different strategies for different errors. The error handling > logic is hard coded within the connector. Over time we discover errors that > should be retryable that are not, for example KDS throwing 500 on > SubscribeToShare or transient DNS issues. When these arise the user can > either fork-fix the connector or log an issue and wait for the next version. > h3. Scope > Add the ability for the user to define retry/backoff strategy per error. This > could be achieved using flexible configuration keys, or allowing the user to > register their own retry strategies on the connector > -- This message was sent by Atlassian Jira (v8.20.10#820010)
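To make the scope above more concrete, here is a small, self-contained sketch of a user-registered, per-error retry decision of the kind the ticket proposes. None of these class or method names exist in the Kinesis connector today; they are purely illustrative.

{code:java}
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a per-error retry rule table plus a simple exponential backoff.
public class PerErrorRetryPolicy {

    public enum Action { RETRY_WITH_BACKOFF, FAIL_FAST }

    private final Map<Class<? extends Throwable>, Action> rules = new LinkedHashMap<>();
    private final Duration baseBackoff;

    public PerErrorRetryPolicy(Duration baseBackoff) {
        this.baseBackoff = baseBackoff;
    }

    // Register a rule for a specific error type, e.g. a retryable 500 or a transient DNS failure.
    public PerErrorRetryPolicy on(Class<? extends Throwable> errorType, Action action) {
        rules.put(errorType, action);
        return this;
    }

    // Walk the registered rules in insertion order; unknown errors fail fast by default.
    public Action decide(Throwable error) {
        for (Map.Entry<Class<? extends Throwable>, Action> rule : rules.entrySet()) {
            if (rule.getKey().isInstance(error)) {
                return rule.getValue();
            }
        }
        return Action.FAIL_FAST;
    }

    // Exponential backoff derived from the base delay, capped at 2^10 * base.
    public Duration backoffFor(int attempt) {
        return baseBackoff.multipliedBy(1L << Math.min(attempt, 10));
    }
}
{code}

With something like this, a user could register, say, `java.net.UnknownHostException` as retryable without fork-fixing the connector or waiting for the next release.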
[jira] [Updated] (FLINK-33259) flink-connector-aws should use/extend the common connector workflow
[ https://issues.apache.org/jira/browse/FLINK-33259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-33259: -- Affects Version/s: aws-connector-4.2.0 (was: aws-connector-4.1.0) > flink-connector-aws should use/extend the common connector workflow > --- > > Key: FLINK-33259 > URL: https://issues.apache.org/jira/browse/FLINK-33259 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / AWS >Affects Versions: aws-connector-3.0.0, aws-connector-4.2.0 >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Fix For: aws-connector-4.3.0 > > > We should use the common ci github workflow. > [https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml] > > Example used in flink-connector-elasticsearch > [https://github.com/apache/flink-connector-elasticsearch/blob/main/.github/workflows/push_pr.yml] > > This improves our operational stance because we will now inherit any > improvements/changes to the main ci workflow file -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33072) Implement end-to-end tests for AWS Kinesis Connectors
[ https://issues.apache.org/jira/browse/FLINK-33072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-33072. --- Assignee: Hong Liang Teoh Resolution: Done > Implement end-to-end tests for AWS Kinesis Connectors > - > > Key: FLINK-33072 > URL: https://issues.apache.org/jira/browse/FLINK-33072 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Fix For: aws-connector-4.2.0 > > > *What* > We want to implement end-to-end tests that target real Kinesis Data Streams. > *Why* > This solidifies our testing to ensure we pick up any integration issues with > Kinesis Data Streams API. > We especially want to test happy cases and failure cases to ensure those > cases are handled as expected by the KDS connector. > > Reference: https://issues.apache.org/jira/browse/INFRA-24474 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32116) FlinkKinesisConsumer cannot stop-with-savepoint when configured with watermark assigner and watermark tracker
[ https://issues.apache.org/jira/browse/FLINK-32116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-32116: -- Affects Version/s: aws-connector-4.2.0 (was: aws-connector-4.1.0) > FlinkKinesisConsumer cannot stop-with-savepoint when configured with > watermark assigner and watermark tracker > - > > Key: FLINK-32116 > URL: https://issues.apache.org/jira/browse/FLINK-32116 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.1, 1.15.4, aws-connector-4.2.0 >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Fix For: aws-connector-4.3.0 > > > Problem: > When FlinkKinesisConsumer is configured with legacy watermarking system, it > is unable to take a savepoint during stop-with-savepoint, and will get stuck > indefinitely. > > > {code:java} > FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new > SimpleStringSchema(), consumerConfig); > // Set up watermark assigner on Kinesis source > src.setPeriodicWatermarkAssigner(...); > // Set up watermark tracker on Kinesis source > src.setWatermarkTracker(...);{code} > > > *Why does it get stuck?* > When watermarks are setup, the `shardConsumer` and `recordEmitter` thread > communicate using asynchronous queue. > On stop-with-savepoint, shardConsumer waits for queue to empty before > continuing. recordEmitter is terminated before queue is empty. As such, queue > is never going to be empty, and app gets stuck indefinitely. > > *Workarounds* > Use the new watermark framework > {code:java} > FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new > SimpleStringSchema(), consumerConfig); > env.addSource(src) > // Set up watermark strategy with both watermark assigner and watermark > tracker > > .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()){code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
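Since the workaround code quoted above is fragmented by the mail quoting, here is the same idea as a self-contained sketch; the stream name, region, and the trailing `print()` sink are placeholders.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

public class KinesisWatermarkWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("aws.region", "eu-west-1"); // placeholder region

        FlinkKinesisConsumer<String> src =
                new FlinkKinesisConsumer<>("YourStreamHere", new SimpleStringSchema(), consumerConfig);

        env.addSource(src)
                // Watermarks are assigned on the DataStream (new framework) instead of the
                // legacy setPeriodicWatermarkAssigner/setWatermarkTracker hooks, so
                // stop-with-savepoint is not blocked by the record-emitter queue.
                .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps())
                .print();

        env.execute("kinesis-watermark-workaround");
    }
}
{code}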
[jira] [Updated] (FLINK-33072) Implement end-to-end tests for AWS Kinesis Connectors
[ https://issues.apache.org/jira/browse/FLINK-33072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-33072: -- Fix Version/s: aws-connector-4.2.0 (was: 2.0.0) > Implement end-to-end tests for AWS Kinesis Connectors > - > > Key: FLINK-33072 > URL: https://issues.apache.org/jira/browse/FLINK-33072 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Hong Liang Teoh >Priority: Major > Fix For: aws-connector-4.2.0 > > > *What* > We want to implement end-to-end tests that target real Kinesis Data Streams. > *Why* > This solidifies our testing to ensure we pick up any integration issues with > Kinesis Data Streams API. > We especially want to test happy cases and failure cases to ensure those > cases are handled as expected by the KDS connector. > > Reference: https://issues.apache.org/jira/browse/INFRA-24474 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33073) Implement end-to-end tests for the Kinesis Streams Sink
[ https://issues.apache.org/jira/browse/FLINK-33073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-33073. --- Resolution: Fixed > Implement end-to-end tests for the Kinesis Streams Sink > --- > > Key: FLINK-33073 > URL: https://issues.apache.org/jira/browse/FLINK-33073 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Labels: pull-request-available > Fix For: aws-connector-4.2.0 > > > *What* > Implement end-to-end tests for KinesisStreamsSink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32116) FlinkKinesisConsumer cannot stop-with-savepoint when configured with watermark assigner and watermark tracker
[ https://issues.apache.org/jira/browse/FLINK-32116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-32116: -- Fix Version/s: aws-connector-4.3.0 (was: aws-connector-4.2.0) > FlinkKinesisConsumer cannot stop-with-savepoint when configured with > watermark assigner and watermark tracker > - > > Key: FLINK-32116 > URL: https://issues.apache.org/jira/browse/FLINK-32116 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.16.1, 1.15.4, aws-connector-4.1.0 >Reporter: Hong Liang Teoh >Assignee: Aleksandr Pilipenko >Priority: Major > Fix For: aws-connector-4.3.0 > > > Problem: > When FlinkKinesisConsumer is configured with legacy watermarking system, it > is unable to take a savepoint during stop-with-savepoint, and will get stuck > indefinitely. > > > {code:java} > FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new > SimpleStringSchema(), consumerConfig); > // Set up watermark assigner on Kinesis source > src.setPeriodicWatermarkAssigner(...); > // Set up watermark tracker on Kinesis source > src.setWatermarkTracker(...);{code} > > > *Why does it get stuck?* > When watermarks are setup, the `shardConsumer` and `recordEmitter` thread > communicate using asynchronous queue. > On stop-with-savepoint, shardConsumer waits for queue to empty before > continuing. recordEmitter is terminated before queue is empty. As such, queue > is never going to be empty, and app gets stuck indefinitely. > > *Workarounds* > Use the new watermark framework > {code:java} > FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new > SimpleStringSchema(), consumerConfig); > env.addSource(src) > // Set up watermark strategy with both watermark assigner and watermark > tracker > > .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()){code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31872) Add Support for Configuring AIMD Ratelimiting strategy parameters by Sink users for KinesisStreamsSink
[ https://issues.apache.org/jira/browse/FLINK-31872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-31872: -- Fix Version/s: aws-connector-4.3.0 (was: aws-connector-4.2.0) > Add Support for Configuring AIMD Ratelimiting strategy parameters by Sink > users for KinesisStreamsSink > -- > > Key: FLINK-31872 > URL: https://issues.apache.org/jira/browse/FLINK-31872 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Fix For: aws-connector-4.3.0 > > > h1. Issue > As part of FLINK-31772 > I performed a complete benchmark for {{KinesisStreamsSink}} after configuring > the rate limiting strategy. > It appears that optimum values for rate limiting strategy parameters are > dependent on the use case (shard number/ parallelism/ record throughput). > We initially implemented the {{AIMDRateLimitingStrategy}} in accordance with > the one used for TCP congestion control, but since the parameters are use case > dependent we would like to allow sink users to adjust them as suitable. > h2. Requirements > - we *must* allow users to configure the increment rate and decrease factor of > AIMDRateLimitingStrategy for {{KinesisStreamsSink}} > - we *must* provide backward compatible default values identical to current > values to introduce no further regressions. > h2. Appendix > h3. Performance Benchmark Results > |Parallelism/Shards/Payload|parallelism|shards|payload|records/sec|Async > Sink|Async Sink With Configured Ratelimiting Strategy Throughput (MB/s)|Async > sink/ Maximum Throughput|% of Improvement| > |Low/Low/Low|1|1|1024|1|0.991|1|1|0.9| > |Low/Low/High|1|1|102400|100|0.9943|1|1|0.57| > |Low/Med/Low|1|8|1024|8|4.12|4.57|0.57125|5.625| > |Low/Med/High|1|8|102400|800|4.35|7.65|0.95625|41.25| > |Med/Low/Low|8|1|1024|2|0.852|0.846|0.846|-0.6| > |Med/Low/High|8|1|102400|200|0.921|0.867|0.867|-5.4| > |Med/Med/Low|8|8|1024|8|5.37|4.76|0.595|-7.625| > |Med/Med/High|8|8|102400|800|7.53|7.69|0.96125|2| > |Med/High/Low|8|64|1024|8|32.5|37.4|0.58438|7.65625| > |Med/High/High|8|64|102400|800|47.27|60.4|0.94375|20.51562| > |High/High/Low|256|256|1024|30|127|127|0.49609|0| > |High/High/High|256|256|102400|3000|225|246|0.96094|8.20313| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31872) Add Support for Configuring AIMD Ratelimiting strategy parameters by Sink users for KinesisStreamsSink
[ https://issues.apache.org/jira/browse/FLINK-31872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-31872: -- Affects Version/s: aws-connector-4.2.0 > Add Support for Configuring AIMD Ratelimiting strategy parameters by Sink > users for KinesisStreamsSink > -- > > Key: FLINK-31872 > URL: https://issues.apache.org/jira/browse/FLINK-31872 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Fix For: aws-connector-4.3.0 > > > h1. Issue > As part of FLINK-31772 > I performed a complete benchmark for {{KinesisStreamsSink}} after configuring > the rate limiting strategy. > It appears that optimum values for rate limiting strategy parameters are > dependent on the use case (shard number/ parallelism/ record throughput). > We initially implemented the {{AIMDRateLimitingStrategy}} in accordance with > the one used for TCP congestion control, but since the parameters are use case > dependent we would like to allow sink users to adjust them as suitable. > h2. Requirements > - we *must* allow users to configure the increment rate and decrease factor of > AIMDRateLimitingStrategy for {{KinesisStreamsSink}} > - we *must* provide backward compatible default values identical to current > values to introduce no further regressions. > h2. Appendix > h3. Performance Benchmark Results > |Parallelism/Shards/Payload|parallelism|shards|payload|records/sec|Async > Sink|Async Sink With Configured Ratelimiting Strategy Throughput (MB/s)|Async > sink/ Maximum Throughput|% of Improvement| > |Low/Low/Low|1|1|1024|1|0.991|1|1|0.9| > |Low/Low/High|1|1|102400|100|0.9943|1|1|0.57| > |Low/Med/Low|1|8|1024|8|4.12|4.57|0.57125|5.625| > |Low/Med/High|1|8|102400|800|4.35|7.65|0.95625|41.25| > |Med/Low/Low|8|1|1024|2|0.852|0.846|0.846|-0.6| > |Med/Low/High|8|1|102400|200|0.921|0.867|0.867|-5.4| > |Med/Med/Low|8|8|1024|8|5.37|4.76|0.595|-7.625| > |Med/Med/High|8|8|102400|800|7.53|7.69|0.96125|2| > |Med/High/Low|8|64|1024|8|32.5|37.4|0.58438|7.65625| > |Med/High/High|8|64|102400|800|47.27|60.4|0.94375|20.51562| > |High/High/Low|256|256|1024|30|127|127|0.49609|0| > |High/High/High|256|256|102400|3000|225|246|0.96094|8.20313| -- This message was sent by Atlassian Jira (v8.20.10#820010)
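As a refresher on what the two parameters named in the requirements control, here is a toy additive-increase/multiplicative-decrease update rule; the class is illustrative only and is not the connector's {{AIMDRateLimitingStrategy}}.

{code:java}
// Toy AIMD model: the two knobs the ticket wants to expose are the additive
// increment (applied after a successful request batch) and the multiplicative
// decrease factor (applied after a throttling error).
public class AimdRate {

    private final double increment;
    private final double decreaseFactor;
    private double currentRate;

    public AimdRate(double initialRate, double increment, double decreaseFactor) {
        this.currentRate = initialRate;
        this.increment = increment;
        this.decreaseFactor = decreaseFactor;
    }

    public double onSuccess() {
        currentRate += increment;      // additive increase
        return currentRate;
    }

    public double onThrottle() {
        currentRate *= decreaseFactor; // multiplicative decrease, e.g. 0.5
        return currentRate;
    }
}
{code}

Exposing both values on the sink builder, with today's hard-coded values as defaults, would satisfy the backward-compatibility requirement above.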
[jira] [Updated] (FLINK-31922) Port over Kinesis Client configurations for retry and backoff
[ https://issues.apache.org/jira/browse/FLINK-31922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-31922: -- Affects Version/s: aws-connector-4.2.0 > Port over Kinesis Client configurations for retry and backoff > - > > Key: FLINK-31922 > URL: https://issues.apache.org/jira/browse/FLINK-31922 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Affects Versions: aws-connector-4.2.0 >Reporter: Hong Liang Teoh >Assignee: Daren Wong >Priority: Major > Fix For: aws-connector-4.3.0 > > > Port over the Kinesis Client configurations for GetRecords, ListShards, > DescribeStream -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31922) Port over Kinesis Client configurations for retry and backoff
[ https://issues.apache.org/jira/browse/FLINK-31922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-31922: -- Fix Version/s: aws-connector-4.3.0 (was: aws-connector-4.2.0) > Port over Kinesis Client configurations for retry and backoff > - > > Key: FLINK-31922 > URL: https://issues.apache.org/jira/browse/FLINK-31922 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS >Reporter: Hong Liang Teoh >Assignee: Daren Wong >Priority: Major > Fix For: aws-connector-4.3.0 > > > Port over the Kinesis Client configurations for GetRecords, ListShards, > DescribeStream -- This message was sent by Atlassian Jira (v8.20.10#820010)