(flink) branch master updated: [FLINK-34247][doc] Fix the wrong zh link for flink-configuration-file and repeated flink configuration file

2024-02-05 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d44e940e9d2 [FLINK-34247][doc] Fix the wrong zh link for 
flink-configuration-file and repeated flink configuration file
d44e940e9d2 is described below

commit d44e940e9d28518115cf6e8d5ba4f963faf30f52
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Mon Feb 5 13:21:34 2024 +0800

[FLINK-34247][doc] Fix the wrong zh link for flink-configuration-file and 
repeated flink configuration file
---
 docs/content.zh/docs/connectors/table/filesystem.md         |  4 ++--
 docs/content.zh/docs/deployment/advanced/historyserver.md   |  2 +-
 docs/content.zh/docs/deployment/cli.md                      |  4 ++--
 docs/content.zh/docs/deployment/filesystems/azure.md        |  6 +++---
 docs/content.zh/docs/deployment/filesystems/gcs.md          |  6 +++---
 docs/content.zh/docs/deployment/filesystems/oss.md          |  6 +++---
 docs/content.zh/docs/deployment/filesystems/s3.md           | 12 ++++++------
 docs/content.zh/docs/deployment/ha/kubernetes_ha.md         |  2 +-
 docs/content.zh/docs/deployment/ha/zookeeper_ha.md          |  4 ++--
 docs/content.zh/docs/deployment/memory/mem_migration.md     |  4 ++--
 docs/content.zh/docs/deployment/metric_reporters.md         |  2 +-
 .../deployment/resource-providers/native_kubernetes.md      |  4 ++--
 .../deployment/resource-providers/standalone/docker.md      |  8 ++++----
 .../deployment/resource-providers/standalone/kubernetes.md  |  2 +-
 .../deployment/resource-providers/standalone/overview.md    |  8 ++++----
 docs/content.zh/docs/deployment/resource-providers/yarn.md  |  2 +-
 docs/content.zh/docs/deployment/security/security-ssl.md    |  2 +-
 docs/content.zh/docs/deployment/trace_reporters.md          |  2 +-
 docs/content.zh/docs/dev/datastream/execution/parallel.md   |  2 +-
 .../docs/dev/datastream/fault-tolerance/checkpointing.md    |  2 +-
 .../docs/dev/datastream/fault-tolerance/state_backends.md   |  2 +-
 docs/content.zh/docs/dev/table/catalogs.md                  |  4 ++--
 .../docs/dev/table/hive-compatibility/hiveserver2.md        |  4 ++--
 docs/content.zh/docs/dev/table/sql-gateway/overview.md      |  4 ++--
 docs/content.zh/docs/dev/table/sqlClient.md                 |  2 +-
 docs/content.zh/docs/ops/debugging/flame_graphs.md          |  2 +-
 docs/content.zh/docs/ops/metrics.md                         |  4 ++--
 docs/content.zh/docs/ops/rest_api.md                        |  2 +-
 docs/content.zh/docs/ops/state/state_backends.md            | 14 +++++++-------
 docs/content.zh/docs/ops/state/task_failure_recovery.md     |  8 ++++----
 .../docs/try-flink/flink-operations-playground.md           |  4 ++--
 .../deployment/resource-providers/standalone/docker.md      |  2 +-
 docs/content/docs/deployment/resource-providers/yarn.md     |  2 +-
 .../docs/dev/datastream/fault-tolerance/state_backends.md   |  2 +-
 docs/content/docs/dev/table/sqlClient.md                    |  2 +-
 docs/content/docs/ops/state/task_failure_recovery.md        |  4 ++--
 36 files changed, 73 insertions(+), 73 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/filesystem.md 
b/docs/content.zh/docs/connectors/table/filesystem.md
index 17d05045a20..584f5009081 100644
--- a/docs/content.zh/docs/connectors/table/filesystem.md
+++ b/docs/content.zh/docs/connectors/table/filesystem.md
@@ -241,8 +241,8 @@ CREATE TABLE MyUserTableWithFilepath (
 
 **注意:** 对于 bulk formats 数据 (parquet、orc、avro),滚动策略与 checkpoint 间隔(pending 状态的文件会在下个 checkpoint 完成)控制了 part 文件的大小和个数。
 
-**注意:** 对于 row formats 数据 (csv、json),如果想使得分区文件更快在文件系统中可见,可以设置 `sink.rolling-policy.file-size` 或 `sink.rolling-policy.rollover-interval` 属性以及在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-configuration-file" >}}) 中的 `execution.checkpointing.interval` 属性。
-对于其他 formats (avro、orc),可以只设置 [Flink 配置文件]({{< ref "docs/deployment/config#flink-configuration-file" >}}) 中的 `execution.checkpointing.interval` 属性。
+**注意:** 对于 row formats 数据 (csv、json),如果想使得分区文件更快在文件系统中可见,可以设置 `sink.rolling-policy.file-size` 或 `sink.rolling-policy.rollover-interval` 属性以及在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}}) 中的 `execution.checkpointing.interval` 属性。
+对于其他 formats (avro、orc),可以只设置 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}}) 中的 `execution.checkpointing.interval` 属性。
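
The corrected anchors work because Hugo derives an anchor from the heading text of the target page: the Chinese config page's heading is 「Flink 配置文件」, which yields `#flink-配置文件`, so the English-style `#flink-configuration-file` anchor never resolved there. Since the doc text above centers on `execution.checkpointing.interval`, a minimal illustrative sketch of setting the same interval programmatically with the DataStream API (not part of this patch) may help:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointIntervalSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Equivalent to setting execution.checkpointing.interval to 60 s.
            // With row formats (csv, json), pending part files only become
            // visible once the next checkpoint completes, so a shorter
            // interval surfaces partition files sooner.
            env.enableCheckpointing(60_000L);
        }
    }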
 
 
 
diff --git a/docs/content.zh/docs/deployment/advanced/historyserver.md 
b/docs/content.zh/docs/deployment/advanced/historyserver.md
index 16950b4a06b..bef18431585 100644
--- a/docs/content.zh/docs/deployment/advanced/historyserver.md
+++ b/docs/content.zh/docs/deployment/advanced/historyserver.md
@@ -58,7 +58,7 @@ bin/historyserver.sh (start|start-foreground|stop)
 
 **JobManager**
 
-已完成作业的存档在 JobManager 上进行,将已存档的作业信息上传到文件系统目录中。你可以在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-configuration-file

(flink) branch master updated: [FLINK-34343][rpc] Use actor path when rejecting early messages

2024-02-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 839f298c383 [FLINK-34343][rpc] Use actor path when rejecting early 
messages
839f298c383 is described below

commit 839f298c3838f2f4981e271554b82fae770747d8
Author: Chesnay Schepler 
AuthorDate: Sat Feb 3 11:20:08 2024 +0100

[FLINK-34343][rpc] Use actor path when rejecting early messages
---
 .../src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
index f3724fba0e5..dc4e342f35a 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
@@ -179,7 +179,7 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
                                 new EndpointNotStartedException(
                                         String.format(
                                                 "Discard message %s, because the rpc endpoint %s has not been started yet.",
-                                                message, rpcEndpoint.getAddress())));
+                                                message, getSelf().path())));
         }
     }
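
For context, the fix swaps the endpoint's RPC address for the actor's own path in the rejection message: `getSelf().path()` is defined inside any Pekko `AbstractActor` from the moment the actor exists, whereas the endpoint address may not be meaningful before the endpoint has started. A minimal standalone sketch of the pattern (illustrative names, not the Flink class itself):

    import org.apache.pekko.actor.AbstractActor;

    class EarlyMessageRejector extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchAny(
                            message ->
                                    // The actor path identifies this actor even
                                    // while the component behind it has not
                                    // started yet.
                                    getSender()
                                            .tell(
                                                    String.format(
                                                            "Discard message %s, because the rpc endpoint %s has not been started yet.",
                                                            message, getSelf().path()),
                                                    getSelf()))
                    .build();
        }
    }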
 



(flink) branch release-1.18 updated: [FLINK-34343][rpc] Use actor path when rejecting early messages

2024-02-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new ee9945cd785 [FLINK-34343][rpc] Use actor path when rejecting early 
messages
ee9945cd785 is described below

commit ee9945cd785577d0f68092823b71abbb53d127f8
Author: Chesnay Schepler 
AuthorDate: Sat Feb 3 11:20:08 2024 +0100

[FLINK-34343][rpc] Use actor path when rejecting early messages
---
 .../src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
index f3724fba0e5..dc4e342f35a 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
@@ -179,7 +179,7 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
                                 new EndpointNotStartedException(
                                         String.format(
                                                 "Discard message %s, because the rpc endpoint %s has not been started yet.",
-                                                message, rpcEndpoint.getAddress())));
+                                                message, getSelf().path())));
         }
     }
 



(flink) branch release-1.17 updated: [FLINK-34343][rpc] Use actor path when rejecting early messages

2024-02-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 3f22b6363e6 [FLINK-34343][rpc] Use actor path when rejecting early 
messages
3f22b6363e6 is described below

commit 3f22b6363e6cad4352821f42907ec8a2a181e675
Author: Chesnay Schepler 
AuthorDate: Sat Feb 3 11:20:08 2024 +0100

[FLINK-34343][rpc] Use actor path when rejecting early messages
---
 .../src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 8029956bbd9..59d30e16564 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -179,7 +179,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
                                 new EndpointNotStartedException(
                                         String.format(
                                                 "Discard message %s, because the rpc endpoint %s has not been started yet.",
-                                                message, rpcEndpoint.getAddress())));
+                                                message, getSelf().path())));
         }
     }
 



(flink) 02/02: [FLINK-33441] Remove unused json plan

2024-02-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d0cddef4188995ea3d0731d6a88e7a6af56f44e
Author: bvarghese1 
AuthorDate: Tue Jan 16 18:20:34 2024 -0800

[FLINK-33441] Remove unused json plan
---
 .../UnionJsonPlanTest_jsonplan/testUnion.out   | 157 -
 1 file changed, 157 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionJsonPlanTest_jsonplan/testUnion.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionJsonPlanTest_jsonplan/testUnion.out
deleted file mode 100644
index 17fba7f6a3d..000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionJsonPlanTest_jsonplan/testUnion.out
+++ /dev/null
@@ -1,157 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-"id" : 1,
-"type" : "stream-exec-table-source-scan_1",
-"scanTableSource" : {
-  "table" : {
-"identifier" : "`default_catalog`.`default_database`.`MyTable`",
-"resolvedTable" : {
-  "schema" : {
-"columns" : [ {
-  "name" : "a",
-  "dataType" : "BIGINT"
-}, {
-  "name" : "b",
-  "dataType" : "INT NOT NULL"
-}, {
-  "name" : "c",
-  "dataType" : "VARCHAR(2147483647)"
-}, {
-  "name" : "d",
-  "dataType" : "TIMESTAMP(3)"
-} ],
-"watermarkSpecs" : [ ]
-  },
-  "partitionKeys" : [ ],
-  "options" : {
-"bounded" : "false",
-"connector" : "values"
-  }
-}
-  },
-  "abilities" : [ {
-"type" : "ProjectPushDown",
-"projectedFields" : [ [ 0 ], [ 1 ] ],
-"producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-  }, {
-"type" : "ReadingMetadata",
-"metadataKeys" : [ ],
-"producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-  } ]
-},
-"outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-"description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
-"inputProperties" : [ ]
-  }, {
-"id" : 2,
-"type" : "stream-exec-table-source-scan_1",
-"scanTableSource" : {
-  "table" : {
-"identifier" : "`default_catalog`.`default_database`.`MyTable2`",
-"resolvedTable" : {
-  "schema" : {
-"columns" : [ {
-  "name" : "d",
-  "dataType" : "BIGINT"
-}, {
-  "name" : "e",
-  "dataType" : "INT NOT NULL"
-} ],
-"watermarkSpecs" : [ ]
-  },
-  "partitionKeys" : [ ],
-  "options" : {
-"bounded" : "false",
-"connector" : "values"
-  }
-}
-  }
-},
-"outputType" : "ROW<`d` BIGINT, `e` INT NOT NULL>",
-"description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[d, e])",
-"inputProperties" : [ ]
-  }, {
-"id" : 3,
-"type" : "stream-exec-union_1",
-"inputProperties" : [ {
-  "requiredDistribution" : {
-"type" : "UNKNOWN"
-  },
-  "damBehavior" : "PIPELINED",
-  "priority" : 0
-}, {
-  "requiredDistribution" : {
-"type" : "UNKNOWN"
-  },
-  "damBehavior" : "PIPELINED",
-  "priority" : 0
-} ],
-"outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-"description" : "Union(all=[true], union=[a, b])"
-  }, {
-"id" : 4,
-"type" : "stream-exec-sink_1",
-"configuration" : {
-  "table.exec.sink.keyed-shuffle" : "AUTO",
-  "table.exec.sink.not-null-enforcer" : "ERROR",
-  "table.exec.sink.rowtime-inserter" : "ENABLED",
-  "table.exec.sink.type-length-enforcer" : "IGNORE",
-  "table.exec.sink.upsert-materialize" : "AUTO"
-},
-"dynamicTableSink" : {
-  "table" : {
-"identifier" : "`default_catalog`.`default_database`.`MySink`",
-"resolvedTable" : {
-  "schema" : {
-"columns" : [ {
-  "name" : "a",
-  "dataType" : "BIGINT"
-}, {
-  "name" : "b",
-  "dataType" : "INT"
-} ],
-"watermarkSpecs" : [ ]
-  },
-  "partitionKeys" : [ ],
-  "options" : {
-"connector" : "values",
-"table-sink-class" : "DEFAULT"
-  }
-}
-  }
-},
-"inputChangelogMode" : [ "INSERT" ],
-"inputProperties" : [ {
-  "requiredDistribution" : {
-"type" : "UNKNOWN"
-  },
-  "damBehavior" : "PIPELINED",
-  "priority" : 0
-} ],
-"outputType" : "ROW

(flink) 01/02: [FLINK-33441] Move Union restore tests

2024-02-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5d889fb70688bc3e29fc979dc233d2c9674fa96
Author: bvarghese1 
AuthorDate: Tue Jan 16 18:16:42 2024 -0800

[FLINK-33441] Move Union restore tests

- Moving from nodes.exec.testutil to nodes.exec.stream
- Related commit: f362dcc
---
 .../plan/nodes/exec/{testutils => stream}/UnionRestoreTest.java   | 4 ++--
 .../plan/nodes/exec/{testutils => stream}/UnionTestPrograms.java  | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionRestoreTest.java
similarity index 90%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionRestoreTest.java
index ca27c175fc6..b271e0221ad 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionRestoreTest.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionTestPrograms.java
similarity index 98%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionTestPrograms.java
index 562199588b5..e40ee481fce 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionTestPrograms.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;



(flink) branch master updated (839f298c383 -> 3d0cddef418)

2024-02-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 839f298c383 [FLINK-34343][rpc] Use actor path when rejecting early 
messages
 new f5d889fb706 [FLINK-33441] Move Union restore tests
 new 3d0cddef418 [FLINK-33441] Remove unused json plan

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../{testutils => stream}/UnionRestoreTest.java|   4 +-
 .../{testutils => stream}/UnionTestPrograms.java   |   3 +-
 .../UnionJsonPlanTest_jsonplan/testUnion.out   | 157 -
 3 files changed, 3 insertions(+), 161 deletions(-)
 rename 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/{testutils
 => stream}/UnionRestoreTest.java (90%)
 rename 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/{testutils
 => stream}/UnionTestPrograms.java (98%)
 delete mode 100644 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/UnionJsonPlanTest_jsonplan/testUnion.out



(flink) branch master updated (3d0cddef418 -> 79cccd7103a)

2024-02-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 3d0cddef418 [FLINK-33441] Remove unused json plan
 new 39e2eaf6805 [hotfix][core] Makes ExecutorUtils#gracefulShutdown return 
any outstanding tasks
 new 79cccd7103a [FLINK-34007][k8s] Adds graceful shutdown logic to 
KubernetesLeaderElector

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/util/ExecutorUtils.java     | 14 ++++++++++----
 .../kubeclient/resources/KubernetesLeaderElector.java | 19 ++++++++++++++-----
 2 files changed, 24 insertions(+), 9 deletions(-)



(flink) 02/02: [FLINK-34007][k8s] Adds graceful shutdown logic to KubernetesLeaderElector

2024-02-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79cccd7103a304bfa07104dcafd1f65a032c88ce
Author: Matthias Pohl 
AuthorDate: Fri Feb 2 14:47:42 2024 +0100

[FLINK-34007][k8s] Adds graceful shutdown logic to KubernetesLeaderElector

We need to make sure that any ongoing leadership event is properly handled. 
shutdownNow and the Precondition are too aggressive.
---
 .../kubeclient/resources/KubernetesLeaderElector.java | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
index 0bf65ddbac2..728f46abcb6 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
@@ -21,7 +21,7 @@ package org.apache.flink.kubernetes.kubeclient.resources;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 
@@ -160,10 +161,18 @@ public class KubernetesLeaderElector {
             // code that handles the leader loss
             cancelCurrentLeaderElectionSession();
 
-            final List<Runnable> outstandingTasks = executorService.shutdownNow();
-            Preconditions.checkState(
-                    outstandingTasks.isEmpty(),
-                    "All tasks that handle the leadership revocation should have been executed.");
+            // the shutdown of the executor needs to happen gracefully for scenarios where the
+            // release is called in the executorService. Interrupting this logic will result in the
+            // leadership-lost event not being sent to the client.
+            final List<Runnable> outStandingTasks =
+                    ExecutorUtils.gracefulShutdown(30, TimeUnit.SECONDS, executorService);
+
+            if (!outStandingTasks.isEmpty()) {
+                LOG.warn(
+                        "{} events were not processed before stopping the {} instance.",
+                        outStandingTasks.size(),
+                        KubernetesLeaderElector.class.getSimpleName());
+            }
 }
 }
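
The hazard the commit message describes is easiest to see in isolation: the elector's stop path can run on the same single-thread executor that delivers leadership events, so an eager shutdownNow() would interrupt the event handler before it notifies the client. A hypothetical standalone sketch (assumed names, not Flink code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SelfStoppingExecutorSketch {
        public static void main(String[] args) {
            ExecutorService leadershipExecutor = Executors.newSingleThreadExecutor();
            leadershipExecutor.execute(
                    () -> {
                        // Leadership was revoked; the listener still has to be
                        // notified from this very task. Calling shutdownNow()
                        // here would interrupt the task and drop the
                        // leadership-lost notification; shutdown() lets it run
                        // to completion before the executor terminates.
                        leadershipExecutor.shutdown();
                        System.out.println("leadership-lost event delivered");
                    });
        }
    }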
 



(flink) 01/02: [hotfix][core] Makes ExecutorUtils#gracefulShutdown return any outstanding tasks

2024-02-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39e2eaf68055803956909ab995ebe73aefd308a5
Author: Matthias Pohl 
AuthorDate: Fri Feb 2 14:34:47 2024 +0100

[hotfix][core] Makes ExecutorUtils#gracefulShutdown return any outstanding 
tasks
---
 .../src/main/java/org/apache/flink/util/ExecutorUtils.java | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
index 939ffd6624c..99e2f3317e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -38,8 +40,9 @@ public class ExecutorUtils {
      * @param timeout to wait for the termination of all ExecutorServices
      * @param unit of the timeout
      * @param executorServices to shut down
+     * @return Tasks that were not executed prior to a {@link ExecutorService#shutdownNow()}.
      */
-    public static void gracefulShutdown(
+    public static List<Runnable> gracefulShutdown(
             long timeout, TimeUnit unit, ExecutorService... executorServices) {
         for (ExecutorService executorService : executorServices) {
             executorService.shutdown();
@@ -50,22 +53,23 @@ public class ExecutorUtils {
         long timeLeft = unit.toMillis(timeout);
         boolean hasTimeLeft = timeLeft > 0L;
 
+        final List<Runnable> outstandingTasks = new ArrayList<>();
         for (ExecutorService executorService : executorServices) {
             if (wasInterrupted || !hasTimeLeft) {
-                executorService.shutdownNow();
+                outstandingTasks.addAll(executorService.shutdownNow());
             } else {
                 try {
                     if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
                         LOG.warn(
                                 "ExecutorService did not terminate in time. Shutting it down now.");
-                        executorService.shutdownNow();
+                        outstandingTasks.addAll(executorService.shutdownNow());
                     }
                 } catch (InterruptedException e) {
                     LOG.warn(
                             "Interrupted while shutting down executor services. Shutting all "
                                     + "remaining ExecutorServices down now.",
                             e);
-                    executorService.shutdownNow();
+                    outstandingTasks.addAll(executorService.shutdownNow());
 
                     wasInterrupted = true;
 
@@ -76,6 +80,8 @@ public class ExecutorUtils {
             hasTimeLeft = timeLeft > 0L;
         }
     }
+
+    return outstandingTasks;
 }
 
 /**


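A usage sketch of the changed helper, assuming the updated signature above: callers can now inspect which queued tasks never ran because the timeout elapsed or the shutdown was interrupted (standalone example, names assumed):

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.util.ExecutorUtils;

    public class GracefulShutdownUsageSketch {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            List<Runnable> notExecuted =
                    ExecutorUtils.gracefulShutdown(30, TimeUnit.SECONDS, pool);
            if (!notExecuted.isEmpty()) {
                // Queued work that shutdownNow() returned without running.
                System.err.printf(
                        "%d queued tasks were dropped during shutdown.%n",
                        notExecuted.size());
            }
        }
    }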

(flink) branch release-1.18 updated: [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location

2024-02-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new a15515ebc0e [FLINK-34324][test] Makes all s3 related operations being 
declared and called in a single location
a15515ebc0e is described below

commit a15515ebc0e4c59ea0642e745e942591c28b3a3c
Author: Matthias Pohl 
AuthorDate: Wed Jan 31 15:02:24 2024 +0100

[FLINK-34324][test] Makes all s3 related operations being declared and 
called in a single location
---
 .../test-scripts/test_file_sink.sh | 111 +++--
 1 file changed, 59 insertions(+), 52 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 711f74b6672..5ed1fda2c68 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,53 +20,16 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
 
-if [ "${OUT_TYPE}" == "s3" ]; then
-  source "$(dirname "$0")"/common_s3.sh
-else
-  echo "S3 environment is not loaded for non-s3 test runs (test run type: 
$OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
+# OUTPUT_PATH is a local folder that can be used as a download folder for remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
 mkdir -p $OUTPUT_PATH
 
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "Use local output"
-  JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  echo "Use s3 output"
-  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
-  set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
-  mkdir -p "$OUTPUT_PATH-chk"
-else
-  echo "Unknown output type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
-  s3_delete_by_full_path_prefix "$S3_PREFIX"
-  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
-  rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
-  on_exit out_cleanup
-fi
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${OUTPUT_PATH}"
 
 ###
 # Get all lines in part files and sort them numerically.
@@ -79,9 +42,6 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.
 #   sorted content of part files
 ###
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true
-  fi
   find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
 
@@ -89,20 +49,67 @@ function get_complete_result {
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is not loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  # the s3 context requires additional setup
+  source "$(dirname "$0")"/common_s3.sh
+  s3_setup hadoop
+
+  # overwrites JOB_OUTPUT_PATH to point to S3
+  S3_DATA_PREFIX="${RANDOM_PREFIX}"
+  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
+  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
+  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
+
+  # overwrites implementation for local runs
+  function get_complete_result {
+# copies the data from S3 to the local OUTPUT_PATH
+    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$FILE_SINK_TEST_TEMP_SUBFOLDER" "part-" true
+
+# and prints the sorted output
+find "${OUTP