[PR] [hotfix] Using commons-lang3 for exception checking [flink-kubernetes-operator]
haoxins opened a new pull request, #818: URL: https://github.com/apache/flink-kubernetes-operator/pull/818 ## What is the purpose of the change *(For example: This pull request adds a new feature to periodically create and maintain savepoints through the `FlinkDeployment` custom resource.)* ## Brief change log *(for example:)* - *Periodic savepoint trigger is introduced to the custom resource* - *The operator checks on reconciliation whether the required time has passed* - *The JobManager's dispose savepoint API is used to clean up obsolete savepoints* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / no) - Core observer or reconciler logic that is regularly executed: (yes / no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
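The PR body above is the unfilled template, so as a hedged illustration only: "exception checking" of this kind usually means testing whether a given exception type appears anywhere in a throwable's cause chain, which commons-lang3 exposes as `ExceptionUtils.indexOfThrowable` / `ExceptionUtils.getRootCause`. The sketch below hand-rolls the same cause-chain walk using only the JDK so it stays self-contained; `hasCause` and the sample exceptions are illustrative, not code from the PR.

```java
import java.util.concurrent.TimeoutException;

public class ExceptionCheck {

    /** Returns true if {@code t} or any of its causes is an instance of {@code type}. */
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        // Walk the cause chain, exactly what library helpers do internally.
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // A TimeoutException wrapped in a RuntimeException, as operators often see.
        Throwable wrapped = new RuntimeException(new TimeoutException("deploy timed out"));
        if (!hasCause(wrapped, TimeoutException.class)) throw new AssertionError();
        if (hasCause(wrapped, IllegalStateException.class)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

With commons-lang3 on the classpath, the loop collapses to `ExceptionUtils.indexOfThrowable(wrapped, TimeoutException.class) != -1`.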
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839474#comment-17839474 ] Roman Boyko commented on FLINK-34380: - Hi [~xuyangzhong] , [~xu_shuai_] ! 1) The RowKind can't be fixed in the current architecture, because +I and +U are separated into different batches in this example, and it would be a bit tricky to fix. 2) But the record order is indeed incorrect in this example, and it can be easily fixed - https://github.com/rovboyko/flink/tree/fix/FLINK-34380 > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 
Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > +----+----+----+----+----+ > | op |  a |  b | a0 | b0 | > +----+----+----+----+----+ > | +U |  1 |  1 |  1 | 99 | > | +U |  1 | 99 |  1 | 99 | > | -U |  1 |  1 |  1 | 99 | > | -D |  1 | 99 |  1 | 99 | > +----+----+----+----+----+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574122219 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,86 +24,93 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. + -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. 
+# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 -## Features -### Core +Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 kubectl)管理和操作 Flink 部署。Operator 的核心功能包括: + + + +## 特征 + + + +### 核心 - Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) - - Running, suspending and deleting applications - - Stateful and stateless application upgrades - - Triggering and managing savepoints - - Handling errors, rolling-back broken upgrades + - 运行、暂停和删除应用程序 + - 有状态和无状态应用程序升级 + - 保存点的触发和管理 + - 任务管理器的扩展和缩减 - Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 - [Deployment Modes]({{< ref "docs/custom-resource/overview#application-deployments" >}}): - - Application cluster - - Session cluster - - Session job + - 应用程序集群 + - 会话集群 + - 会话作业 - Built-in [High Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/) - Extensible framework - [Custom validators]({{< ref "docs/operations/plugins#custom-flink-resource-validators" >}}) - [Custom resource listeners]({{< ref "docs/operations/plugins#custom-flink-resource-listeners" >}}) - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) management - - Default configurations with dynamic updates - - Per job configuration - - Environment variables + - 默认配置与动态更新 + - 作业配置 + - 任务管理器配置 - POD augmentation via [Pod Templates]({{< ref "docs/custom-resource/pod-template" >}}) - - Native Kubernetes POD definitions - - Layering (Base/JobManager/TaskManager overrides) + - 原生Kubernetes POD定义 + - 用于自定义容器和资源 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}}) - - Collect lag and utilization metrics - - Scale job 
vertices to the ideal parallelism - - Scale up and down as the load changes -### Operations + - 收集延迟和利用率指标 + - 根据指标自动调整任务管理器数量 + - 根据负载的变化进行扩展和缩减 + + + +### 运营 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}}) - - Utilizes the well-established [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics) - - Pluggable metrics reporters - - Detailed resources and kubernetes api access metrics + - 使用成熟的 [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics) + - 可插拔的指标报告器 + - 详细的资源和 kubernetes api 访问指标 - Fully-customizable [Logging]({{< ref "docs/operations/metrics-logging#logging" >}}) - - Default log configuration - - Per job log configuration - - Sidecar based log forwarders -- Flink Web UI and REST Endpoint Access - - Fully supported Flink Native Kubernetes [service expose types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui) - - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}}) + - 默认日志配置 + - 每个作业日志配置 + - 基于 sidecar 的日志转发器 +- Flink Web UI 和 REST 端点访问 + - 完整支持 Flink 原生 Kubernetes [服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui) + - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}}) 动态暴露服务 - [Helm based installation]({{< ref "docs/operations/helm" >}}) - - Automated [RBAC configuration]({{< ref "docs/operations/rbac" >}}) - - Advanced customization techniques -- Up-to-date public
Re: [PR] [BP-1.18][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]
flinkbot commented on PR #24696: URL: https://github.com/apache/flink/pull/24696#issuecomment-2068485490 ## CI report: * 51874ab3e4d2d2b8b84c22bc1e3e6326dba6839a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP-1.18][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]
TanYuxin-tyx opened a new pull request, #24696: URL: https://github.com/apache/flink/pull/24696 ## What is the purpose of the change Backport to Flink 1.18 for https://github.com/apache/flink/pull/24688#event-12553480360 *When using sortBufferAccumulator, we should recycle the buffers to freeSegments before releasing the data buffer. The reason is that when getting buffers from the DataBuffer, it may require more buffers than the current quantity available in freeSegments. Consequently, to ensure adequate buffers from DataBuffer, the flushed and recycled buffers should also be added to freeSegments for reuse.* ## Brief change log - *Reuse the recycled buffers before releasing data buffer for sort accumulator* ## Verifying this change This change is already covered by the added test *SortBufferAccumulatorTest#testReuseRecycledBuffersWhenFlushDataBuffer*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
Re: [PR] [BP][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]
flinkbot commented on PR #24695: URL: https://github.com/apache/flink/pull/24695#issuecomment-2068471232 ## CI report: * 4923275887fdcdfe189f070894fd779dc8fe0f46 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]
TanYuxin-tyx opened a new pull request, #24695: URL: https://github.com/apache/flink/pull/24695 ## What is the purpose of the change Backport to Flink 1.19 for https://github.com/apache/flink/pull/24688#event-12553480360 *When using sortBufferAccumulator, we should recycle the buffers to freeSegments before releasing the data buffer. The reason is that when getting buffers from the DataBuffer, it may require more buffers than the current quantity available in freeSegments. Consequently, to ensure adequate buffers from DataBuffer, the flushed and recycled buffers should also be added to freeSegments for reuse.* ## Brief change log - *Reuse the recycled buffers before releasing data buffer for sort accumulator* ## Verifying this change This change is already covered by the added test *SortBufferAccumulatorTest#testReuseRecycledBuffersWhenFlushDataBuffer*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
Re: [PR] [FLINK-35120][doris] Add Doris integration test cases [flink-cdc]
JNSimba commented on PR #3227: URL: https://github.com/apache/flink-cdc/pull/3227#issuecomment-2068453899 It seems that some code is duplicated with PR #3222
Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #809: URL: https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1574098275 ## docs/content.zh/docs/concepts/architecture.md: ## @@ -24,57 +24,66 @@ specific language governing permissions and limitations under the License. --> -# Architecture + -Flink Kubernetes Operator (Operator) acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. The Operator can be installed on a Kubernetes cluster using [Helm](https://helm.sh). In most production environments it is typically deployed in a designated namespace and controls Flink deployments in one or more managed namespaces. The custom resource definition (CRD) that describes the schema of a `FlinkDeployment` is a cluster wide resource. For a CRD, the declaration must be registered before any resources of that CRDs kind(s) can be used, and the registration process sometimes takes a few seconds. +# 架构 -{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator Architecture" >}} -> Note: There is no support at this time for [upgrading or deleting CRDs using Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/). +Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整 deployment 生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个 Flink 部署到受托管的 namespaces 。描述 `FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 类型的任何资源之前注册声明,注册过程有时需要几秒钟。 -## Control Loop -The Operator follow the Kubernetes principles, notably the [control loop](https://kubernetes.io/docs/concepts/architecture/controller/): +{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 架构" >}} +> Note: 目前不支持[使用 Helm 升级或删除 CRD](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/). 
-{{< img src="/img/concepts/control_loop.svg" alt="Control Loop" >}} + -Users can interact with the operator using the Kubernetes command-line tool, [kubectl](https://kubernetes.io/docs/tasks/tools/). The Operator continuously tracks cluster events relating to the `FlinkDeployment` and `FlinkSessionJob` custom resources. When the operator receives a new resource update, it will take action to adjust the Kubernetes cluster to the desired state as part of its reconciliation loop. The initial loop consists of the following high-level steps: +## 控制平面 +Operator 遵循 Kubernetes 原则,特别是 [控制平面](https://kubernetes.io/docs/concepts/architecture/controller/): -1. User submits a `FlinkDeployment`/`FlinkSessionJob` custom resource(CR) using `kubectl` -2. Operator observes the current status of the Flink resource (if previously deployed) -3. Operator validates the submitted resource change -4. Operator reconciles any required changes and executes upgrades +{{< img src="/img/concepts/control_loop.svg" alt="控制循环" >}} -The CR can be (re)applied on the cluster any time. The Operator makes continuous adjustments to imitate the desired state until the current state becomes the desired state. All lifecycle management operations are realized using this very simple principle in the Operator. +用户可以使用 Kubernetes 命令行工具 [kubectl](https://kubernetes.io/docs/tasks/tools/) 与 Operator 进行交互。Operator 不断跟踪与 `FlinkDeployment` 和 `FlinkSessionJob` 自定义资源相关的集群事件。当 Operator 接收到新的资源更新时,它将调整 Kubernetes 集群以达到所需状态,这个调整将作为其协调循环的一部分。初始循环包括以下高级步骤: -The Operator is built with the [Java Operator SDK](https://github.com/java-operator-sdk/java-operator-sdk) and uses the [Native Kubernetes Integration](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) for launching Flink deployments and submitting jobs under the hood. The Java Operator SDK is a higher level framework and related tooling to support writing Kubernetes Operators in Java. 
Both the Java Operator SDK and Flink's native kubernetes integration itself is using the [Fabric8 Kubernetes Client](https://github.com/fabric8io/kubernetes-client) to interact with the Kubernetes API Server. +1. 用户使用 `kubectl` 提交 `FlinkDeployment`/`FlinkSessionJob` 自定义资源(CR) +2. Operator 观察 Flink 资源的当前状态(如果先前已部署) +3. Operator 验证提交的资源更改 +4. Operator 协调任何必要的更改并执行升级 -## Flink Resource Lifecycle +CR 可以随时在集群上(重新)应用。Operator 通过不断调整来模拟期望的状态,直到当前状态变为期望的状态。Operator 中的所有生命周期管理操作都是使用这个非常简单的原则实现的。 -The Operator manages the lifecycle of Flink resources. The following chart illustrates the different possible states and transitions: +Operator 使用 [Java Operator SDK](https://github.com/java-operator-sdk/java-operator-sdk) 构建,并使用 [Native Kubernetes Integration](https://nightlies.apache.org /flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) 用于启动 Flink deployment 并在后台提交作业。 Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]
banmoy commented on PR #3231: URL: https://github.com/apache/flink-cdc/pull/3231#issuecomment-2068440105 @yuxiqian Thanks for the work. Left some comments. Also cc @lvyanquan
Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]
banmoy commented on code in PR #3231: URL: https://github.com/apache/flink-cdc/pull/3231#discussion_r1574081602 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/resources/log4j2-test.properties: ## @@ -15,7 +15,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level=OFF +rootLogger.level=INFO Review Comment: set it to `OFF` ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.starrocks.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer; +import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME; + +/** IT tests for {@link StarRocksDataSink}. 
*/ +public class StarRocksPipelineITCase extends StarRocksSinkTestBase { +private static final StreamExecutionEnvironment env = +StreamExecutionEnvironment.getExecutionEnvironment(); + +@BeforeClass +public static void before() { +env.setParallelism(DEFAULT_PARALLELISM); +env.enableCheckpointing(3000); +env.setRestartStrategy(RestartStrategies.noRestart()); +} + +@Before +public void initializeDatabaseAndTable() { +createDatabase(StarRocksContainer.STARROCKS_DATABASE_NAME); + +LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME); + +createTable( +StarRocksContainer.STARROCKS_DATABASE_NAME, +StarRocksContainer.STARROCKS_TABLE_NAME, +"id", +Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)")); + +LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME); +} + +@After +public void destroyDatabaseAndTable() { +dropTable( +StarRocksContainer.STARROCKS_DATABASE_NAME, +StarRocksContainer.STARROCKS_TABLE_NAME); + +LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME); + +dropDatabase(StarRocksContainer.STARROCKS_DATABASE_NAME); + +LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME); +} + +private List generateEvents(TableId tableId) { +Schema schema = +Schema.newBuilder() +.column(new PhysicalColumn("id", DataTypes.INT(), null))
Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #809: URL: https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1574091929 ## docs/content.zh/docs/concepts/architecture.md: ## @@ -24,57 +24,66 @@ specific language governing permissions and limitations under the License. --> -# Architecture + -Flink Kubernetes Operator (Operator) acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. The Operator can be installed on a Kubernetes cluster using [Helm](https://helm.sh). In most production environments it is typically deployed in a designated namespace and controls Flink deployments in one or more managed namespaces. The custom resource definition (CRD) that describes the schema of a `FlinkDeployment` is a cluster wide resource. For a CRD, the declaration must be registered before any resources of that CRDs kind(s) can be used, and the registration process sometimes takes a few seconds. +# 架构 -{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator Architecture" >}} -> Note: There is no support at this time for [upgrading or deleting CRDs using Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/). +Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整 deployment 生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个 Flink 部署到受托管的 namespaces 。描述 `FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 类型的任何资源之前注册声明,注册过程有时需要几秒钟。 Review Comment: Sorry, I overlooked this, next time I should read it aloud to check -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]
RocMarshal commented on PR #115: URL: https://github.com/apache/flink-connector-jdbc/pull/115#issuecomment-2068437230 hi, @Jiabao-Sun master, Could you help review this when you have free time? Thank you~
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839464#comment-17839464 ] Shuai Xu commented on FLINK-35184: -- Hi [~rovboyko] , thx for reporting this bug which is caused by the hashcode() in GenericRowData. Could you please give a rough explanation of your solutions before implementing it? > Hash collision inside MiniBatchStreamingJoin operator > - > > Key: FLINK-35184 > URL: https://issues.apache.org/jira/browse/FLINK-35184 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Roman Boyko >Priority: Major > > The hash collision is possible for InputSideHasNoUniqueKeyBundle. To > reproduce it just launch the following test within > StreamingMiniBatchJoinOperatorTest: > > {code:java} > @Tag("miniBatchSize=6") > @Test > public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) > throws Exception { > leftTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id1", "val1"})); > rightTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id2", "val2"})); > leftKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > leftTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > rightKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > rightTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > joinKeyTypeInfo = InternalTypeInfo.of(new IntType()); > super.beforeEach(testInfo); > testHarness.setStateTtlProcessingTime(1); > testHarness.processElement2(insertRecord(1, 1L)); > testHarness.processElement1(insertRecord(1, 4294967296L)); > testHarness.processElement2(insertRecord(1, 4294967296L)); > testHarness.processElement2(deleteRecord(1, 1L)); > testHarness.close(); > assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L)); > } {code}
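The reproducing test above pairs the values 1L and 4294967296L, and that pairing is no accident: `Long.hashCode` folds the upper 32 bits into the lower ones, so 1L and 2^32 hash identically. A JDK-only sketch of the collision behind the report (this is the underlying arithmetic, not Flink's actual GenericRowData code):

```java
public class HashCollisionDemo {
    public static void main(String[] args) {
        long a = 1L;
        long b = 4294967296L; // 2^32, the value from the reproducing test

        // Long.hashCode(v) is defined as (int) (v ^ (v >>> 32)):
        //   1L          -> 1 ^ 0 = 1
        //   4294967296L -> (0x1_0000_0000 ^ 0x1) truncated to int = 1
        System.out.println(Long.hashCode(a)); // 1
        System.out.println(Long.hashCode(b)); // 1

        if (Long.hashCode(a) != Long.hashCode(b)) throw new AssertionError();
        System.out.println("collision confirmed");
    }
}
```

Any bundle keyed purely on this hash will therefore merge records for 1L and 4294967296L into the same bucket unless equality is also checked.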
[jira] [Updated] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator
[ https://issues.apache.org/jira/browse/FLINK-35169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35169: --- Fix Version/s: 1.20.0 > Recycle buffers to freeSegments before releasing data buffer for sort > accumulator > - > > Key: FLINK-35169 > URL: https://issues.apache.org/jira/browse/FLINK-35169 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > When using sortBufferAccumulator, we should recycle the buffers to > freeSegments before releasing the data buffer. The reason is that when > getting buffers from the DataBuffer, it may require more buffers than the > current quantity available in freeSegments. Consequently, to ensure adequate > buffers from DataBuffer, the flushed and recycled buffers should also be > added to freeSegments for reuse.
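The fix described in this issue is an ordering constraint: return flushed buffers to the free-segment pool before releasing the data buffer, because draining the data buffer may need more segments than the pool currently holds. A hedged, JDK-only sketch of that ordering (all names hypothetical, not Flink's actual SortBufferAccumulator):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RecycleBeforeRelease {
    private final Deque<byte[]> freeSegments = new ArrayDeque<>();
    private int recordsInDataBuffer = 2; // pretend two records are still buffered

    /** The corrected order: recycle first, then release. */
    void flushAndRelease(Deque<byte[]> flushedBuffers) {
        // 1) Recycle flushed buffers into the pool BEFORE touching the data buffer.
        while (!flushedBuffers.isEmpty()) {
            freeSegments.push(flushedBuffers.pop());
        }
        // 2) Releasing (draining) the data buffer may consume more segments
        //    than the pool held before step 1.
        releaseDataBuffer();
    }

    private void releaseDataBuffer() {
        while (recordsInDataBuffer > 0) {
            byte[] segment = freeSegments.poll();
            if (segment == null) {
                throw new IllegalStateException("not enough free segments");
            }
            recordsInDataBuffer--; // copy one buffered record out into `segment`
        }
    }

    public static void main(String[] args) {
        RecycleBeforeRelease acc = new RecycleBeforeRelease();
        Deque<byte[]> flushed = new ArrayDeque<>();
        flushed.push(new byte[32]);
        flushed.push(new byte[32]);
        acc.flushAndRelease(flushed); // succeeds only because the pool was refilled first
        System.out.println("drained without exhausting the pool");
    }
}
```

Reversing the two steps in `flushAndRelease` would make `releaseDataBuffer` fail on an empty pool, which mirrors the shortage the issue describes.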
[jira] [Closed] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator
[ https://issues.apache.org/jira/browse/FLINK-35169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-35169. -- Resolution: Fixed master via 68a84fd02fb8e288ff7605073f55834468dcf53a. > Recycle buffers to freeSegments before releasing data buffer for sort > accumulator > - > > Key: FLINK-35169 > URL: https://issues.apache.org/jira/browse/FLINK-35169 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > When using sortBufferAccumulator, we should recycle the buffers to > freeSegments before releasing the data buffer. The reason is that when > getting buffers from the DataBuffer, it may require more buffers than the > current quantity available in freeSegments. Consequently, to ensure adequate > buffers from DataBuffer, the flushed and recycled buffers should also be > added to freeSegments for reuse.
Re: [PR] [FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]
reswqa merged PR #24688: URL: https://github.com/apache/flink/pull/24688 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]
ljz2051 commented on code in PR #24681: URL: https://github.com/apache/flink/pull/24681#discussion_r1574080412 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * The writeBatch operation implementation for ForStDB. + * + * @param The type of key in put access request. + * @param The type of value in put access request. 
+ */
+public class ForStWriteBatchOperation<K, V> implements ForStDBOperation<Void> {
+
+    private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+    private final RocksDB db;
+
+    private final List<Request<K, V>> batchRequest;
+
+    private final WriteOptions writeOptions;
+
+    ForStWriteBatchOperation(
+            RocksDB db, List<Request<K, V>> batchRequest, WriteOptions writeOptions) {
+        this.db = db;
+        this.batchRequest = batchRequest;
+        this.writeOptions = writeOptions;
+    }
+
+    @Override
+    public CompletableFuture<Void> process() throws IOException {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        try (WriteBatch writeBatch =
+                new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
+            for (Request<K, V> request : batchRequest) {
+                ForStInnerTable<K, V> table = request.table;
+                if (request.value == null) {
+                    // put(key, null) == delete(key)
+                    writeBatch.delete(
+                            table.getColumnFamilyHandle(), table.serializeKey(request.key));
+                } else {
+                    writeBatch.put(
+                            table.getColumnFamilyHandle(),
+                            table.serializeKey(request.key),
+                            table.serializeValue(request.value));
+                }
+            }
+            db.write(writeOptions, writeBatch);
+            result.complete(null);
+        } catch (RocksDBException e) {

Review Comment:
   I think it would be better to catch all types of exceptions here, not just `RocksDBException`, and complete the result future exceptionally. WDYT?
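The review suggestion above — widen the catch beyond `RocksDBException` and fail the future instead of letting the exception escape — can be sketched as follows (a simplified, hypothetical shape, not the actual PR code; `Batch` stands in for the real write-batch work):

```java
import java.util.concurrent.CompletableFuture;

/** Sketch of the suggested error handling: any failure completes the future exceptionally. */
public class WriteBatchSketch {

    interface Batch {
        void write() throws Exception;
    }

    static CompletableFuture<Void> process(Batch batch) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            batch.write();
            result.complete(null);
        } catch (Throwable t) {
            // Catch all failure types, not just a single checked exception,
            // so callers waiting on the future are always notified.
            result.completeExceptionally(t);
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = process(() -> {});
        CompletableFuture<Void> failed = process(() -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(ok.isDone() + " " + failed.isCompletedExceptionally()); // prints: true true
    }
}
```

The point of the pattern is that a future returned from an asynchronous operation should carry the failure to its consumer; an exception that bypasses the future leaves the consumer blocked or uninformed.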
Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]
1996fanrui commented on code in PR #809: URL: https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1574071090 ## docs/content.zh/docs/concepts/architecture.md: ## @@ -24,57 +24,66 @@ specific language governing permissions and limitations under the License. --> -# Architecture + -Flink Kubernetes Operator (Operator) acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. The Operator can be installed on a Kubernetes cluster using [Helm](https://helm.sh). In most production environments it is typically deployed in a designated namespace and controls Flink deployments in one or more managed namespaces. The custom resource definition (CRD) that describes the schema of a `FlinkDeployment` is a cluster wide resource. For a CRD, the declaration must be registered before any resources of that CRDs kind(s) can be used, and the registration process sometimes takes a few seconds. +# 架构 -{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator Architecture" >}} -> Note: There is no support at this time for [upgrading or deleting CRDs using Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/). +Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整 deployment 生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个 Flink 部署到受托管的 namespaces 。描述 `FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 类型的任何资源之前注册声明,注册过程有时需要几秒钟。 Review Comment: 1. `Flink 部署` -> `Flink deployments ` 2. How about translate `schema` to `结构` instead of `模式`? 3. As https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications mentioned, `是一个集群范围的资源`, we don't need `一个` here. 
https://github.com/apache/flink-kubernetes-operator/assets/38427477/4b5153c8-73b1-4708-9bb4-83255f7d29a6;> ## docs/content.zh/docs/concepts/architecture.md: ## @@ -24,57 +24,65 @@ specific language governing permissions and limitations under the License. --> -# Architecture + -Flink Kubernetes Operator (Operator) acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. The Operator can be installed on a Kubernetes cluster using [Helm](https://helm.sh). In most production environments it is typically deployed in a designated namespace and controls Flink deployments in one or more managed namespaces. The custom resource definition (CRD) that describes the schema of a `FlinkDeployment` is a cluster wide resource. For a CRD, the declaration must be registered before any resources of that CRDs kind(s) can be used, and the registration process sometimes takes a few seconds. +# 架构 -{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator Architecture" >}} -> Note: There is no support at this time for [upgrading or deleting CRDs using Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/). +Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整部署生命周期。Operator 可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个受管命名空间中的 Flink 部署。描述 `FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 类型的任何资源之前注册声明,注册过程有时需要几秒钟。 -## Control Loop -The Operator follow the Kubernetes principles, notably the [control loop](https://kubernetes.io/docs/concepts/architecture/controller/): +{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 架构" >}} +> Note: 目前不支持[使用 Helm 升级或删除 CRD](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/). 
-{{< img src="/img/concepts/control_loop.svg" alt="Control Loop" >}} + -Users can interact with the operator using the Kubernetes command-line tool, [kubectl](https://kubernetes.io/docs/tasks/tools/). The Operator continuously tracks cluster events relating to the `FlinkDeployment` and `FlinkSessionJob` custom resources. When the operator receives a new resource update, it will take action to adjust the Kubernetes cluster to the desired state as part of its reconciliation loop. The initial loop consists of the following high-level steps: +## 控制平面 +Operator 遵循 Kubernetes 原则,特别是 [控制平面](https://kubernetes.io/docs/concepts/architecture/controller/): -1. User submits a `FlinkDeployment`/`FlinkSessionJob` custom resource(CR) using `kubectl` -2. Operator observes the current status of the Flink resource (if previously deployed) -3. Operator validates the submitted resource change -4. Operator reconciles any required changes and executes upgrades +{{< img src="/img/concepts/control_loop.svg" alt="控制循环" >}} -The CR can be (re)applied on the cluster any time. The Operator makes continuous adjustments to imitate the desired state until the current state becomes the desired state. All lifecycle management operations are realized using this very simple principle in the Operator. +用户可以使用 Kubernetes 命令行工具
Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]
Zakelly commented on PR #24690: URL: https://github.com/apache/flink/pull/24690#issuecomment-2068416578 Rebased master to resolve conflicts.
[jira] [Assigned] (FLINK-35158) Error handling in StateFuture's callback
[ https://issues.apache.org/jira/browse/FLINK-35158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei reassigned FLINK-35158: -- Assignee: Yanfei Lei > Error handling in StateFuture's callback > > > Key: FLINK-35158 > URL: https://issues.apache.org/jira/browse/FLINK-35158 > Project: Flink > Issue Type: Sub-task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major >
Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]
ljz2051 commented on code in PR #24681: URL: https://github.com/apache/flink/pull/24681#discussion_r1574076315 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.rocksdb.RocksDB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by + * calling the Get API multiple times with multiple threads. + * + * @param The type of key in get access request. + * @param The type of value in get access request. 
+ */
+public class ForStGeneralMultiGetOperation<K, V> implements ForStDBOperation<List<V>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
+
+    private final RocksDB db;
+
+    private final List<Request<K, V>> batchRequest;
+
+    private final Executor executor;
+
+    ForStGeneralMultiGetOperation(RocksDB db, List<Request<K, V>> batchRequest, Executor executor) {
+        this.db = db;
+        this.batchRequest = batchRequest;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<List<V>> process() throws IOException {
+
+        CompletableFuture<List<V>> future = new CompletableFuture<>();
+        @SuppressWarnings("unchecked")
+        V[] result = (V[]) new Object[batchRequest.size()];
+        Arrays.fill(result, null);
+
+        AtomicInteger counter = new AtomicInteger(batchRequest.size());
+        for (int i = 0; i < batchRequest.size(); i++) {
+            Request<K, V> request = batchRequest.get(i);
+            final int index = i;
+            executor.execute(
+                    () -> {
+                        try {
+                            ForStInnerTable<K, V> table = request.table;
+                            byte[] key = table.serializeKey(request.key);
+                            byte[] value = db.get(table.getColumnFamilyHandle(), key);

Review Comment:
   @jectpro7 Thanks for your advice. The `MultiGet` api based on remote filesystem is not currently supported by ForStDB, and more importantly, not all file systems will support the `MultiGet` api, so the purpose of this PR is to introduce a general `MultiGet` implementation that **works on all file systems**. In addition, I will introduce the ForStDB native `MultiGet` api in another jira ([FLINK-35163](https://issues.apache.org/jira/browse/FLINK-35163)) to optimize remote state access.
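The pattern in the snippet above — simulating multiGet with N single-key gets submitted to an executor, where a shared countdown completes one future when the last get finishes — can be shown in a self-contained form (a `Map` stands in for RocksDB; all names here are illustrative, not the PR's API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a general multiGet built from parallel single-key gets. */
public class GeneralMultiGetSketch {

    static CompletableFuture<List<String>> multiGet(
            Map<String, String> db, List<String> keys, ExecutorService executor) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        String[] result = new String[keys.size()];
        // Counts down as individual gets finish; the thread that performs the
        // final decrement completes the shared future with the full result list.
        AtomicInteger remaining = new AtomicInteger(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            final int index = i;
            executor.execute(
                    () -> {
                        result[index] = db.get(keys.get(index)); // one single-key "Get"
                        if (remaining.decrementAndGet() == 0) {
                            future.complete(Arrays.asList(result));
                        }
                    });
        }
        return future;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<String, String> db = Map.of("k1", "v1", "k2", "v2");
        System.out.println(multiGet(db, List.of("k1", "k2", "missing"), pool).get());
        pool.shutdown();
    }
}
```

Each worker writes only its own slot of the result array before decrementing the shared counter, so the atomic read-modify-write chain makes all slot writes visible to whichever thread completes the future. This is why the approach works on any backing store that offers a plain get, at the cost of per-key round trips compared to a native `MultiGet`.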
Re: [PR] [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing framework [flink]
fredia commented on code in PR #24678: URL: https://github.com/apache/flink/pull/24678#discussion_r1574075445 ## flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java: ## @@ -46,6 +46,12 @@ public ProcessOperator(OneInputStreamProcessFunction userFunction) {
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

+    @Override
+    public boolean isAsyncStateProcessingEnabled() {
+        // For a normal operator (without keyed context) the async state processing is unused.

Review Comment:
   I was wondering if it would be better to just inherit async on `KeyedProcessOperator`/`KeyedTwoInputNonBroadcastProcessOperator`/`KeyedTwoInputBroadcastProcessOperator`/`KeyedTwoOutputProcessOperator`?
   
   BTW, should we offer a `toAsync()` method in `KeyedPartitionStream` that allows users to set the execution modes of different operators in a fine-grained manner?
[jira] [Closed] (FLINK-35058) Encountered change event for table db.table whose schema isn't known to this connector
[ https://issues.apache.org/jira/browse/FLINK-35058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] MOBIN closed FLINK-35058. - Resolution: Not A Bug > Encountered change event for table db.table whose schema isn't known to this > connector > -- > > Key: FLINK-35058 > URL: https://issues.apache.org/jira/browse/FLINK-35058 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: 1.17.1 >Reporter: MOBIN >Priority: Major > > Flink1.17.1 > flink-cdc:flink-sql-connector-mysql-cdc-2.4.1.jar > {code:java} > CREATE TABLE `test_cdc_timestamp` ( > `id` BIGINT COMMENT '主键id', > > proctime AS PROCTIME(), > PRIMARY KEY(id) NOT ENFORCED > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = 'x', > 'scan.startup.mode' = 'timestamp', > 'scan.startup.timestamp-millis' = '171241920' , > 'port' = '3306', > 'username' = 'xxx', > 'password' = 'xxx', > 'database-name' = 'xxtablename', > 'table-name' = 'xxdatabase', > 'scan.incremental.snapshot.enabled' = 'false', > 'debezium.snapshot.locking.mode' = 'none', > 'server-id' = '5701', > 'server-time-zone' = 'Asia/Shanghai', > 'debezium.skipped.operations' = 'd' > ); {code} > When I use 'scan.startup.mode' = 'latent-offset 'or'initial' to synchronize > data normally, when I use 'scan.startup.mode' = 'timestamp', the following > error is reported > {code:java} > 2024-04-09 11:11:15.619 [debezium-engine] INFO io.debezium.util.Threads - > Requested thread factory for connector MySqlConnector, id = > mysql_binlog_source named = change-event-source-coordinator > 2024-04-09 11:11:15.621 [debezium-engine] INFO io.debezium.util.Threads - > Creating thread > debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator > 2024-04-09 11:11:15.629 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Metrics registered > 2024-04-09 11:11:15.630 > 
[debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Context created > 2024-04-09 11:11:15.642 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource - No > previous offset has been found > 2024-04-09 11:11:15.642 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource - According > to the connector configuration only schema will be snapshotted > 2024-04-09 11:11:15.644 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Snapshot ended > with SnapshotResult [status=SKIPPED, offset=null] > 2024-04-09 11:11:15.652 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.util.Threads - Requested thread factory for connector > MySqlConnector, id = mysql_binlog_source named = binlog-client > 2024-04-09 11:11:15.656 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Starting streaming > 2024-04-09 11:11:15.682 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - GTID set > purged on server: > 0969640a-1d48-11ed-b6cf-28dee561557c:1-27603868993,70958f24-2253-11eb-891d-f875a48ad7b1:1-50323,ec1e6593-2251-11eb-9c18-f875a48ad539:1-25345454762 > 2024-04-09 11:11:15.682 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Skip 0 > events on streaming start > 2024-04-09 11:11:15.682 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Skip 0 > rows on streaming start > 2024-04-09 11:11:15.683 > [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] > INFO io.debezium.util.Threads - Creating thread > debezium-mysqlconnector-mysql_binlog_source-binlog-client > 2024-04-09 11:11:15.686 [blc.mysql.com:3306] INFO > io.debezium.util.Threads - Creating thread > debezium-mysqlconnector-mysql_binlog_source-binlog-client > 2024-04-09 11:11:15.700 [blc.mysql.com:3306] INFO > io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Connected to > MySQL binlog at xxx.mysql.com:3306, starting at MySqlOffsetContext >
[jira] [Commented] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership
[ https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839458#comment-17839458 ] Jinzhong Li commented on FLINK-35178: - [~elon] Thanks for reporting this. For case 1: Let me confirm the job configuration. You configured the parameter "state.checkpoints.create-subdir: true", right? For case 2: This is the expected behavior of the "CLAIM-MODE + incremental checkpoint" for the current RocksdbStateBackend. The reason behind this is that the new job will perform incremental checkpoints based on the old sst-files belonging to the previous job. So the checkpoint folder of the old job needs to be preserved until the checkpoint of the new job no longer references any sst-files of the old job. In this case, I think, the lifecycle of the checkpoint-dir for both old and new jobs should be managed by the Flink framework, and it should not be the user's responsibility to delete/clean it up. > Checkpoint CLAIM mode does not fully control snapshot ownership > --- > > Key: FLINK-35178 > URL: https://issues.apache.org/jira/browse/FLINK-35178 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-20-14-51-21-062.png > > > When I enable incremental checkpointing, and the task fails or is canceled > for some reason, restarting the task from {{-s checkpoint_path}} with > {{restoreMode CLAIM}} allows the Flink job to recover from the last > checkpoint, it just discards the previous checkpoint. > Then I found that this leads to the following two cases: > 1. If the new checkpoint_x meta file does not reference files in the shared > directory under the previous jobID: > the shared and taskowned directories from the previous Job will be left as > empty directories, and these two directories will persist without being > deleted by Flink. !image-2024-04-20-14-51-21-062.png! > 2.
If the new checkpoint_x meta file references files in the shared directory > under the previous jobID: > the chk-(x-1) from the previous job will be discarded, but there will still > be state data in the shared directory under that job, which might persist for > a relatively long time. Here arises the question: the previous job is no > longer running, and it's unclear whether users should delete the state data. > Deleting it could lead to errors when the task is restarted, as the meta > might reference files that can no longer be found; this could be confusing > for users. > > The potential solution might be to reuse the previous job's jobID when > restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that > allows users to specify the jobID they want to recover from; > > Please correct me if there's anything I've misunderstood. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController
[ https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei resolved FLINK-35027. Resolution: Resolved > Implement checkpoint drain in AsyncExecutionController > -- > > Key: FLINK-35027 > URL: https://issues.apache.org/jira/browse/FLINK-35027 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available >
Re: [PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]
fredia merged PR #24676: URL: https://github.com/apache/flink/pull/24676
Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]
fredia commented on code in PR #24672: URL: https://github.com/apache/flink/pull/24672#discussion_r1574062845 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.function.BiConsumerWithException; + +import static org.apache.flink.runtime.asyncprocessing.KeyAccountingUnit.EMPTY_RECORD; + +/** + * An implementation of {@link InternalTimerService} that is used by {@link + * org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}. 
+ * The timer service will set {@link RecordContext} for the timers before invoking action to + * preserve the execution order between timer firing and records processing. + * + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425 + * timers section. + * @param Type of timer's key. + * @param Type of the namespace to which timers are scoped. + */ +public class InternalTimerServiceAsyncImpl extends InternalTimerServiceImpl { + +private AsyncExecutionController asyncExecutionController; + +InternalTimerServiceAsyncImpl( +TaskIOMetricGroup taskIOMetricGroup, +KeyGroupRange localKeyGroupRange, +KeyContext keyContext, +ProcessingTimeService processingTimeService, +KeyGroupedInternalPriorityQueue processingTimeTimersQueue, +KeyGroupedInternalPriorityQueue eventTimeTimersQueue, +StreamTaskCancellationContext cancellationContext, +AsyncExecutionController asyncExecutionController) { +super( +taskIOMetricGroup, +localKeyGroupRange, +keyContext, +processingTimeService, +processingTimeTimersQueue, +eventTimeTimersQueue, +cancellationContext); +this.asyncExecutionController = asyncExecutionController; +this.processingTimeCallback = this::onProcessingTime; +} + +private void onProcessingTime(long time) throws Exception { +// null out the timer in case the Triggerable calls registerProcessingTimeTimer() +// inside the callback. 
+nextTimer = null; + +InternalTimer timer; + +while ((timer = processingTimeTimersQueue.peek()) != null +&& timer.getTimestamp() <= time +&& !cancellationContext.isCancelled()) { +RecordContext recordCtx = +asyncExecutionController.buildContext(EMPTY_RECORD, timer.getKey()); +recordCtx.retain(); +asyncExecutionController.setCurrentContext(recordCtx); +keyContext.setCurrentKey(timer.getKey()); +processingTimeTimersQueue.poll(); +final InternalTimer timerToTrigger = timer; +asyncExecutionController.syncPointRequestWithCallback( +() -> triggerTarget.onProcessingTime(timerToTrigger)); +taskIOMetricGroup.getNumFiredTimers().inc(); +recordCtx.release(); +} + +if (timer != null && nextTimer == null) { +nextTimer = +processingTimeService.registerTimer( +timer.getTimestamp(), this::onProcessingTime); +} +} + +/** + * Advance one watermark, this will fire some event timers. + * + * @param time the time in watermark. + */ +@Override +public void
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
RocMarshal commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574054494 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,86 +24,93 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. + -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. 
+# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 Review Comment: ```suggestion Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)和 [operator 模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 ``` ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,86 +24,93 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. + -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. 
+# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 -## Features -### Core +Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 kubectl)管理和操作 Flink 部署。Operator 的核心功能包括: + + + +## 特征 + + + +### 核心 - Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) Review Comment: just a minor sus: why not do translation for these lines ? ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,86 +24,93 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. + -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the
Re: [PR] [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing framework [flink]
Zakelly commented on code in PR #24678: URL: https://github.com/apache/flink/pull/24678#discussion_r1574052485 ## flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java: ## @@ -23,15 +23,15 @@ import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** Operator for {@link OneInputStreamProcessFunction}. */ public class ProcessOperator -extends AbstractUdfStreamOperator> +extends AbstractAsyncStateUdfStreamOperator> Review Comment: Thanks for the reminder! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33681] Reuse input/output metrics of SourceOperator/SinkWriterOperator for task [flink]
X-czh commented on PR #23998: URL: https://github.com/apache/flink/pull/23998#issuecomment-2068376358 @affo Thanks! @huwh Could you help take a look at the PR?
Re: [PR] [Hotfix] Improve the Readme flink source code compilation document [flink]
flinkbot commented on PR #24694: URL: https://github.com/apache/flink/pull/24694#issuecomment-2068368531 ## CI report: * b5857ed21eeda0a1338a5df27caa1743ea5cf150 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [Hotfix] Building Apache Flink from Source [flink]
caicancai opened a new pull request, #24694: URL: https://github.com/apache/flink/pull/24694 ## What is the purpose of the change ① With `mvn clean package`, building and running unit tests from IDEA may fail. ② The website https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/flinkdev/building/ also uses `mvn clean install` to compile; I think the README and the website should stay consistent. ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
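For reference, a sketch of the two Maven invocations being compared (commands assumed from the linked Flink build documentation; flags such as `-DskipTests` are illustrative, not part of this PR):

```shell
# Build and install artifacts into the local Maven repository, as the linked
# docs recommend; IDE test runs can then resolve the freshly built modules.
mvn clean install -DskipTests

# Package-only build: artifacts are not installed into ~/.m2, which the PR
# author reports can make subsequent unit-test runs from IDEA fail to build.
mvn clean package -DskipTests
```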
Re: [PR] [hotfix][ci] Polish ci concurrency group [flink-cdc]
GOODBOY008 commented on PR #3241: URL: https://github.com/apache/flink-cdc/pull/3241#issuecomment-2068366576 And bump maven-setup action version to avoid warning. https://github.com/apache/flink-cdc/assets/13617900/91047893-f711-4b7c-beac-054ecaee0332
Re: [PR] [FLINK-33584][Filesystems] Update Hadoop Filesystem dependencies to 3.3.6 [flink]
masteryhx commented on PR #23844: URL: https://github.com/apache/flink/pull/23844#issuecomment-2068360164 Hi, just a kind ping. Could we resolve this with the solution mentioned by @MartijnVisser? We tried to use positioned read with ByteBuffer on Hadoop to improve performance, but found it is only supported from 3.3.0 onwards.
[jira] [Updated] (FLINK-35185) Resuming Externalized Checkpoint(rocks, incremental, no parallelism change) end-to-end test failed
[ https://issues.apache.org/jira/browse/FLINK-35185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35185: --- Description: {code:java} Apr 21 00:52:11 Running externalized checkpoints test, with ORIGINAL_DOP=2 NEW_DOP=2 and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ... Apr 21 00:52:20 Job (8a6bda88c7c422823bcf0d6f7a1e8cae) is running. Apr 21 00:52:20 Waiting for job (8a6bda88c7c422823bcf0d6f7a1e8cae) to have at least 1 completed checkpoints ... Apr 21 00:52:20 Waiting for job to process up to 200 records, current progress: 0 records ... Apr 21 00:52:22 Waiting for job to process up to 200 records, current progress: 139 records ... Apr 21 00:52:23 Waiting for job to process up to 200 records, current progress: 196 records ... Apr 21 00:52:26 Cancelling job 8a6bda88c7c422823bcf0d6f7a1e8cae. Apr 21 00:52:27 Cancelled job 8a6bda88c7c422823bcf0d6f7a1e8cae. Apr 21 00:52:27 Restoring job with externalized checkpoint at /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-00385748458/externalized-chckpt-e2e-backend-dir/8a6bda88c7c422823bcf0d6f7a1e8cae/chk-8 ... Apr 21 00:52:32 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:36 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:39 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:42 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:45 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:49 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:52 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:55 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:52:58 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. Apr 21 00:53:01 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. 
Apr 21 00:53:02 Job (47369cbdc25eea5a9cc6d26c91c4b141) has not started within a timeout of 10 sec Apr 21 00:53:02 Stopping job timeout watchdog (with pid=173454) Apr 21 00:53:02 [FAIL] Test script contains errors. Apr 21 00:53:02 Checking of logs skipped. Apr 21 00:53:02 Apr 21 00:53:02 [FAIL] 'Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test' failed after 1 minutes and 2 seconds! Test exited with exit code 1 {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59046=logs=b31992a1-93b0-59f3-2c17-4a9deb43d11c=36dcb94b-d88d-5832-815b-4b36f2c7af14=3508 > Resuming Externalized Checkpoint(rocks, incremental, no parallelism change) > end-to-end test failed > --- > > Key: FLINK-35185 > URL: https://issues.apache.org/jira/browse/FLINK-35185 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > > {code:java} > pr 21 00:52:11 Running externalized checkpoints test, with ORIGINAL_DOP=2 > NEW_DOP=2 and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true > STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ... > Apr 21 00:52:20 Job (8a6bda88c7c422823bcf0d6f7a1e8cae) is running. > Apr 21 00:52:20 Waiting for job (8a6bda88c7c422823bcf0d6f7a1e8cae) to have at > least 1 completed checkpoints ... > Apr 21 00:52:20 Waiting for job to process up to 200 records, current > progress: 0 records ... > Apr 21 00:52:22 Waiting for job to process up to 200 records, current > progress: 139 records ... > Apr 21 00:52:23 Waiting for job to process up to 200 records, current > progress: 196 records ... > Apr 21 00:52:26 Cancelling job 8a6bda88c7c422823bcf0d6f7a1e8cae. > Apr 21 00:52:27 Cancelled job 8a6bda88c7c422823bcf0d6f7a1e8cae. 
> Apr 21 00:52:27 Restoring job with externalized checkpoint at > /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-00385748458/externalized-chckpt-e2e-backend-dir/8a6bda88c7c422823bcf0d6f7a1e8cae/chk-8 > ... > Apr 21 00:52:32 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:36 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:39 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:42 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:45 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:49 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:52 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:55 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:52:58 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running. > Apr 21 00:53:01 Job
[jira] [Created] (FLINK-35185) Resuming Externalized Checkpoint(rocks, incremental, no parallelism change) end-to-end test failed
Weijie Guo created FLINK-35185: -- Summary: Resuming Externalized Checkpoint(rocks, incremental, no parallelism change) end-to-end test failed Key: FLINK-35185 URL: https://issues.apache.org/jira/browse/FLINK-35185 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3
[ https://issues.apache.org/jira/browse/FLINK-35175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-35175. -- Fix Version/s: 1.20.0 Assignee: Hangxiang Yu Resolution: Fixed merged a4c71c8d into master > HadoopDataInputStream can't compile with Hadoop 3.2.3 > - > > Key: FLINK-35175 > URL: https://issues.apache.org/jira/browse/FLINK-35175 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Ryan Skraba >Assignee: Hangxiang Yu >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0 > > > Unfortunately, introduced in FLINK-35045: > [PREADWRITEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182] > was added in Hadoop releases > [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72] > and > [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]. > It doesn't exist in flink.hadoop.version > [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java], > which we are using in end-to-end tests. 
> {code:java} > 00:23:55.093 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile > (default-compile) on project flink-hadoop-fs: Compilation failure: > Compilation failure: > 00:23:55.093 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63] > cannot find symbol > 00:23:55.094 [ERROR] symbol: variable READBYTEBUFFER > 00:23:55.094 [ERROR] location: interface > org.apache.hadoop.fs.StreamCapabilities > 00:23:55.094 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63] > cannot find symbol > 00:23:55.094 [ERROR] symbol: variable PREADBYTEBUFFER > 00:23:55.094 [ERROR] location: interface > org.apache.hadoop.fs.StreamCapabilities > 00:23:55.094 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43] > incompatible types: long cannot be converted to > org.apache.hadoop.io.ByteBufferPool > 00:23:55.094 [ERROR] -> [Help 1] {code} > * 1.20 compile_cron_hadoop313 > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=3630 -- This message was sent by Atlassian Jira (v8.20.10#820010)
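For context, the fix pattern for this class of incompatibility is to probe the stream's capabilities at runtime instead of referencing `StreamCapabilities` constants that only exist in Hadoop >= 3.3.0 (referencing the constant is exactly what broke compilation against 3.2.3). The sketch below is illustrative only: the tiny `StreamCapabilities` interface is a stand-in for `org.apache.hadoop.fs.StreamCapabilities`, and the capability string `in:preadbytebuffer` is assumed from the Hadoop javadoc, not taken from the actual Flink patch.

```java
public class CapabilityProbe {

    // Minimal stand-in for org.apache.hadoop.fs.StreamCapabilities, so this
    // sketch compiles and runs without a Hadoop dependency.
    interface StreamCapabilities {
        boolean hasCapability(String capability);
    }

    // Probing with a string literal (rather than the PREADBYTEBUFFER constant,
    // which appeared only in Hadoop 3.3.0) keeps the code compiling against
    // older Hadoop versions such as 3.2.3.
    static boolean supportsPositionedByteBufferRead(StreamCapabilities stream) {
        return stream.hasCapability("in:preadbytebuffer");
    }

    public static void main(String[] args) {
        StreamCapabilities hadoop330 = cap -> cap.equals("in:preadbytebuffer");
        StreamCapabilities hadoop323 = cap -> false;
        System.out.println(supportsPositionedByteBufferRead(hadoop330)); // true
        System.out.println(supportsPositionedByteBufferRead(hadoop323)); // false
    }
}
```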
[jira] [Commented] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3
[ https://issues.apache.org/jira/browse/FLINK-35175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839447#comment-17839447 ] Weijie Guo commented on FLINK-35175: hadoop313: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59046=logs=b1fcf054-9138-5463-c73c-a49979b9ac2a=9291ac46-dd95-5135-b799-3839e65a8691=3893 > HadoopDataInputStream can't compile with Hadoop 3.2.3 > - > > Key: FLINK-35175 > URL: https://issues.apache.org/jira/browse/FLINK-35175 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Ryan Skraba >Priority: Critical > Labels: pull-request-available > > Unfortunately, introduced in FLINK-35045: > [PREADWRITEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182] > was added in Hadoop releases > [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72] > and > [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]. > It doesn't exist in flink.hadoop.version > [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java], > which we are using in end-to-end tests. 
> {code:java} > 00:23:55.093 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile > (default-compile) on project flink-hadoop-fs: Compilation failure: > Compilation failure: > 00:23:55.093 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63] > cannot find symbol > 00:23:55.094 [ERROR] symbol: variable READBYTEBUFFER > 00:23:55.094 [ERROR] location: interface > org.apache.hadoop.fs.StreamCapabilities > 00:23:55.094 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63] > cannot find symbol > 00:23:55.094 [ERROR] symbol: variable PREADBYTEBUFFER > 00:23:55.094 [ERROR] location: interface > org.apache.hadoop.fs.StreamCapabilities > 00:23:55.094 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43] > incompatible types: long cannot be converted to > org.apache.hadoop.io.ByteBufferPool > 00:23:55.094 [ERROR] -> [Help 1] {code} > * 1.20 compile_cron_hadoop313 > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=3630 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35175][filesystem] Avoid reading with ByteBuffer on hadoop version below 3.3.0 [flink]
masteryhx closed pull request #24691: [FLINK-35175][filesystem] Avoid reading with ByteBuffer on hadoop version below 3.3.0 URL: https://github.com/apache/flink/pull/24691
[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839445#comment-17839445 ] Weijie Guo commented on FLINK-35041: test_cron_jdk21 core https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59037=logs=d06b80b4-9e88-5d40-12a2-18072cf60528=609ecd5a-3f6e-5d0c-2239-2096b155a4d0=8911 > IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed > -- > > Key: FLINK-35041 > URL: https://issues.apache.org/jira/browse/FLINK-35041 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Blocker > > {code:java} > Apr 08 03:22:45 03:22:45.450 [ERROR] > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration > -- Time elapsed: 0.034 s <<< FAILURE! > Apr 08 03:22:45 org.opentest4j.AssertionFailedError: > Apr 08 03:22:45 > Apr 08 03:22:45 expected: false > Apr 08 03:22:45 but was: true > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Apr 08 03:22:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211) > Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498) > Apr 08 03:22:45 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Apr 08 03:22:45 at > 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {code} > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
[jira] [Commented] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3
[ https://issues.apache.org/jira/browse/FLINK-35175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839444#comment-17839444 ] Weijie Guo commented on FLINK-35175: 1.20 compile_cron_hadoop313 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59037=logs=b1fcf054-9138-5463-c73c-a49979b9ac2a=9291ac46-dd95-5135-b799-3839e65a8691=3641 > HadoopDataInputStream can't compile with Hadoop 3.2.3 > - > > Key: FLINK-35175 > URL: https://issues.apache.org/jira/browse/FLINK-35175 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Ryan Skraba >Priority: Critical > Labels: pull-request-available > > Unfortunately, introduced in FLINK-35045: > [PREADWRITEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182] > was added in Hadoop releases > [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72] > and > [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]. > It doesn't exist in flink.hadoop.version > [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java], > which we are using in end-to-end tests. 
> {code:java} > 00:23:55.093 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile > (default-compile) on project flink-hadoop-fs: Compilation failure: > Compilation failure: > 00:23:55.093 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63] > cannot find symbol > 00:23:55.094 [ERROR] symbol: variable READBYTEBUFFER > 00:23:55.094 [ERROR] location: interface > org.apache.hadoop.fs.StreamCapabilities > 00:23:55.094 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63] > cannot find symbol > 00:23:55.094 [ERROR] symbol: variable PREADBYTEBUFFER > 00:23:55.094 [ERROR] location: interface > org.apache.hadoop.fs.StreamCapabilities > 00:23:55.094 [ERROR] > /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43] > incompatible types: long cannot be converted to > org.apache.hadoop.io.ByteBufferPool > 00:23:55.094 [ERROR] -> [Help 1] {code} > * 1.20 compile_cron_hadoop313 > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=3630 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35166) Improve the performance of Hybrid Shuffle when enable memory decoupling
[ https://issues.apache.org/jira/browse/FLINK-35166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuxin Tan reassigned FLINK-35166: - Assignee: Jiang Xin > Improve the performance of Hybrid Shuffle when enable memory decoupling > --- > > Key: FLINK-35166 > URL: https://issues.apache.org/jira/browse/FLINK-35166 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Jiang Xin >Assignee: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Currently, the tiered result partition creates the SortBufferAccumulator with > the number of expected buffers as min(numSubpartitions+1, 512), thus the > SortBufferAccumulator may obtain very few buffers when the parallelism is > small. We can easily make the number of expected buffers 512 by default to > have a better performance when the buffers are sufficient.
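The sizing described in the ticket is simple arithmetic; a small sketch makes the effect of small parallelism concrete (the method name below is illustrative, not taken from the Flink source):

```java
public class ExpectedBuffers {

    // Current behavior described in the ticket: the accumulator asks for
    // min(numSubpartitions + 1, 512) buffers.
    static int expectedBuffers(int numSubpartitions) {
        return Math.min(numSubpartitions + 1, 512);
    }

    public static void main(String[] args) {
        // With small parallelism the accumulator obtains very few buffers...
        System.out.println(expectedBuffers(4));    // 5
        // ...while large parallelism is already capped at 512, the value the
        // ticket proposes to request by default when buffers are sufficient.
        System.out.println(expectedBuffers(2000)); // 512
    }
}
```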
Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]
jectpro7 commented on code in PR #24681: URL: https://github.com/apache/flink/pull/24681#discussion_r1573855255 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.rocksdb.RocksDB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by + * calling the Get API multiple times with multiple threads. + * + * @param The type of key in get access request. + * @param The type of value in get access request. 
+ */ +public class ForStGeneralMultiGetOperation implements ForStDBOperation> { + +private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class); + +private final RocksDB db; + +private final List> batchRequest; + +private final Executor executor; + +ForStGeneralMultiGetOperation(RocksDB db, List> batchRequest, Executor executor) { +this.db = db; +this.batchRequest = batchRequest; +this.executor = executor; +} + +@Override +public CompletableFuture> process() throws IOException { + +CompletableFuture> future = new CompletableFuture<>(); +@SuppressWarnings("unchecked") +V[] result = (V[]) new Object[batchRequest.size()]; +Arrays.fill(result, null); + +AtomicInteger counter = new AtomicInteger(batchRequest.size()); +for (int i = 0; i < batchRequest.size(); i++) { +Request request = batchRequest.get(i); +final int index = i; +executor.execute( +() -> { +try { +ForStInnerTable table = request.table; +byte[] key = table.serializeKey(request.key); +byte[] value = db.get(table.getColumnFamilyHandle(), key); Review Comment: Hi @ljz2051, it creates many rpc request here, as FLIP-426 mentioned the rpc round-trip overhead is the bottleneck. It might be better by using `multiGetAsList` ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * The writeBatch operation implementation for ForStDB. + * + * @param The type of key in put access request. + * @param The type of value in put access request. + */ +public class ForStWriteBatchOperation implements ForStDBOperation { + +private static final int PER_RECORD_ESTIMATE_BYTES = 100; + +private final RocksDB db; + +private final List> batchRequest; + +private final WriteOptions writeOptions; + +ForStWriteBatchOperation( +RocksDB db, List>
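The per-key fan-out that the reviewer is flagging can be sketched in plain Java: each key becomes an independent lookup on the executor, and an atomic countdown completes the future once the last lookup returns. Here a `ConcurrentHashMap` stands in for the ForStDB/RocksDB instance (an assumption for the sake of a runnable example); in the real operator each `db.get` is a remote call, which is why batching keys into a single request such as `multiGetAsList` is attractive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiGetSketch {

    // In-memory stand-in for the database; in the real operator every get()
    // is a remote call, which is what makes per-key fan-out expensive.
    static final Map<String, String> DB = new ConcurrentHashMap<>();

    // One lookup per key on the executor; an atomic countdown completes the
    // future when the last lookup finishes, mirroring process() above.
    static CompletableFuture<List<String>> multiGet(List<String> keys, Executor executor) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        String[] result = new String[keys.size()];
        AtomicInteger remaining = new AtomicInteger(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            final int index = i;
            executor.execute(
                    () -> {
                        result[index] = DB.get(keys.get(index)); // one "RPC" per key
                        if (remaining.decrementAndGet() == 0) {
                            future.complete(Arrays.asList(result));
                        }
                    });
        }
        return future;
    }

    public static void main(String[] args) throws Exception {
        DB.put("a", "1");
        DB.put("b", "2");
        ExecutorService pool = Executors.newFixedThreadPool(4);
        System.out.println(multiGet(Arrays.asList("a", "b", "c"), pool).get());
        pool.shutdown();
    }
}
```

Missing keys simply surface as `null` entries in the result list, matching the per-slot `result` array in the operator.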
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1573750686 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,86 +24,93 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. + -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. +# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许您直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 Review Comment: thank you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #809: URL: https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1573750776

## docs/content.zh/docs/concepts/architecture.md:

@@ -24,57 +24,66 @@ specific language governing permissions and limitations under the License. -->
-# Architecture
-
-Flink Kubernetes Operator (Operator) acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. The Operator can be installed on a Kubernetes cluster using [Helm](https://helm.sh). In most production environments it is typically deployed in a designated namespace and controls Flink deployments in one or more managed namespaces. The custom resource definition (CRD) that describes the schema of a `FlinkDeployment` is a cluster wide resource. For a CRD, the declaration must be registered before any resources of that CRDs kind(s) can be used, and the registration process sometimes takes a few seconds.
-
-{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator Architecture" >}}
-> Note: There is no support at this time for [upgrading or deleting CRDs using Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
-
-## Control Loop
-The Operator follow the Kubernetes principles, notably the [control loop](https://kubernetes.io/docs/concepts/architecture/controller/):
-
-{{< img src="/img/concepts/control_loop.svg" alt="Control Loop" >}}
-
-Users can interact with the operator using the Kubernetes command-line tool, [kubectl](https://kubernetes.io/docs/tasks/tools/). The Operator continuously tracks cluster events relating to the `FlinkDeployment` and `FlinkSessionJob` custom resources. When the operator receives a new resource update, it will take action to adjust the Kubernetes cluster to the desired state as part of its reconciliation loop. The initial loop consists of the following high-level steps:
-
-1. User submits a `FlinkDeployment`/`FlinkSessionJob` custom resource(CR) using `kubectl`
-2. Operator observes the current status of the Flink resource (if previously deployed)
-3. Operator validates the submitted resource change
-4. Operator reconciles any required changes and executes upgrades
-
-The CR can be (re)applied on the cluster any time. The Operator makes continuous adjustments to imitate the desired state until the current state becomes the desired state. All lifecycle management operations are realized using this very simple principle in the Operator.
-
-The Operator is built with the [Java Operator SDK](https://github.com/java-operator-sdk/java-operator-sdk) and uses the [Native Kubernetes Integration](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) for launching Flink deployments and submitting jobs under the hood. The Java Operator SDK is a higher level framework and related tooling to support writing Kubernetes Operators in Java. Both the Java Operator SDK and Flink's native kubernetes integration itself is using the [Fabric8 Kubernetes Client](https://github.com/fabric8io/kubernetes-client) to interact with the Kubernetes API Server.
-
-## Flink Resource Lifecycle
-
-The Operator manages the lifecycle of Flink resources. The following chart illustrates the different possible states and transitions:
+# 架构
+
+Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整deployment生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个Flink 部署到受托管的 namespaces。描述 `FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 类型的任何资源之前注册声明,注册过程有时需要几秒钟。
+
+{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 架构" >}}
+> Note: 目前不支持[使用 Helm 升级或删除 CRD](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
+
+## 控制平面
+Operator 遵循 Kubernetes 原则,特别是 [控制平面](https://kubernetes.io/docs/concepts/architecture/controller/):
+
+{{< img src="/img/concepts/control_loop.svg" alt="控制循环" >}}
+
+用户可以使用 Kubernetes 命令行工具 [kubectl](https://kubernetes.io/docs/tasks/tools/) 与Operator进行交互。Operator 不断跟踪与 `FlinkDeployment` 和 `FlinkSessionJob` 自定义资源相关的集群事件。当 Operator 接收到新的资源更新时,它将调整 Kubernetes 集群以达到所需状态,这个调整将作为其协调循环的一部分。初始循环包括以下高级步骤:
+
+1. 用户使用 `kubectl` 提交 `FlinkDeployment`/`FlinkSessionJob` 自定义资源(CR)
+2. Operator 观察 Flink 资源的当前状态(如果先前已部署)
+3. Operator 验证提交的资源更改
+4. Operator 协调任何必要的更改并执行升级
+
+CR 可以随时在集群上(重新)应用。Operator 通过不断调整来模拟期望的状态,直到当前状态变为期望的状态。Operator 中的所有生命周期管理操作都是使用这个非常简单的原则实现的。
+
+Operator 使用 [Java Operator SDK](https://github.com/java-operator-sdk/java-operator-sdk) 构建,并使用 [Native Kubernetes Integration](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) 用于启动 Flink deployment 并在后台提交作业。
+Java Operator SDK 是一个更高级别的框架和相关工具,用于支持使用 Java 编写 Kubernetes Operator。Java Operator SDK 和 Flink 的原生 kubernetes 集成本身都使用 [Fabric8 Kubernetes 客户端](https://github.com/fabric8io/kubernetes-client) 与 Kubernetes API Server 交互。
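[Editor's illustration] The observe/validate/reconcile steps quoted in the diff above follow the standard Kubernetes control-loop principle. The following is a minimal, dependency-free Java sketch of that principle only; all names in it (`ControlLoopSketch`, `desired`, `observed`, `reconcile`) are hypothetical and are not the actual Java Operator SDK API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of a declarative control loop: each pass compares the
// declared (desired) state against the observed state and applies the diff.
public class ControlLoopSketch {
    // Desired state: resource name -> spec (e.g. a Flink version to deploy).
    static final Map<String, String> desired = new HashMap<>();
    // Observed state of the cluster for the same resources.
    static final Map<String, String> observed = new HashMap<>();

    // One reconciliation pass; returns how many adjustments were made.
    // Invoked repeatedly, this converges to a steady state where it is a no-op.
    static int reconcile() {
        int changes = 0;
        for (Map.Entry<String, String> spec : desired.entrySet()) {
            if (!spec.getValue().equals(observed.get(spec.getKey()))) {
                observed.put(spec.getKey(), spec.getValue()); // deploy/upgrade
                changes++;
            }
        }
        if (observed.keySet().retainAll(desired.keySet())) {
            changes++; // tear down resources that are no longer declared
        }
        return changes;
    }

    public static void main(String[] args) {
        desired.put("flink-job", "1.19.0"); // user (re)applies a CR
        System.out.println(reconcile());    // first pass applies the change
        System.out.println(reconcile());    // steady state: nothing left to do
    }
}
```

Because the loop only ever compares desired vs. observed state, a CR can be (re)applied at any time, which is the property the architecture text above describes.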
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839382#comment-17839382 ]

Roman Boyko commented on FLINK-35184:
-------------------------------------

please assign this bug to me, I'm working on it.

> Hash collision inside MiniBatchStreamingJoin operator
> -----------------------------------------------------
>
>                 Key: FLINK-35184
>                 URL: https://issues.apache.org/jira/browse/FLINK-35184
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0
>            Reporter: Roman Boyko
>            Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To reproduce it, just launch the following test within StreamingMiniBatchJoinOperatorTest:
>
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) throws Exception {
>     leftTypeInfo =
>             InternalTypeInfo.of(
>                     RowType.of(
>                             new LogicalType[] {new IntType(), new BigIntType()},
>                             new String[] {"id1", "val1"}));
>     rightTypeInfo =
>             InternalTypeInfo.of(
>                     RowType.of(
>                             new LogicalType[] {new IntType(), new BigIntType()},
>                             new String[] {"id2", "val2"}));
>     leftKeySelector =
>             HandwrittenSelectorUtil.getRowDataSelector(
>                     new int[] {0},
>                     leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
>     rightKeySelector =
>             HandwrittenSelectorUtil.getRowDataSelector(
>                     new int[] {0},
>                     rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
>     joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
>     super.beforeEach(testInfo);
>     testHarness.setStateTtlProcessingTime(1);
>
>     testHarness.processElement2(insertRecord(1, 1L));
>     testHarness.processElement1(insertRecord(1, 4294967296L));
>     testHarness.processElement2(insertRecord(1, 4294967296L));
>     testHarness.processElement2(deleteRecord(1, 1L));
>     testHarness.close();
>
>     assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 4294967296L, 1, 4294967296L));
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
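[Editor's note] The values `1L` and `4294967296L` in the reproducer are not arbitrary: `Long.hashCode(v)` is defined as `(int) (v ^ (v >>> 32))`, so `1L` and `2^32` produce the same hash, which is presumably what triggers the collision inside `InputSideHasNoUniqueKeyBundle` when rows are grouped by hash alone. A standalone demonstration of the collision (plain JDK, no Flink dependencies):

```java
public class LongHashCollisionDemo {
    public static void main(String[] args) {
        // Long.hashCode(v) XOR-folds the upper 32 bits into the lower 32:
        // (int) (v ^ (v >>> 32)). For v = 1L this is 1; for v = 2^32 it is
        // (int) (4294967296L ^ 1L) = 1 as well -> a hash collision.
        System.out.println(Long.hashCode(1L));          // 1
        System.out.println(Long.hashCode(4294967296L)); // 1
        System.out.println(Long.hashCode(1L) == Long.hashCode(4294967296L)); // true
    }
}
```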
[jira] [Created] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
Roman Boyko created FLINK-35184:
-----------------------------------

             Summary: Hash collision inside MiniBatchStreamingJoin operator
                 Key: FLINK-35184
                 URL: https://issues.apache.org/jira/browse/FLINK-35184
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.19.0
            Reporter: Roman Boyko

The hash collision is possible for InputSideHasNoUniqueKeyBundle. To reproduce it, just launch the following test within StreamingMiniBatchJoinOperatorTest:

{code:java}
@Tag("miniBatchSize=6")
@Test
public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) throws Exception {
    leftTypeInfo =
            InternalTypeInfo.of(
                    RowType.of(
                            new LogicalType[] {new IntType(), new BigIntType()},
                            new String[] {"id1", "val1"}));
    rightTypeInfo =
            InternalTypeInfo.of(
                    RowType.of(
                            new LogicalType[] {new IntType(), new BigIntType()},
                            new String[] {"id2", "val2"}));
    leftKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    new int[] {0},
                    leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
    rightKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    new int[] {0},
                    rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
    joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
    super.beforeEach(testInfo);
    testHarness.setStateTtlProcessingTime(1);

    testHarness.processElement2(insertRecord(1, 1L));
    testHarness.processElement1(insertRecord(1, 4294967296L));
    testHarness.processElement2(insertRecord(1, 4294967296L));
    testHarness.processElement2(deleteRecord(1, 1L));
    testHarness.close();

    assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 4294967296L, 1, 4294967296L));
}
{code}