[PR] [hotfix] Using commons-lang3 for exception checking [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


haoxins opened a new pull request, #818:
URL: https://github.com/apache/flink-kubernetes-operator/pull/818

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request adds a new feature to periodically create 
and maintain savepoints through the `FlinkDeployment` custom resource.)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Periodic savepoint trigger is introduced to the custom resource*
 - *The operator checks on reconciliation whether the required time has 
passed*
 - *The JobManager's dispose savepoint API is used to clean up obsolete 
savepoints*
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-04-21 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839474#comment-17839474
 ] 

Roman Boyko commented on FLINK-34380:
-

Hi [~xuyangzhong] , [~xu_shuai_] !

1) The RowKind can't be fixed in the current architecture, because +I and +U are 
separated into different batches in this example, and it would be a bit tricky to 
fix.

2) But the record order really is incorrect in this example, and it can be easily 
fixed - https://github.com/rovboyko/flink/tree/fix/FLINK-34380

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574122219


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator 的核心功能包括:
+
+
+
+## 特征
+
+
+
+### 核心
 - Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 任务管理器的扩展和缩减
 - Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置
 - POD augmentation via [Pod Templates]({{< ref 
"docs/custom-resource/pod-template" >}})
-  - Native Kubernetes POD definitions
-  - Layering (Base/JobManager/TaskManager overrides)
+  - 原生Kubernetes POD定义
+  - 用于自定义容器和资源
 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}})
-  - Collect lag and utilization metrics
-  - Scale job vertices to the ideal parallelism
-  - Scale up and down as the load changes
-### Operations
+  - 收集延迟和利用率指标
+  - 根据指标自动调整任务管理器数量
+  - 根据负载的变化进行扩展和缩减
+
+
+
+### 运营
 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
-  - Utilizes the well-established [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
-  - Pluggable metrics reporters
-  - Detailed resources and kubernetes api access metrics
+  - 使用成熟的 [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
+  - 可插拔的指标报告器
+  - 详细的资源和 kubernetes api 访问指标
 - Fully-customizable [Logging]({{< ref 
"docs/operations/metrics-logging#logging" >}})
-  - Default log configuration
-  - Per job log configuration
-  - Sidecar based log forwarders
-- Flink Web UI and REST Endpoint Access
-  - Fully supported Flink Native Kubernetes [service expose 
types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
-  - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}})
+  - 默认日志配置
+  - 每个作业日志配置
+  - 基于 sidecar 的日志转发器
+- Flink Web UI 和 REST 端点访问
+  - 完整支持 Flink 原生 Kubernetes 
[服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
+  - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}}) 动态暴露服务
 - [Helm based installation]({{< ref "docs/operations/helm" >}})
-  - Automated [RBAC configuration]({{< ref "docs/operations/rbac" >}})
-  - Advanced customization techniques
-- Up-to-date public 

Re: [PR] [BP-1.18][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]

2024-04-21 Thread via GitHub


flinkbot commented on PR #24696:
URL: https://github.com/apache/flink/pull/24696#issuecomment-2068485490

   
   ## CI report:
   
   * 51874ab3e4d2d2b8b84c22bc1e3e6326dba6839a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [BP-1.18][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]

2024-04-21 Thread via GitHub


TanYuxin-tyx opened a new pull request, #24696:
URL: https://github.com/apache/flink/pull/24696

   
   
   ## What is the purpose of the change
   
   Backport to Flink 1.18 for 
https://github.com/apache/flink/pull/24688#event-12553480360
   
   *When using sortBufferAccumulator, we should recycle the buffers to 
freeSegments before releasing the data buffer. The reason is that when getting 
buffers from the DataBuffer, it may require more buffers than the current 
quantity available in freeSegments. Consequently, to ensure adequate buffers 
from DataBuffer, the flushed and recycled buffers should also be added to 
freeSegments for reuse.*
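
   Below is a minimal sketch of the ordering this change enforces (illustrative 
only: the names `freeSegments`, `flushedBuffers` and `releaseDataBuffer` are 
stand-ins, not the actual SortBufferAccumulator API):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class BufferRecyclingSketch {
    private final Deque<ByteBuffer> freeSegments = new ArrayDeque<>();
    private final Deque<ByteBuffer> flushedBuffers = new ArrayDeque<>();

    void flushAndRelease() {
        // Recycle the flushed buffers back into the free pool *before* releasing
        // the data buffer, so that draining the data buffer can obtain enough
        // segments even if it needs more than were originally free.
        while (!flushedBuffers.isEmpty()) {
            freeSegments.push(flushedBuffers.pop());
        }
        releaseDataBuffer();
    }

    private void releaseDataBuffer() {
        // Placeholder for releasing the in-flight data buffer.
    }
}
```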
   
   
   ## Brief change log
   
 - *Reuse the recycled buffers before releasing data buffer for sort 
accumulator*
   
   
   ## Verifying this change
   
   
   This change is already covered by added tests 
*SortBufferAccumulatorTest#testReuseRecycledBuffersWhenFlushDataBuffer*.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)





Re: [PR] [BP][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]

2024-04-21 Thread via GitHub


flinkbot commented on PR #24695:
URL: https://github.com/apache/flink/pull/24695#issuecomment-2068471232

   
   ## CI report:
   
   * 4923275887fdcdfe189f070894fd779dc8fe0f46 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [BP][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]

2024-04-21 Thread via GitHub


TanYuxin-tyx opened a new pull request, #24695:
URL: https://github.com/apache/flink/pull/24695

   
   
   ## What is the purpose of the change
   
   Backport to Flink 1.19 for 
https://github.com/apache/flink/pull/24688#event-12553480360
   
   *When using sortBufferAccumulator, we should recycle the buffers to 
freeSegments before releasing the data buffer. The reason is that when getting 
buffers from the DataBuffer, it may require more buffers than the current 
quantity available in freeSegments. Consequently, to ensure adequate buffers 
from DataBuffer, the flushed and recycled buffers should also be added to 
freeSegments for reuse.*
   
   
   ## Brief change log
   
 - *Reuse the recycled buffers before releasing data buffer for sort 
accumulator*
   
   
   ## Verifying this change
   
   
   This change is already covered by added tests 
*SortBufferAccumulatorTest#testReuseRecycledBuffersWhenFlushDataBuffer*.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)





Re: [PR] [FLINK-35120][doris] Add Doris integration test cases [flink-cdc]

2024-04-21 Thread via GitHub


JNSimba commented on PR #3227:
URL: https://github.com/apache/flink-cdc/pull/3227#issuecomment-2068453899

   It seems that some code is duplicated with PR #3222





Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


caicancai commented on code in PR #809:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1574098275


##
docs/content.zh/docs/concepts/architecture.md:
##
@@ -24,57 +24,66 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Architecture
+
 
-Flink Kubernetes Operator (Operator) acts as a control plane to manage the 
complete deployment lifecycle of Apache Flink applications. The Operator can be 
installed on a Kubernetes cluster using [Helm](https://helm.sh). In most 
production environments it is typically deployed in a designated namespace and 
controls Flink deployments in one or more managed namespaces. The custom 
resource definition (CRD) that describes the schema of a `FlinkDeployment` is a 
cluster wide resource. For a CRD, the declaration must be registered before any 
resources of that CRDs kind(s) can be used, and the registration process 
sometimes takes a few seconds.
+# 架构
 
-{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
Architecture" >}}
-> Note: There is no support at this time for [upgrading or deleting CRDs using 
Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
+Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整 deployment 
生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 
Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个 Flink 部署到受托管的 namespaces 。描述 
`FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 
类型的任何资源之前注册声明,注册过程有时需要几秒钟。
 
-## Control Loop
-The Operator follow the Kubernetes principles, notably the [control 
loop](https://kubernetes.io/docs/concepts/architecture/controller/):
+{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
架构" >}}
+> Note: 目前不支持[使用 Helm 升级或删除 
CRD](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
 
-{{< img src="/img/concepts/control_loop.svg" alt="Control Loop" >}}
+
 
-Users can interact with the operator using the Kubernetes command-line tool, 
[kubectl](https://kubernetes.io/docs/tasks/tools/). The Operator continuously 
tracks cluster events relating to the `FlinkDeployment` and `FlinkSessionJob` 
custom resources. When the operator receives a new resource update, it will 
take action to adjust the Kubernetes cluster to the desired state as part of 
its reconciliation loop. The initial loop consists of the following high-level 
steps:
+## 控制平面
+Operator 遵循 Kubernetes 原则,特别是 
[控制平面](https://kubernetes.io/docs/concepts/architecture/controller/):
 
-1. User submits a `FlinkDeployment`/`FlinkSessionJob` custom resource(CR) 
using `kubectl`
-2. Operator observes the current status of the Flink resource (if previously 
deployed)
-3. Operator validates the submitted resource change
-4. Operator reconciles any required changes and executes upgrades
+{{< img src="/img/concepts/control_loop.svg" alt="控制循环" >}}
 
-The CR can be (re)applied on the cluster any time. The Operator makes 
continuous adjustments to imitate the desired state until the current state 
becomes the desired state. All lifecycle management operations are realized 
using this very simple principle in the Operator.
+用户可以使用 Kubernetes 命令行工具 [kubectl](https://kubernetes.io/docs/tasks/tools/) 与 
Operator 进行交互。Operator 不断跟踪与 `FlinkDeployment` 和 `FlinkSessionJob` 
自定义资源相关的集群事件。当 Operator 接收到新的资源更新时,它将调整 Kubernetes 
集群以达到所需状态,这个调整将作为其协调循环的一部分。初始循环包括以下高级步骤:
 
-The Operator is built with the [Java Operator 
SDK](https://github.com/java-operator-sdk/java-operator-sdk) and uses the 
[Native Kubernetes 
Integration](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/)
 for launching Flink deployments and submitting jobs under the hood. The Java 
Operator SDK is a higher level framework and related tooling to support writing 
Kubernetes Operators in Java. Both the Java Operator SDK and Flink's native 
kubernetes integration itself is using the [Fabric8 Kubernetes 
Client](https://github.com/fabric8io/kubernetes-client) to interact with the 
Kubernetes API Server.
+1. 用户使用 `kubectl` 提交 `FlinkDeployment`/`FlinkSessionJob` 自定义资源(CR)
+2. Operator 观察 Flink 资源的当前状态(如果先前已部署)
+3. Operator 验证提交的资源更改
+4. Operator 协调任何必要的更改并执行升级
 
-## Flink Resource Lifecycle
+CR 可以随时在集群上(重新)应用。Operator 通过不断调整来模拟期望的状态,直到当前状态变为期望的状态。Operator 
中的所有生命周期管理操作都是使用这个非常简单的原则实现的。
 
-The Operator manages the lifecycle of Flink resources. The following chart 
illustrates the different possible states and transitions:
+Operator 使用 [Java Operator 
SDK](https://github.com/java-operator-sdk/java-operator-sdk) 构建,并使用 [Native 
Kubernetes Integration](https://nightlies.apache.org 
/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) 
用于启动 Flink deployment 并在后台提交作业。

Review Comment:
   done




Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]

2024-04-21 Thread via GitHub


banmoy commented on PR #3231:
URL: https://github.com/apache/flink-cdc/pull/3231#issuecomment-2068440105

   @yuxiqian Thanks for the work. Left some comments.  Also cc @lvyanquan 





Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]

2024-04-21 Thread via GitHub


banmoy commented on code in PR #3231:
URL: https://github.com/apache/flink-cdc/pull/3231#discussion_r1574081602


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/resources/log4j2-test.properties:
##
@@ -15,7 +15,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level=OFF
+rootLogger.level=INFO

Review Comment:
   set it to `OFF`



##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
+import 
org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** IT tests for {@link StarRocksDataSink}. */
+public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
+private static final StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment();
+
+@BeforeClass
+public static void before() {
+env.setParallelism(DEFAULT_PARALLELISM);
+env.enableCheckpointing(3000);
+env.setRestartStrategy(RestartStrategies.noRestart());
+}
+
+@Before
+public void initializeDatabaseAndTable() {
+createDatabase(StarRocksContainer.STARROCKS_DATABASE_NAME);
+
+LOG.info("Database {} created.", 
StarRocksContainer.STARROCKS_DATABASE_NAME);
+
+createTable(
+StarRocksContainer.STARROCKS_DATABASE_NAME,
+StarRocksContainer.STARROCKS_TABLE_NAME,
+"id",
+Arrays.asList("id INT NOT NULL", "number DOUBLE", "name 
VARCHAR(51)"));
+
+LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME);
+}
+
+@After
+public void destroyDatabaseAndTable() {
+dropTable(
+StarRocksContainer.STARROCKS_DATABASE_NAME,
+StarRocksContainer.STARROCKS_TABLE_NAME);
+
+LOG.info("Table {} destroyed.", 
StarRocksContainer.STARROCKS_TABLE_NAME);
+
+dropDatabase(StarRocksContainer.STARROCKS_DATABASE_NAME);
+
+LOG.info("Database {} destroyed.", 
StarRocksContainer.STARROCKS_DATABASE_NAME);
+}
+
+private List<Event> generateEvents(TableId tableId) {
+Schema schema =
+Schema.newBuilder()
+.column(new PhysicalColumn("id", DataTypes.INT(), 
null))

Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


caicancai commented on code in PR #809:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1574091929


##
docs/content.zh/docs/concepts/architecture.md:
##
@@ -24,57 +24,66 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Architecture
+
 
-Flink Kubernetes Operator (Operator) acts as a control plane to manage the 
complete deployment lifecycle of Apache Flink applications. The Operator can be 
installed on a Kubernetes cluster using [Helm](https://helm.sh). In most 
production environments it is typically deployed in a designated namespace and 
controls Flink deployments in one or more managed namespaces. The custom 
resource definition (CRD) that describes the schema of a `FlinkDeployment` is a 
cluster wide resource. For a CRD, the declaration must be registered before any 
resources of that CRDs kind(s) can be used, and the registration process 
sometimes takes a few seconds.
+# 架构
 
-{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
Architecture" >}}
-> Note: There is no support at this time for [upgrading or deleting CRDs using 
Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
+Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整 deployment 
生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 
Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个 Flink 部署到受托管的 namespaces 。描述 
`FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 
类型的任何资源之前注册声明,注册过程有时需要几秒钟。

Review Comment:
   Sorry, I overlooked this, next time I should read it aloud to check






Re: [PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]

2024-04-21 Thread via GitHub


RocMarshal commented on PR #115:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/115#issuecomment-2068437230

   Hi @Jiabao-Sun, could you help take a look and review this when you have some 
free time?
   Thank you~





[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839464#comment-17839464
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko], thanks for reporting this bug, which is caused by the hashCode() 
in GenericRowData.
Could you please give a rough explanation of your solution before implementing it?
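
(An illustrative aside, not from the ticket: the values 1L and 4294967296L used in 
the reproducing test below collide on Long.hashCode, which is presumably what the 
GenericRowData-based hashing of the bundle keys reduces to in this case. A minimal, 
self-contained demo:)

{code:java}
public class LongHashCollisionDemo {
    public static void main(String[] args) {
        // Long.hashCode(x) = (int) (x ^ (x >>> 32)), so 1L and 2^32 hash identically.
        System.out.println(Long.hashCode(1L));          // 1
        System.out.println(Long.hashCode(4294967296L)); // 1 -> same hash, different value
    }
}
{code}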

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  





[jira] [Updated] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator

2024-04-21 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35169:
---
Fix Version/s: 1.20.0

> Recycle buffers to freeSegments before releasing data buffer for sort 
> accumulator
> -
>
> Key: FLINK-35169
> URL: https://issues.apache.org/jira/browse/FLINK-35169
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> When using sortBufferAccumulator, we should recycle the buffers to 
> freeSegments before releasing the data buffer. The reason is that when 
> getting buffers from the DataBuffer, it may require more buffers than the 
> current quantity available in freeSegments. Consequently, to ensure adequate 
> buffers from DataBuffer, the flushed and recycled buffers should also be 
> added to freeSegments for reuse.





[jira] [Closed] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator

2024-04-21 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-35169.
--
Resolution: Fixed

master via 68a84fd02fb8e288ff7605073f55834468dcf53a.

> Recycle buffers to freeSegments before releasing data buffer for sort 
> accumulator
> -
>
> Key: FLINK-35169
> URL: https://issues.apache.org/jira/browse/FLINK-35169
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> When using sortBufferAccumulator, we should recycle the buffers to 
> freeSegments before releasing the data buffer. The reason is that when 
> getting buffers from the DataBuffer, it may require more buffers than the 
> current quantity available in freeSegments. Consequently, to ensure adequate 
> buffers from DataBuffer, the flushed and recycled buffers should also be 
> added to freeSegments for reuse.





Re: [PR] [FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]

2024-04-21 Thread via GitHub


reswqa merged PR #24688:
URL: https://github.com/apache/flink/pull/24688





Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-21 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574080412


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param <K> The type of key in put access request.
+ * @param <V> The type of value in put access request.
+ */
+public class ForStWriteBatchOperation<K, V> implements ForStDBOperation<Void> {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List<Request<K, V>> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List<Request<K, V>> batchRequest, WriteOptions writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture<Void> process() throws IOException {
+CompletableFuture<Void> result = new CompletableFuture<>();
+try (WriteBatch writeBatch =
+new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
+for (Request<K, V> request : batchRequest) {
+ForStInnerTable<K, V> table = request.table;
+if (request.value == null) {
+// put(key, null) == delete(key)
+writeBatch.delete(
+table.getColumnFamilyHandle(), 
table.serializeKey(request.key));
+} else {
+writeBatch.put(
+table.getColumnFamilyHandle(),
+table.serializeKey(request.key),
+table.serializeValue(request.value));
+}
+}
+db.write(writeOptions, writeBatch);
+result.complete(null);
+} catch (RocksDBException e) {

Review Comment:
   I think it would be better to catch all types of exceptions here, not just 
`RocksDBException`, and complete the result future exceptionally. WDYT?
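
   A rough sketch of what that could look like, reusing the names from the snippet 
above (just an illustration, not the committed code):

   ```java
   @Override
   public CompletableFuture<Void> process() throws IOException {
       CompletableFuture<Void> result = new CompletableFuture<>();
       try (WriteBatch writeBatch =
               new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
           for (Request<K, V> request : batchRequest) {
               // ... same put/delete handling as in the PR ...
           }
           db.write(writeOptions, writeBatch);
           result.complete(null);
       } catch (Exception e) {
           // Fail the future for any error, not only RocksDBException,
           // so that callers waiting on it are always notified.
           result.completeExceptionally(e);
       }
       return result;
   }
   ```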






Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


1996fanrui commented on code in PR #809:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1574071090


##
docs/content.zh/docs/concepts/architecture.md:
##
@@ -24,57 +24,66 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Architecture
+
 
-Flink Kubernetes Operator (Operator) acts as a control plane to manage the 
complete deployment lifecycle of Apache Flink applications. The Operator can be 
installed on a Kubernetes cluster using [Helm](https://helm.sh). In most 
production environments it is typically deployed in a designated namespace and 
controls Flink deployments in one or more managed namespaces. The custom 
resource definition (CRD) that describes the schema of a `FlinkDeployment` is a 
cluster wide resource. For a CRD, the declaration must be registered before any 
resources of that CRDs kind(s) can be used, and the registration process 
sometimes takes a few seconds.
+# 架构
 
-{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
Architecture" >}}
-> Note: There is no support at this time for [upgrading or deleting CRDs using 
Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
+Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 应用程序的完整 deployment 
生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 
Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个 Flink 部署到受托管的 namespaces 。描述 
`FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 
类型的任何资源之前注册声明,注册过程有时需要几秒钟。

Review Comment:
   1. `Flink 部署` -> `Flink deployments `
   2. How about translating `schema` to `结构` instead of `模式`?
   3. As 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications
 mentioned, for `是一个集群范围的资源` we don't need `一个` here.
   
   (screenshot: 
https://github.com/apache/flink-kubernetes-operator/assets/38427477/4b5153c8-73b1-4708-9bb4-83255f7d29a6)
   



##
docs/content.zh/docs/concepts/architecture.md:
##
@@ -24,57 +24,65 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Architecture
+
 
-Flink Kubernetes Operator (Operator) acts as a control plane to manage the 
complete deployment lifecycle of Apache Flink applications. The Operator can be 
installed on a Kubernetes cluster using [Helm](https://helm.sh). In most 
production environments it is typically deployed in a designated namespace and 
controls Flink deployments in one or more managed namespaces. The custom 
resource definition (CRD) that describes the schema of a `FlinkDeployment` is a 
cluster wide resource. For a CRD, the declaration must be registered before any 
resources of that CRDs kind(s) can be used, and the registration process 
sometimes takes a few seconds.
+# 架构
 
-{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
Architecture" >}}
-> Note: There is no support at this time for [upgrading or deleting CRDs using 
Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
+Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 
应用程序的完整部署生命周期。Operator 可以使用 [Helm](https://helm.sh) 在 Kubernetes 
集群上安装。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个受管命名空间中的 Flink 部署。描述 `FlinkDeployment` 
模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 类型的任何资源之前注册声明,注册过程有时需要几秒钟。
 
-## Control Loop
-The Operator follow the Kubernetes principles, notably the [control 
loop](https://kubernetes.io/docs/concepts/architecture/controller/):
+{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
架构" >}}
+> Note: 目前不支持[使用 Helm 升级或删除 
CRD](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
 
-{{< img src="/img/concepts/control_loop.svg" alt="Control Loop" >}}
+
 
-Users can interact with the operator using the Kubernetes command-line tool, 
[kubectl](https://kubernetes.io/docs/tasks/tools/). The Operator continuously 
tracks cluster events relating to the `FlinkDeployment` and `FlinkSessionJob` 
custom resources. When the operator receives a new resource update, it will 
take action to adjust the Kubernetes cluster to the desired state as part of 
its reconciliation loop. The initial loop consists of the following high-level 
steps:
+## 控制平面
+Operator 遵循 Kubernetes 原则,特别是 
[控制平面](https://kubernetes.io/docs/concepts/architecture/controller/):
 
-1. User submits a `FlinkDeployment`/`FlinkSessionJob` custom resource(CR) 
using `kubectl`
-2. Operator observes the current status of the Flink resource (if previously 
deployed)
-3. Operator validates the submitted resource change
-4. Operator reconciles any required changes and executes upgrades
+{{< img src="/img/concepts/control_loop.svg" alt="控制循环" >}}
 
-The CR can be (re)applied on the cluster any time. The Operator makes 
continuous adjustments to imitate the desired state until the current state 
becomes the desired state. All lifecycle management operations are realized 
using this very simple principle in the Operator.
+用户可以使用 Kubernetes 命令行工具 

Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-21 Thread via GitHub


Zakelly commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2068416578

   Rebased master to resolve conflicts.





[jira] [Assigned] (FLINK-35158) Error handling in StateFuture's callback

2024-04-21 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-35158:
--

Assignee: Yanfei Lei

> Error handling in StateFuture's callback
> 
>
> Key: FLINK-35158
> URL: https://issues.apache.org/jira/browse/FLINK-35158
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>






Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-21 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574076315


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The general-purpose multiGet operation implementation for ForStDB, which 
simulates multiGet by
+ * calling the Get API multiple times with multiple threads.
+ *
+ * @param <K> The type of key in get access request.
+ * @param <V> The type of value in get access request.
+ */
+public class ForStGeneralMultiGetOperation<K, V> implements ForStDBOperation<List<V>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
+
+private final RocksDB db;
+
+private final List<Request<K, V>> batchRequest;
+
+private final Executor executor;
+
+ForStGeneralMultiGetOperation(RocksDB db, List<Request<K, V>> batchRequest, Executor executor) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.executor = executor;
+}
+
+@Override
+public CompletableFuture<List<V>> process() throws IOException {
+
+CompletableFuture<List<V>> future = new CompletableFuture<>();
+@SuppressWarnings("unchecked")
+V[] result = (V[]) new Object[batchRequest.size()];
+Arrays.fill(result, null);
+
+AtomicInteger counter = new AtomicInteger(batchRequest.size());
+for (int i = 0; i < batchRequest.size(); i++) {
+Request<K, V> request = batchRequest.get(i);
+final int index = i;
+executor.execute(
+() -> {
+try {
+ForStInnerTable<K, V> table = request.table;
+byte[] key = table.serializeKey(request.key);
+byte[] value = db.get(table.getColumnFamilyHandle(), key);

Review Comment:
   @jectpro7 Thanks for your advice.
   A `MultiGet` API based on a remote file system is not currently supported by 
ForStDB, and more importantly, not all file systems will support a `MultiGet` 
API, so the purpose of this PR is to introduce a general `MultiGet` 
implementation that **works on all file systems**.
   
   In addition, I will introduce the ForStDB-native `MultiGet` API in another 
Jira ticket ([FLINK-35163](https://issues.apache.org/jira/browse/FLINK-35163)) to 
optimize remote state access.
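
   For reference, a tiny self-contained sketch of the "simulate multiGet with 
parallel single gets" pattern this class follows, using a plain in-memory map in 
place of ForStDB (all names and types here are illustrative, not the PR's API):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;
   import java.util.concurrent.atomic.AtomicInteger;

   final class GeneralMultiGetSketch {
       @SuppressWarnings("unchecked")
       static <K, V> CompletableFuture<List<V>> multiGet(
               Map<K, V> store, List<K> keys, Executor executor) {
           CompletableFuture<List<V>> future = new CompletableFuture<>();
           V[] results = (V[]) new Object[keys.size()];
           AtomicInteger remaining = new AtomicInteger(keys.size());
           for (int i = 0; i < keys.size(); i++) {
               final int index = i;
               // Each single get runs on the executor; whichever task finishes
               // last completes the shared future with all results.
               executor.execute(
                       () -> {
                           results[index] = store.get(keys.get(index));
                           if (remaining.decrementAndGet() == 0) {
                               future.complete(Arrays.asList(results));
                           }
                       });
           }
           return future;
       }
   }
   ```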






Re: [PR] [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing framework [flink]

2024-04-21 Thread via GitHub


fredia commented on code in PR #24678:
URL: https://github.com/apache/flink/pull/24678#discussion_r1574075445


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java:
##
@@ -46,6 +46,12 @@ public ProcessOperator(OneInputStreamProcessFunction userFunction) {
 chainingStrategy = ChainingStrategy.ALWAYS;
 }
 
+@Override
+public boolean isAsyncStateProcessingEnabled() {
+// For normal operator (without keyed context) the async state 
processing is unused.

Review Comment:
   I was wondering if it would be better to just inherit the async processing on 
`KeyedProcessOperator`/`KeyedTwoInputNonBroadcastProcessOperator`/`KeyedTwoInputBroadcastProcessOperator`/`KeyedTwoOutputProcessOperator`?
   
   BTW, should we offer a `toAsync()` method on `KeyedPartitionStream`, which 
would allow users to set the execution modes of different operators in a 
fine-grained manner?






[jira] [Closed] (FLINK-35058) Encountered change event for table db.table whose schema isn't known to this connector

2024-04-21 Thread MOBIN (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MOBIN closed FLINK-35058.
-
Resolution: Not A Bug

> Encountered change event for table db.table whose schema isn't known to this 
> connector
> --
>
> Key: FLINK-35058
> URL: https://issues.apache.org/jira/browse/FLINK-35058
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 1.17.1
>Reporter: MOBIN
>Priority: Major
>
> Flink1.17.1
> flink-cdc:flink-sql-connector-mysql-cdc-2.4.1.jar
> {code:java}
> CREATE TABLE `test_cdc_timestamp` (
>   `id` BIGINT COMMENT '主键id',
>    
>    proctime AS PROCTIME(),
>    PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = 'x',
>       'scan.startup.mode' = 'timestamp',
>       'scan.startup.timestamp-millis' = '171241920' ,
>       'port' = '3306',
>       'username' = 'xxx',
>       'password' = 'xxx',
>       'database-name' = 'xxtablename',
>       'table-name' = 'xxdatabase',
>       'scan.incremental.snapshot.enabled' = 'false',
>        'debezium.snapshot.locking.mode' = 'none',
>        'server-id' = '5701',
>      'server-time-zone' = 'Asia/Shanghai',
>     'debezium.skipped.operations' = 'd'
>       ); {code}
> When I use 'scan.startup.mode' = 'latest-offset' or 'initial', data is 
> synchronized normally; when I use 'scan.startup.mode' = 'timestamp', the 
> following error is reported:
> {code:java}
> 2024-04-09 11:11:15.619 [debezium-engine] INFO  io.debezium.util.Threads  - 
> Requested thread factory for connector MySqlConnector, id = 
> mysql_binlog_source named = change-event-source-coordinator
> 2024-04-09 11:11:15.621 [debezium-engine] INFO  io.debezium.util.Threads  - 
> Creating thread 
> debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator
> 2024-04-09 11:11:15.629 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.pipeline.ChangeEventSourceCoordinator  - Metrics registered
> 2024-04-09 11:11:15.630 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.pipeline.ChangeEventSourceCoordinator  - Context created
> 2024-04-09 11:11:15.642 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.connector.mysql.MySqlSnapshotChangeEventSource  - No 
> previous offset has been found
> 2024-04-09 11:11:15.642 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.connector.mysql.MySqlSnapshotChangeEventSource  - According 
> to the connector configuration only schema will be snapshotted
> 2024-04-09 11:11:15.644 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.pipeline.ChangeEventSourceCoordinator  - Snapshot ended 
> with SnapshotResult [status=SKIPPED, offset=null]
> 2024-04-09 11:11:15.652 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.util.Threads  - Requested thread factory for connector 
> MySqlConnector, id = mysql_binlog_source named = binlog-client
> 2024-04-09 11:11:15.656 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.pipeline.ChangeEventSourceCoordinator  - Starting streaming
> 2024-04-09 11:11:15.682 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  - GTID set 
> purged on server: 
> 0969640a-1d48-11ed-b6cf-28dee561557c:1-27603868993,70958f24-2253-11eb-891d-f875a48ad7b1:1-50323,ec1e6593-2251-11eb-9c18-f875a48ad539:1-25345454762
> 2024-04-09 11:11:15.682 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  - Skip 0 
> events on streaming start
> 2024-04-09 11:11:15.682 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  - Skip 0 
> rows on streaming start
> 2024-04-09 11:11:15.683 
> [debezium-mysqlconnector-mysql_binlog_source-change-event-source-coordinator] 
> INFO  io.debezium.util.Threads  - Creating thread 
> debezium-mysqlconnector-mysql_binlog_source-binlog-client
> 2024-04-09 11:11:15.686 [blc.mysql.com:3306] INFO  
> io.debezium.util.Threads  - Creating thread 
> debezium-mysqlconnector-mysql_binlog_source-binlog-client
> 2024-04-09 11:11:15.700 [blc.mysql.com:3306] INFO  
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource  - Connected to 
> MySQL binlog at xxx.mysql.com:3306, starting at MySqlOffsetContext 
> 

[jira] [Commented] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership

2024-04-21 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839458#comment-17839458
 ] 

Jinzhong Li commented on FLINK-35178:
-

[~elon] Thanks for reporting this.

For case 1: Let me confirm the job configuration. You configured the parameter 
"state.checkpoints.create-subdir: true", right?

For case 2: This is the expected behavior of "CLAIM mode + incremental 
checkpoint" for the current RocksDB state backend.

The reason behind this is that the new job will perform incremental checkpoints 
based on the old sst files belonging to the previous job. So the checkpoint 
folder of the old job needs to be preserved until the checkpoints of the new job 
no longer reference any sst files of the old job.

In this case, I think the lifecycle of the checkpoint directory for both the old 
and the new job should be managed by the Flink framework, and it should not be 
the user's responsibility to delete or clean it up.

> Checkpoint CLAIM mode does not fully control snapshot ownership
> ---
>
> Key: FLINK-35178
> URL: https://issues.apache.org/jira/browse/FLINK-35178
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-20-14-51-21-062.png
>
>
> When I enable incremental checkpointing, and the task fails or is canceled 
> for some reason, restarting the task from {{-s checkpoint_path}} with 
> {{restoreMode CLAIM}} allows the Flink job to recover from the last 
> checkpoint, it just discards the previous checkpoint.
> Then I found that this leads to the following two cases:
> 1. If the new checkpoint_x meta file does not reference files in the shared 
> directory under the previous jobID:         
> the shared and taskowned directories from the previous Job will be left as 
> empty directories, and these two directories will persist without being 
> deleted by Flink. !image-2024-04-20-14-51-21-062.png!
> 2. If the new checkpoint_x meta file references files in the shared directory 
> under the previous jobID:
> the chk-(x-1) from the previous job will be discarded, but there will still 
> be state data in the shared directory under that job, which might persist for 
> a relatively long time. Here arises the question: the previous job is no 
> longer running, and it's unclear whether users should delete the state data. 
> Deleting it could lead to errors when the task is restarted, as the meta 
> might reference files that can no longer be found; this could be confusing 
> for users.
>  
> The potential solution might be to reuse the previous job's jobID when 
> restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that 
> allows users to specify the jobID they want to recover from;
>  
> Please correct me if there's anything I've misunderstood.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController

2024-04-21 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-35027.

Resolution: Resolved

> Implement checkpoint drain in AsyncExecutionController
> --
>
> Key: FLINK-35027
> URL: https://issues.apache.org/jira/browse/FLINK-35027
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]

2024-04-21 Thread via GitHub


fredia merged PR #24676:
URL: https://github.com/apache/flink/pull/24676


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-21 Thread via GitHub


fredia commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1574062845


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import static 
org.apache.flink.runtime.asyncprocessing.KeyAccountingUnit.EMPTY_RECORD;
+
+/**
+ * An implementation of {@link InternalTimerService} that is used by {@link
+ * 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
+ * The timer service will set {@link RecordContext} for the timers before 
invoking action to
+ * preserve the execution order between timer firing and records processing.
+ *
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425
+ * timers section.
+ * @param  Type of timer's key.
+ * @param  Type of the namespace to which timers are scoped.
+ */
+public class InternalTimerServiceAsyncImpl extends 
InternalTimerServiceImpl {
+
+private AsyncExecutionController asyncExecutionController;
+
+InternalTimerServiceAsyncImpl(
+TaskIOMetricGroup taskIOMetricGroup,
+KeyGroupRange localKeyGroupRange,
+KeyContext keyContext,
+ProcessingTimeService processingTimeService,
+KeyGroupedInternalPriorityQueue processingTimeTimersQueue,
+KeyGroupedInternalPriorityQueue eventTimeTimersQueue,
+StreamTaskCancellationContext cancellationContext,
+AsyncExecutionController asyncExecutionController) {
+super(
+taskIOMetricGroup,
+localKeyGroupRange,
+keyContext,
+processingTimeService,
+processingTimeTimersQueue,
+eventTimeTimersQueue,
+cancellationContext);
+this.asyncExecutionController = asyncExecutionController;
+this.processingTimeCallback = this::onProcessingTime;
+}
+
+private void onProcessingTime(long time) throws Exception {
+// null out the timer in case the Triggerable calls 
registerProcessingTimeTimer()
+// inside the callback.
+nextTimer = null;
+
+InternalTimer timer;
+
+while ((timer = processingTimeTimersQueue.peek()) != null
+&& timer.getTimestamp() <= time
+&& !cancellationContext.isCancelled()) {
+RecordContext recordCtx =
+asyncExecutionController.buildContext(EMPTY_RECORD, 
timer.getKey());
+recordCtx.retain();
+asyncExecutionController.setCurrentContext(recordCtx);
+keyContext.setCurrentKey(timer.getKey());
+processingTimeTimersQueue.poll();
+final InternalTimer timerToTrigger = timer;
+asyncExecutionController.syncPointRequestWithCallback(
+() -> triggerTarget.onProcessingTime(timerToTrigger));
+taskIOMetricGroup.getNumFiredTimers().inc();
+recordCtx.release();
+}
+
+if (timer != null && nextTimer == null) {
+nextTimer =
+processingTimeService.registerTimer(
+timer.getTimestamp(), this::onProcessingTime);
+}
+}
+
+/**
+ * Advance one watermark, this will fire some event timers.
+ *
+ * @param time the time in watermark.
+ */
+@Override
+public void 

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


RocMarshal commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574054494


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。

Review Comment:
   ```suggestion
   Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 
应用程序,但[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)和
 [operator 模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
   ```



##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator 的核心功能包括:
+
+
+
+## 特征
+
+
+
+### 核心
 - Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})

Review Comment:
   just a minor question: why not translate these lines as well?



##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the 

Re: [PR] [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing framework [flink]

2024-04-21 Thread via GitHub


Zakelly commented on code in PR #24678:
URL: https://github.com/apache/flink/pull/24678#discussion_r1574052485


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java:
##
@@ -23,15 +23,15 @@
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /** Operator for {@link OneInputStreamProcessFunction}. */
 public class ProcessOperator
-extends AbstractUdfStreamOperator>
+extends AbstractAsyncStateUdfStreamOperator>

Review Comment:
   Thanks for the reminder!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33681] Reuse input/output metrics of SourceOperator/SinkWriterOperator for task [flink]

2024-04-21 Thread via GitHub


X-czh commented on PR #23998:
URL: https://github.com/apache/flink/pull/23998#issuecomment-2068376358

   @affo Thanks! @huwh Could you help take a look at the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Hotfix] Improve the Readme flink source code compilation document [flink]

2024-04-21 Thread via GitHub


flinkbot commented on PR #24694:
URL: https://github.com/apache/flink/pull/24694#issuecomment-2068368531

   
   ## CI report:
   
   * b5857ed21eeda0a1338a5df27caa1743ea5cf150 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [Hotfix] Building Apache Flink from Source [flink]

2024-04-21 Thread via GitHub


caicancai opened a new pull request, #24694:
URL: https://github.com/apache/flink/pull/24694

   
   
   ## What is the purpose of the change
   
   ① If the source is compiled with mvn clean package, running unit tests from IDEA may fail to build.
   ② 
https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/flinkdev/building/
 also uses mvn clean install for compiling, so I think the README and the website should stay consistent.
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][ci] Polish ci concurrency group [flink-cdc]

2024-04-21 Thread via GitHub


GOODBOY008 commented on PR #3241:
URL: https://github.com/apache/flink-cdc/pull/3241#issuecomment-2068366576

   Also bumped the maven-setup action version to avoid the warning.
   Screenshot: https://github.com/apache/flink-cdc/assets/13617900/91047893-f711-4b7c-beac-054ecaee0332
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33584][Filesystems] Update Hadoop Filesystem dependencies to 3.3.6 [flink]

2024-04-21 Thread via GitHub


masteryhx commented on PR #23844:
URL: https://github.com/apache/flink/pull/23844#issuecomment-2068360164

   Hi, just a kind ping. Could we resolve this with the solution mentioned by @MartijnVisser?
   We tried to use positioned reads with ByteBuffer on Hadoop to improve performance, but found that this is only supported from 3.3.0 onwards.
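
   For reference, a minimal sketch of a version-tolerant probe (my assumption, not necessarily what the merged fix does): query the stream capability by its string name instead of referencing the StreamCapabilities constants, which only compile against Hadoop 3.3.0+. On older Hadoop versions the probe simply returns false and the caller falls back to the plain positional read. The string keys below are the documented capability names backing READBYTEBUFFER / PREADBYTEBUFFER and should be double-checked against the Hadoop versions in use.

```java
import org.apache.hadoop.fs.FSDataInputStream;

/** Capability probe by string name; avoids compile-time references to constants
 *  that only exist in Hadoop 3.3.0+. */
final class ByteBufferReadCapability {

    static boolean supportsByteBufferRead(FSDataInputStream in) {
        return in.hasCapability("in:readbytebuffer");
    }

    static boolean supportsByteBufferPositionedRead(FSDataInputStream in) {
        return in.hasCapability("in:preadbytebuffer");
    }
}
```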


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35185) Resuming Externalized Checkpoint(rocks, incremental, no parallelism change) end-to-end test failed

2024-04-21 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35185:
---
Description: 

{code:java}
Apr 21 00:52:11 Running externalized checkpoints test,   with ORIGINAL_DOP=2 
NEW_DOP=2   and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true   
STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ...
Apr 21 00:52:20 Job (8a6bda88c7c422823bcf0d6f7a1e8cae) is running.
Apr 21 00:52:20 Waiting for job (8a6bda88c7c422823bcf0d6f7a1e8cae) to have at 
least 1 completed checkpoints ...
Apr 21 00:52:20 Waiting for job to process up to 200 records, current progress: 
0 records ...
Apr 21 00:52:22 Waiting for job to process up to 200 records, current progress: 
139 records ...
Apr 21 00:52:23 Waiting for job to process up to 200 records, current progress: 
196 records ...
Apr 21 00:52:26 Cancelling job 8a6bda88c7c422823bcf0d6f7a1e8cae.
Apr 21 00:52:27 Cancelled job 8a6bda88c7c422823bcf0d6f7a1e8cae.
Apr 21 00:52:27 Restoring job with externalized checkpoint at 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-00385748458/externalized-chckpt-e2e-backend-dir/8a6bda88c7c422823bcf0d6f7a1e8cae/chk-8
 ...
Apr 21 00:52:32 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:36 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:39 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:42 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:45 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:49 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:52 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:55 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:52:58 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:53:01 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
Apr 21 00:53:02 Job (47369cbdc25eea5a9cc6d26c91c4b141) has not started within a 
timeout of 10 sec
Apr 21 00:53:02 Stopping job timeout watchdog (with pid=173454)
Apr 21 00:53:02 [FAIL] Test script contains errors.
Apr 21 00:53:02 Checking of logs skipped.
Apr 21 00:53:02 
Apr 21 00:53:02 [FAIL] 'Resuming Externalized Checkpoint (rocks, incremental, 
no parallelism change) end-to-end test' failed after 1 minutes and 2 seconds! 
Test exited with exit code 1

{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59046=logs=b31992a1-93b0-59f3-2c17-4a9deb43d11c=36dcb94b-d88d-5832-815b-4b36f2c7af14=3508


> Resuming Externalized Checkpoint(rocks, incremental, no parallelism change) 
> end-to-end test failed 
> ---
>
> Key: FLINK-35185
> URL: https://issues.apache.org/jira/browse/FLINK-35185
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
> Apr 21 00:52:11 Running externalized checkpoints test,   with ORIGINAL_DOP=2 
> NEW_DOP=2   and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true   
> STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ...
> Apr 21 00:52:20 Job (8a6bda88c7c422823bcf0d6f7a1e8cae) is running.
> Apr 21 00:52:20 Waiting for job (8a6bda88c7c422823bcf0d6f7a1e8cae) to have at 
> least 1 completed checkpoints ...
> Apr 21 00:52:20 Waiting for job to process up to 200 records, current 
> progress: 0 records ...
> Apr 21 00:52:22 Waiting for job to process up to 200 records, current 
> progress: 139 records ...
> Apr 21 00:52:23 Waiting for job to process up to 200 records, current 
> progress: 196 records ...
> Apr 21 00:52:26 Cancelling job 8a6bda88c7c422823bcf0d6f7a1e8cae.
> Apr 21 00:52:27 Cancelled job 8a6bda88c7c422823bcf0d6f7a1e8cae.
> Apr 21 00:52:27 Restoring job with externalized checkpoint at 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-00385748458/externalized-chckpt-e2e-backend-dir/8a6bda88c7c422823bcf0d6f7a1e8cae/chk-8
>  ...
> Apr 21 00:52:32 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:36 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:39 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:42 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:45 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:49 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:52 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:55 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:52:58 Job (47369cbdc25eea5a9cc6d26c91c4b141) is not yet running.
> Apr 21 00:53:01 Job 

[jira] [Created] (FLINK-35185) Resuming Externalized Checkpoint(rocks, incremental, no parallelism change) end-to-end test failed

2024-04-21 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35185:
--

 Summary: Resuming Externalized Checkpoint(rocks, incremental, no 
parallelism change) end-to-end test failed 
 Key: FLINK-35185
 URL: https://issues.apache.org/jira/browse/FLINK-35185
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3

2024-04-21 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-35175.
--
Fix Version/s: 1.20.0
 Assignee: Hangxiang Yu
   Resolution: Fixed

merged a4c71c8d into master

> HadoopDataInputStream can't compile with Hadoop 3.2.3
> -
>
> Key: FLINK-35175
> URL: https://issues.apache.org/jira/browse/FLINK-35175
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Unfortunately, introduced in FLINK-35045: 
> [PREADWRITEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182]
>  was added in Hadoop releases 
> [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]
>  and 
> [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72].
> It doesn't exist in flink.hadoop.version 
> [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java],
>  which we are using in end-to-end tests.
> {code:java}
> 00:23:55.093 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-hadoop-fs: Compilation failure: 
> Compilation failure: 
> 00:23:55.093 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable READBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable PREADBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43]
>  incompatible types: long cannot be converted to 
> org.apache.hadoop.io.ByteBufferPool
> 00:23:55.094 [ERROR] -> [Help 1] {code}
> * 1.20 compile_cron_hadoop313 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=3630



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3

2024-04-21 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839447#comment-17839447
 ] 

Weijie Guo commented on FLINK-35175:


hadoop313:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59046=logs=b1fcf054-9138-5463-c73c-a49979b9ac2a=9291ac46-dd95-5135-b799-3839e65a8691=3893

> HadoopDataInputStream can't compile with Hadoop 3.2.3
> -
>
> Key: FLINK-35175
> URL: https://issues.apache.org/jira/browse/FLINK-35175
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: pull-request-available
>
> Unfortunately, introduced in FLINK-35045: 
> [PREADWRITEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182]
>  was added in Hadoop releases 
> [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]
>  and 
> [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72].
> It doesn't exist in flink.hadoop.version 
> [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java],
>  which we are using in end-to-end tests.
> {code:java}
> 00:23:55.093 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-hadoop-fs: Compilation failure: 
> Compilation failure: 
> 00:23:55.093 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable READBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable PREADBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43]
>  incompatible types: long cannot be converted to 
> org.apache.hadoop.io.ByteBufferPool
> 00:23:55.094 [ERROR] -> [Help 1] {code}
> * 1.20 compile_cron_hadoop313 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=3630



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35175][filesystem] Avoid reading with ByteBuffer on hadoop version below 3.3.0 [flink]

2024-04-21 Thread via GitHub


masteryhx closed pull request #24691: [FLINK-35175][filesystem] Avoid reading 
with ByteBuffer on  hadoop version below 3.3.0
URL: https://github.com/apache/flink/pull/24691


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-21 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839445#comment-17839445
 ] 

Weijie Guo commented on FLINK-35041:


test_cron_jdk21 core

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59037=logs=d06b80b4-9e88-5d40-12a2-18072cf60528=609ecd5a-3f6e-5d0c-2239-2096b155a4d0=8911

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3

2024-04-21 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839444#comment-17839444
 ] 

Weijie Guo commented on FLINK-35175:


1.20 compile_cron_hadoop313

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59037=logs=b1fcf054-9138-5463-c73c-a49979b9ac2a=9291ac46-dd95-5135-b799-3839e65a8691=3641

> HadoopDataInputStream can't compile with Hadoop 3.2.3
> -
>
> Key: FLINK-35175
> URL: https://issues.apache.org/jira/browse/FLINK-35175
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: pull-request-available
>
> Unfortunately, introduced in FLINK-35045: 
> [PREADWRITEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182]
>  was added in Hadoop releases 
> [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]
>  and 
> [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72].
> It doesn't exist in flink.hadoop.version 
> [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java],
>  which we are using in end-to-end tests.
> {code:java}
> 00:23:55.093 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-hadoop-fs: Compilation failure: 
> Compilation failure: 
> 00:23:55.093 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable READBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable PREADBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43]
>  incompatible types: long cannot be converted to 
> org.apache.hadoop.io.ByteBufferPool
> 00:23:55.094 [ERROR] -> [Help 1] {code}
> * 1.20 compile_cron_hadoop313 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=3630



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35166) Improve the performance of Hybrid Shuffle when enable memory decoupling

2024-04-21 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan reassigned FLINK-35166:
-

Assignee: Jiang Xin

> Improve the performance of Hybrid Shuffle when enable memory decoupling
> ---
>
> Key: FLINK-35166
> URL: https://issues.apache.org/jira/browse/FLINK-35166
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Jiang Xin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, the tiered result partition creates the SortBufferAccumulator with 
> the number of expected buffers as min(numSubpartitions+1, 512), thus the 
> SortBufferAccumulator may obtain very few buffers when the parallelism is 
> small. We can easily make the number of expected buffers 512 by default to 
> have a better performance when the buffers are sufficient.
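
A quick illustration of the sizing described above (variable names are taken from the issue text, not from the actual code):

{code:java}
// Current sizing: with a small parallelism the accumulator gets very few buffers.
int numSubpartitions = 4;                                          // small-parallelism job
int currentExpectedBuffers = Math.min(numSubpartitions + 1, 512);  // = 5
// Proposed: default to 512 expected buffers whenever buffers are sufficient.
int proposedExpectedBuffers = 512;
{code}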



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-21 Thread via GitHub


jectpro7 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1573855255


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The general-purpose multiGet operation implementation for ForStDB, which 
simulates multiGet by
+ * calling the Get API multiple times with multiple threads.
+ *
+ * @param  The type of key in get access request.
+ * @param  The type of value in get access request.
+ */
+public class ForStGeneralMultiGetOperation implements 
ForStDBOperation> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final Executor executor;
+
+ForStGeneralMultiGetOperation(RocksDB db, List> 
batchRequest, Executor executor) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.executor = executor;
+}
+
+@Override
+public CompletableFuture> process() throws IOException {
+
+CompletableFuture> future = new CompletableFuture<>();
+@SuppressWarnings("unchecked")
+V[] result = (V[]) new Object[batchRequest.size()];
+Arrays.fill(result, null);
+
+AtomicInteger counter = new AtomicInteger(batchRequest.size());
+for (int i = 0; i < batchRequest.size(); i++) {
+Request request = batchRequest.get(i);
+final int index = i;
+executor.execute(
+() -> {
+try {
+ForStInnerTable table = request.table;
+byte[] key = table.serializeKey(request.key);
+byte[] value = 
db.get(table.getColumnFamilyHandle(), key);

Review Comment:
   Hi @ljz2051, this creates many RPC requests here; as FLIP-426 mentions, the RPC round-trip overhead is the bottleneck. It might be better to use `multiGetAsList`.
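
   A rough sketch of what that could look like, assuming it lives inside ForStGeneralMultiGetOperation (so `db`, `Request` and `ForStInnerTable` from the quoted diff are in scope; `deserializeValue(byte[])`, the method shape, and the added `java.util.ArrayList` / `org.rocksdb.ColumnFamilyHandle` imports are my assumptions, and the CompletableFuture wiring and error handling from the PR are omitted):

```java
// Sketch only, not the PR's implementation: batch all lookups into a single
// multiGetAsList() call so the whole request costs one native round trip.
private List<V> multiGet(List<Request<K, V>> batchRequest) throws Exception {
    List<ColumnFamilyHandle> handles = new ArrayList<>(batchRequest.size());
    List<byte[]> keys = new ArrayList<>(batchRequest.size());
    for (Request<K, V> request : batchRequest) {
        handles.add(request.table.getColumnFamilyHandle());
        keys.add(request.table.serializeKey(request.key));
    }
    // One native call for the whole batch instead of one db.get() per key.
    List<byte[]> rawValues = db.multiGetAsList(handles, keys);
    List<V> values = new ArrayList<>(rawValues.size());
    for (int i = 0; i < rawValues.size(); i++) {
        byte[] raw = rawValues.get(i);
        values.add(raw == null ? null : batchRequest.get(i).table.deserializeValue(raw));
    }
    return values;
}
```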



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> 

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1573750686


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许您直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。

Review Comment:
   thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]

2024-04-21 Thread via GitHub


caicancai commented on code in PR #809:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/809#discussion_r1573750776


##
docs/content.zh/docs/concepts/architecture.md:
##
@@ -24,57 +24,66 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Architecture
+
 
-Flink Kubernetes Operator (Operator) acts as a control plane to manage the 
complete deployment lifecycle of Apache Flink applications. The Operator can be 
installed on a Kubernetes cluster using [Helm](https://helm.sh). In most 
production environments it is typically deployed in a designated namespace and 
controls Flink deployments in one or more managed namespaces. The custom 
resource definition (CRD) that describes the schema of a `FlinkDeployment` is a 
cluster wide resource. For a CRD, the declaration must be registered before any 
resources of that CRDs kind(s) can be used, and the registration process 
sometimes takes a few seconds.
+# 架构
 
-{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
Architecture" >}}
-> Note: There is no support at this time for [upgrading or deleting CRDs using 
Helm](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
+Flink Kubernetes Operator(Operator)充当控制平面,用于管理 Apache Flink 
应用程序的完整deployment生命周期。可以使用 [Helm](https://helm.sh) 在 Kubernetes 集群上安装 
Operator。在大多数生产环境中,它通常部署在指定的命名空间中,并控制一个或多个Flink 部署到受托管的 namespaces。描述 
`FlinkDeployment` 模式的自定义资源定义(CRD)是一个集群范围的资源。对于 CRD,必须在使用该 CRD 
类型的任何资源之前注册声明,注册过程有时需要几秒钟。
 
-## Control Loop
-The Operator follow the Kubernetes principles, notably the [control 
loop](https://kubernetes.io/docs/concepts/architecture/controller/):
+{{< img src="/img/concepts/architecture.svg" alt="Flink Kubernetes Operator 
架构" >}}
+> Note: 目前不支持[使用 Helm 升级或删除 
CRD](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/).
 
-{{< img src="/img/concepts/control_loop.svg" alt="Control Loop" >}}
+
 
-Users can interact with the operator using the Kubernetes command-line tool, 
[kubectl](https://kubernetes.io/docs/tasks/tools/). The Operator continuously 
tracks cluster events relating to the `FlinkDeployment` and `FlinkSessionJob` 
custom resources. When the operator receives a new resource update, it will 
take action to adjust the Kubernetes cluster to the desired state as part of 
its reconciliation loop. The initial loop consists of the following high-level 
steps:
+## 控制平面
+Operator 遵循 Kubernetes 原则,特别是 
[控制平面](https://kubernetes.io/docs/concepts/architecture/controller/):
 
-1. User submits a `FlinkDeployment`/`FlinkSessionJob` custom resource(CR) 
using `kubectl`
-2. Operator observes the current status of the Flink resource (if previously 
deployed)
-3. Operator validates the submitted resource change
-4. Operator reconciles any required changes and executes upgrades
+{{< img src="/img/concepts/control_loop.svg" alt="控制循环" >}}
 
-The CR can be (re)applied on the cluster any time. The Operator makes 
continuous adjustments to imitate the desired state until the current state 
becomes the desired state. All lifecycle management operations are realized 
using this very simple principle in the Operator.
+用户可以使用 Kubernetes 命令行工具 [kubectl](https://kubernetes.io/docs/tasks/tools/) 
与Operator进行交互。Operator 不断跟踪与 `FlinkDeployment` 和 `FlinkSessionJob` 
自定义资源相关的集群事件。当 Operator 接收到新的资源更新时,它将调整 Kubernetes 
集群以达到所需状态,这个调整将作为其协调循环的一部分。初始循环包括以下高级步骤:
 
-The Operator is built with the [Java Operator 
SDK](https://github.com/java-operator-sdk/java-operator-sdk) and uses the 
[Native Kubernetes 
Integration](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/)
 for launching Flink deployments and submitting jobs under the hood. The Java 
Operator SDK is a higher level framework and related tooling to support writing 
Kubernetes Operators in Java. Both the Java Operator SDK and Flink's native 
kubernetes integration itself is using the [Fabric8 Kubernetes 
Client](https://github.com/fabric8io/kubernetes-client) to interact with the 
Kubernetes API Server.
+1. 用户使用 `kubectl` 提交 `FlinkDeployment`/`FlinkSessionJob` 自定义资源(CR)
+2. Operator 观察 Flink 资源的当前状态(如果先前已部署)
+3. Operator 验证提交的资源更改
+4. Operator 协调任何必要的更改并执行升级
 
-## Flink Resource Lifecycle
+CR 可以随时在集群上(重新)应用。Operator 通过不断调整来模拟期望的状态,直到当前状态变为期望的状态。Operator 
中的所有生命周期管理操作都是使用这个非常简单的原则实现的。
 
-The Operator manages the lifecycle of Flink resources. The following chart 
illustrates the different possible states and transitions:
+Operator 使用 [Java Operator 
SDK](https://github.com/java-operator-sdk/java-operator-sdk) 构建,并使用 [Native 
Kubernetes Integration](https://nightlies.apache.org 
/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) 
用于启动 Flink deployment 并在后台提交作业。
+Java Operator SDK 是一个更高级别的框架和相关工具,用于支持使用 Java 编写 Kubernetes Operator。Java 
Operator SDK 和 Flink 的原生 kubernetes 集成本身都使用 [Fabric8 Kubernetes 
客户端](https://github.com/fabric8io/kubernetes-client) 与 

[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839382#comment-17839382
 ] 

Roman Boyko commented on FLINK-35184:
-

Please assign this bug to me, I'm working on it.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Roman Boyko (Jira)
Roman Boyko created FLINK-35184:
---

 Summary: Hash collision inside MiniBatchStreamingJoin operator
 Key: FLINK-35184
 URL: https://issues.apache.org/jira/browse/FLINK-35184
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: Roman Boyko


The hash collision is possible for InputSideHasNoUniqueKeyBundle. To reproduce 
it just launch the following test within StreamingMiniBatchJoinOperatorTest:

 
{code:java}
@Tag("miniBatchSize=6")
@Test
public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) throws 
Exception {

leftTypeInfo =
InternalTypeInfo.of(
RowType.of(
new LogicalType[] {new IntType(), new BigIntType()},
new String[] {"id1", "val1"}));

rightTypeInfo =
InternalTypeInfo.of(
RowType.of(
new LogicalType[] {new IntType(), new BigIntType()},
new String[] {"id2", "val2"}));

leftKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {0},
leftTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
rightKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {0},
rightTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));

joinKeyTypeInfo = InternalTypeInfo.of(new IntType());

super.beforeEach(testInfo);

testHarness.setStateTtlProcessingTime(1);
testHarness.processElement2(insertRecord(1, 1L));
testHarness.processElement1(insertRecord(1, 4294967296L));
testHarness.processElement2(insertRecord(1, 4294967296L));
testHarness.processElement2(deleteRecord(1, 1L));

testHarness.close();

assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 4294967296L, 
1, 4294967296L));
} {code}
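
For context on why exactly these two values are used: the collision presumably comes from folding the two 32-bit halves of the long field's hash, as the following snippet illustrates (plain JDK, independent of the operator code):

{code:java}
public class LongHashCollision {
    public static void main(String[] args) {
        long a = 1L;
        long b = 4294967296L; // 2^32
        // Long.hashCode(v) == (int) (v ^ (v >>> 32)), so both values fold down to 1.
        System.out.println(Long.hashCode(a)); // 1
        System.out.println(Long.hashCode(b)); // 1 -> same bucket, hence the collision
    }
}
{code}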
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)