[flink] tag release-1.16.0-ok.1 created (now db5f54a3d07)

2022-11-22 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to tag release-1.16.0-ok.1
in repository https://gitbox.apache.org/repos/asf/flink.git


  at db5f54a3d07 (commit)
This tag includes the following new commits:

 new db5f54a3d07 Update version to 1.16.0-ok.1

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink] 01/01: Update version to 1.16.0-ok.1

2022-11-22 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to tag release-1.16.0-ok.1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit db5f54a3d07b09c08dc5e632b42b7d33edad5aff
Author: Dawid Wysakowicz 
AuthorDate: Wed Nov 23 08:21:47 2022 +0100

Update version to 1.16.0-ok.1
---
 docs/config.toml  | 4 ++--
 flink-annotations/pom.xml | 2 +-
 flink-architecture-tests/flink-architecture-tests-base/pom.xml| 2 +-
 flink-architecture-tests/flink-architecture-tests-production/pom.xml  | 2 +-
 flink-architecture-tests/flink-architecture-tests-test/pom.xml| 2 +-
 flink-architecture-tests/pom.xml  | 2 +-
 flink-clients/pom.xml | 2 +-
 flink-connectors/flink-connector-aws-base/pom.xml | 2 +-
 flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml | 2 +-
 flink-connectors/flink-connector-aws-kinesis-streams/pom.xml  | 2 +-
 flink-connectors/flink-connector-base/pom.xml | 2 +-
 flink-connectors/flink-connector-cassandra/pom.xml| 2 +-
 flink-connectors/flink-connector-elasticsearch-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-connector-files/pom.xml| 2 +-
 flink-connectors/flink-connector-gcp-pubsub/pom.xml   | 2 +-
 flink-connectors/flink-connector-hbase-1.4/pom.xml| 2 +-
 flink-connectors/flink-connector-hbase-2.2/pom.xml| 2 +-
 flink-connectors/flink-connector-hbase-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-hive/pom.xml | 2 +-
 flink-connectors/flink-connector-jdbc/pom.xml | 2 +-
 flink-connectors/flink-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/flink-connector-pulsar/pom.xml   | 2 +-
 flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +-
 flink-connectors/flink-file-sink-common/pom.xml   | 2 +-
 flink-connectors/flink-hadoop-compatibility/pom.xml   | 2 +-
 flink-connectors/flink-hcatalog/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml | 2 +-
 flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml  | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hbase-1.4/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-hbase-2.2/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/flink-sql-connector-pulsar/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-rabbitmq/pom.xml | 2 +-
 flink-connectors/pom.xml  | 2 +-
 flink-container/pom.xml   | 2 +-
 flink-contrib/flink-connector-wikiedits/pom.xml   | 2 +-
 flink-contrib/pom.xml | 2 +-
 flink-core/pom.xml| 2 +-
 flink-dist-scala/pom.xml  | 2 +-
 flink-dist/pom.xml| 2 +-
 flink-docs/pom.xml| 2 +-
 flink-dstl/flink-dstl-dfs/pom.xml | 2 +-
 flink-dstl/pom.xml| 2 +-
 flink-end-to-end-tests/flink-batch-sql-test/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-cli-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +-
 .../flink-connector-gcp-pubsub-emulator-tests/pom.xml | 2 +-
 flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +-
 .../flink-dataset-fine-grained-recovery-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml  | 2 +-
 

[flink-ml] branch master updated: [FLINK-30144] Enable object reuse in Flink ML tests

2022-11-22 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
 new 24eb3ec  [FLINK-30144] Enable object reuse in Flink ML tests
24eb3ec is described below

commit 24eb3ec84b72c0cfb170ea381095b25a8fb8a830
Author: yunfengzhou-hub 
AuthorDate: Wed Nov 23 14:54:05 2022 +0800

[FLINK-30144] Enable object reuse in Flink ML tests

This closes #179.
---
 .../src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java  | 1 +
 .../test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java  | 1 +
 flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java  | 1 +
 .../src/test/java/org/apache/flink/ml/api/PipelineTest.java | 1 +
 .../org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java| 1 +
 .../org/apache/flink/ml/common/datastream/AllReduceImplTest.java| 4 
 .../org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java  | 1 +
 .../test/java/org/apache/flink/ml/common/window/WindowsTest.java| 1 +
 .../src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java  | 1 +
 .../java/org/apache/flink/iteration/IterationConstructionTest.java  | 5 +
 .../compile/DraftExecutionEnvironmentSwitchWrapperTest.java | 1 +
 .../progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java | 6 ++
 flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/classification/KnnTest.java   | 1 +
 .../test/java/org/apache/flink/ml/classification/LinearSVCTest.java | 1 +
 .../org/apache/flink/ml/classification/LogisticRegressionTest.java  | 1 +
 .../java/org/apache/flink/ml/classification/NaiveBayesTest.java | 1 +
 .../flink/ml/classification/OnlineLogisticRegressionTest.java   | 1 +
 .../org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/clustering/KMeansTest.java| 1 +
 .../test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java  | 1 +
 .../flink/ml/evaluation/BinaryClassificationEvaluatorTest.java  | 1 +
 .../src/test/java/org/apache/flink/ml/feature/BinarizerTest.java| 2 +-
 .../src/test/java/org/apache/flink/ml/feature/BucketizerTest.java   | 1 +
 flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java | 1 +
 .../java/org/apache/flink/ml/feature/ElementwiseProductTest.java| 1 +
 .../test/java/org/apache/flink/ml/feature/FeatureHasherTest.java| 1 +
 .../src/test/java/org/apache/flink/ml/feature/HashingTFTest.java| 1 +
 flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/ImputerTest.java  | 1 +
 .../src/test/java/org/apache/flink/ml/feature/InteractionTest.java  | 2 +-
 .../test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/NGramTest.java| 1 +
 .../src/test/java/org/apache/flink/ml/feature/NormalizerTest.java   | 2 +-
 .../test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java| 1 +
 .../java/org/apache/flink/ml/feature/PolynomialExpansionTest.java   | 2 +-
 .../test/java/org/apache/flink/ml/feature/RandomSplitterTest.java   | 1 +
 .../test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java | 1 +
 .../test/java/org/apache/flink/ml/feature/StandardScalerTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/feature/TokenizerTest.java| 1 +
 .../org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java  | 1 +
 .../test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java  | 1 +
 .../test/java/org/apache/flink/ml/feature/VectorIndexerTest.java| 1 +
 .../src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java | 1 +
 .../flink/ml/feature/stringindexer/IndexToStringModelTest.java  | 1 +
 .../apache/flink/ml/feature/stringindexer/StringIndexerTest.java| 1 +
 .../java/org/apache/flink/ml/regression/LinearRegressionTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java  | 1 +
 flink-ml-python/pyflink/ml/tests/test_utils.py  | 1 +
 .../flink/test/iteration/BoundedAllRoundCheckpointITCase.java   | 1 +
 .../flink/test/iteration/BoundedAllRoundStreamIterationITCase.java  | 2 ++
 .../test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java  | 1 +
 .../flink/test/iteration/BoundedPerRoundCheckpointITCase.java   | 1 +
 .../flink/test/iteration/BoundedPerRoundStreamIterationITCase.java  | 1 +
 .../apache/flink/test/iteration/UnboundedStreamIterationITCase.java | 2 ++
 58 files changed, 72 insertions(+), 4 deletions(-)

diff --git 
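
For context, the one-line addition each of these tests receives is typically the object-reuse switch on the execution config. A minimal sketch of what enabling it looks like (assuming the standard StreamExecutionEnvironment API; this is not copied from the patch):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ObjectReuseSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // With object reuse enabled, the runtime may hand the same record
            // instance to chained operators instead of copying it between them,
            // so tests run this way catch operators that illegally cache or
            // mutate input records they do not own.
            env.getConfig().enableObjectReuse();
            // ... build and execute the ML pipeline under test ...
        }
    }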

[flink-table-store] branch master updated: [FLINK-30139] CodeGenLoader fails when temporary directory is a symlink

2022-11-22 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new ae96b335 [FLINK-30139] CodeGenLoader fails when temporary directory is a symlink
ae96b335 is described below

commit ae96b335e470a068a7e87f3d1a22c58bf2d82256
Author: Jingsong Lee 
AuthorDate: Wed Nov 23 10:42:42 2022 +0800

[FLINK-30139] CodeGenLoader fails when temporary directory is a symlink

This closes #397
---
 flink-table-store-codegen-loader/pom.xml   |   7 ++
 .../flink/table/store/codegen/CodeGenLoader.java   |  16 ++--
 .../flink/table/store/utils/LocalFileUtils.java|  50 +++
 .../table/store/utils/LocalFileUtilsTest.java  |  80 +
 .../flink/table/store/utils/TempDirUtils.java  | 100 +
 5 files changed, 247 insertions(+), 6 deletions(-)

diff --git a/flink-table-store-codegen-loader/pom.xml b/flink-table-store-codegen-loader/pom.xml
index b968b507..1cbe9bd7 100644
--- a/flink-table-store-codegen-loader/pom.xml
+++ b/flink-table-store-codegen-loader/pom.xml
@@ -39,6 +39,13 @@ under the License.
 <scope>provided</scope>
 </dependency>
 
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-table-store-common</artifactId>
+<version>${project.version}</version>
+<scope>provided</scope>
+</dependency>
+
 
 <dependency>
 <groupId>org.apache.flink</groupId>
diff --git a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
index 938e4cec..6bba6a4c 100644
--- a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
+++ b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.codegen;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.table.store.utils.LocalFileUtils;
 import org.apache.flink.util.IOUtils;
 
 import java.io.IOException;
@@ -70,7 +71,8 @@ public class CodeGenLoader {
 final ClassLoader flinkClassLoader = CodeGenLoader.class.getClassLoader();
 final Path tmpDirectory =
 Paths.get(ConfigurationUtils.parseTempDirectories(new Configuration())[0]);
-Files.createDirectories(tmpDirectory);
+Files.createDirectories(
+LocalFileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
 Path delegateJar =
 extractResource(
 FLINK_TABLE_STORE_CODEGEN_FAT_JAR,
@@ -110,12 +112,14 @@ public class CodeGenLoader {
 
 // Singleton lazy initialization
 
-private static class CodegenLoaderHolder {
-private static final CodeGenLoader INSTANCE = new CodeGenLoader();
-}
+private static CodeGenLoader instance;
 
-public static CodeGenLoader getInstance() {
-return CodeGenLoader.CodegenLoaderHolder.INSTANCE;
+public static synchronized CodeGenLoader getInstance() {
+if (instance == null) {
+// Avoid NoClassDefFoundError without cause by exception
+instance = new CodeGenLoader();
+}
+return instance;
 }
 
 public <T> T discover(Class<T> clazz) {
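
The getInstance() rewrite above is more than style: the JVM caches a failed static initializer, so once a holder class fails to initialize, every later access throws NoClassDefFoundError that hides the original cause. A plain synchronized lazy initializer leaves the field null on failure and retries. A standalone sketch of that retry behavior (illustrative names, not the committed code):

    // Illustrative stand-in for a loader whose constructor can fail,
    // e.g. while extracting a fat jar into a temp directory.
    final class LazyLoader {
        private static LazyLoader instance;

        private LazyLoader() {
            // may throw on the first call...
        }

        // ...in which case `instance` stays null and the next call
        // retries, surfacing the real exception instead of a cached
        // NoClassDefFoundError from a poisoned holder class.
        static synchronized LazyLoader getInstance() {
            if (instance == null) {
                instance = new LazyLoader();
            }
            return instance;
        }
    }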
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/LocalFileUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/LocalFileUtils.java
new file mode 100644
index 0000000..a3930ea7
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/LocalFileUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/** Utils for local file. */
+public class LocalFileUtils {
+
+/**
+ * Get a 
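
Files.createDirectories is the motivation for the new utility: it throws FileAlreadyExistsException when a path element exists as a symbolic link to a directory, which is exactly the symlinked-temp-directory case. A minimal sketch of the resolution step using only java.nio (the method name follows the diff; the body is an assumption, not the committed implementation):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    final class SymlinkSketch {
        /** Walks the path root-down, replacing each symbolic link with its target. */
        static Path getTargetPathIfContainsSymbolicPath(Path path) throws IOException {
            Path resolved = path.getRoot() == null ? Paths.get("") : path.getRoot();
            for (Path element : path) { // iterates name elements only
                resolved = resolved.resolve(element);
                if (Files.isSymbolicLink(resolved)) {
                    Path target = Files.readSymbolicLink(resolved);
                    // link targets may be relative to the link's parent
                    Path parent =
                            resolved.getParent() == null ? Paths.get("") : resolved.getParent();
                    resolved = target.isAbsolute() ? target : parent.resolve(target).normalize();
                }
            }
            return resolved;
        }
    }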

[flink-kubernetes-operator] branch main updated: [FLINK-29475] Add error checker for the operator in e2e tests

2022-11-22 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new af00c99d [FLINK-29475] Add error checker for the operator in e2e tests
af00c99d is described below

commit af00c99defbe49c84dbd8a3ac4341136ca3efac9
Author: Gabor Somogyi 
AuthorDate: Mon Nov 21 10:15:15 2022 +0100

[FLINK-29475] Add error checker for the operator in e2e tests
---
 e2e-tests/test_application_kubernetes_ha.sh |  2 ++
 e2e-tests/test_application_operations.sh|  2 ++
 e2e-tests/test_multi_sessionjob.sh  |  2 ++
 e2e-tests/test_sessionjob_kubernetes_ha.sh  |  2 ++
 e2e-tests/test_sessionjob_operations.sh |  2 ++
 e2e-tests/utils.sh  | 29 +
 6 files changed, 39 insertions(+)

diff --git a/e2e-tests/test_application_kubernetes_ha.sh b/e2e-tests/test_application_kubernetes_ha.sh
index 1797b29a..eda15bc6 100755
--- a/e2e-tests/test_application_kubernetes_ha.sh
+++ b/e2e-tests/test_application_kubernetes_ha.sh
@@ -47,5 +47,7 @@ wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
 
+check_operator_log_for_errors || exit 1
+
 echo "Successfully run the Flink Kubernetes application HA test"
 
diff --git a/e2e-tests/test_application_operations.sh b/e2e-tests/test_application_operations.sh
index 457972c8..f6d1ace3 100755
--- a/e2e-tests/test_application_operations.sh
+++ b/e2e-tests/test_application_operations.sh
@@ -67,4 +67,6 @@ wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
 assert_available_slots 1 $CLUSTER_ID
 
+check_operator_log_for_errors || exit 1
+
 echo "Successfully run the last-state upgrade test"
diff --git a/e2e-tests/test_multi_sessionjob.sh b/e2e-tests/test_multi_sessionjob.sh
index 59990870..09862db5 100755
--- a/e2e-tests/test_multi_sessionjob.sh
+++ b/e2e-tests/test_multi_sessionjob.sh
@@ -38,6 +38,7 @@ jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+check_operator_log_for_errors || exit 1
 echo "Flink Session Job is running properly"
 
 # Current namespace: flink
@@ -48,4 +49,5 @@ jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+check_operator_log_for_errors || exit 1
 echo "Flink Session Job is running properly"
diff --git a/e2e-tests/test_sessionjob_kubernetes_ha.sh b/e2e-tests/test_sessionjob_kubernetes_ha.sh
index 0ad55b12..7a0fa813 100755
--- a/e2e-tests/test_sessionjob_kubernetes_ha.sh
+++ b/e2e-tests/test_sessionjob_kubernetes_ha.sh
@@ -48,5 +48,7 @@ wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
 
+check_operator_log_for_errors || exit 1
+
 echo "Successfully run the Flink Session Job HA test"
 
diff --git a/e2e-tests/test_sessionjob_operations.sh b/e2e-tests/test_sessionjob_operations.sh
index b1c88fc2..c230af8c 100755
--- a/e2e-tests/test_sessionjob_operations.sh
+++ b/e2e-tests/test_sessionjob_operations.sh
@@ -79,3 +79,5 @@ wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+
+check_operator_log_for_errors || exit 1
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index b8df6a42..447f038c 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -83,6 +83,11 @@ function wait_for_jobmanager_running() {
 wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
 }
 
+function get_operator_pod_name() {
+   operator_pod_name=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o 

[flink] branch master updated: [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)

2022-11-22 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 91c4d865eab [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
91c4d865eab is described below

commit 91c4d865eabad0f7f1b8c7426d87e86afa06d6f6
Author: Danny Cranmer 
AuthorDate: Tue Nov 22 17:13:02 2022 +

[FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
---
 .../sink/KinesisFirehoseSinkElementConverter.java  | 49 
 .../KinesisFirehoseSinkElementConverterTest.java   | 28 +
 .../sink/KinesisStreamsSinkElementConverter.java   | 50 
 .../KinesisStreamsSinkElementConverterTest.java| 57 +++
 .../base/sink/writer/AsyncSinkWriter.java  |  1 +
 .../base/sink/writer/ElementConverter.java |  5 ++
 .../base/sink/writer/AsyncSinkWriterTest.java  | 66 +-
 .../base/sink/writer/TestElementConverter.java | 43 ++
 8 files changed, 222 insertions(+), 77 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index b90db3387b9..f6e83fef2be 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.metrics.MetricGroup;
@@ -39,7 +40,8 @@ import software.amazon.awssdk.services.firehose.model.Record;
 @Internal
 public class KinesisFirehoseSinkElementConverter<InputT>
 implements ElementConverter<InputT, Record> {
-private boolean schemaOpened = false;
+
+private static final long serialVersionUID = 1L;
 
 /** A serialization schema to specify how the input element should be serialized. */
 private final SerializationSchema<InputT> serializationSchema;
@@ -48,37 +50,34 @@ public class KinesisFirehoseSinkElementConverter
 this.serializationSchema = serializationSchema;
 }
 
+@Override
+public void open(Sink.InitContext context) {
+try {
+serializationSchema.open(
+new SerializationSchema.InitializationContext() {
+@Override
+public MetricGroup getMetricGroup() {
+return new UnregisteredMetricsGroup();
+}
+
+@Override
+public UserCodeClassLoader getUserCodeClassLoader() {
+return SimpleUserCodeClassLoader.create(
+KinesisFirehoseSinkElementConverter.class.getClassLoader());
+}
+});
+} catch (Exception e) {
+throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+}
+}
+
 @Override
 public Record apply(InputT element, SinkWriter.Context context) {
-checkOpened();
 return Record.builder()
 .data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
 .build();
 }
 
-private void checkOpened() {
-if (!schemaOpened) {
-try {
-serializationSchema.open(
-new SerializationSchema.InitializationContext() {
-@Override
-public MetricGroup getMetricGroup() {
-return new UnregisteredMetricsGroup();
-}
-
-@Override
-public UserCodeClassLoader getUserCodeClassLoader() {
-return SimpleUserCodeClassLoader.create(
-KinesisFirehoseSinkElementConverter.class.getClassLoader());
-}
-});
-schemaOpened = true;
-} catch (Exception e) {
-throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
-}
-}
-}
-
 public 
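
The new open(Sink.InitContext) hook moves one-time setup off the per-record path: apply() no longer tests a schemaOpened flag, and the serialization schema is initialized before the first record rather than during it. A minimal converter against the updated interface could look like this (a hedged sketch; everything except ElementConverter, Sink and SinkWriter is invented here, and TestElementConverter in the patch is the authoritative test double):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.connector.sink2.Sink;
    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;

    // Hypothetical converter that serializes Strings to UTF-8 bytes.
    class StringToBytesConverter implements ElementConverter<String, byte[]> {
        private boolean opened = false;

        @Override
        public void open(Sink.InitContext context) {
            // One-time initialization (e.g. SerializationSchema#open)
            // now runs here, once, before any elements are converted.
            opened = true;
        }

        @Override
        public byte[] apply(String element, SinkWriter.Context context) {
            if (!opened) {
                throw new IllegalStateException("open() was not called");
            }
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }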

[flink-connector-hbase] 01/01: Init repository

2022-11-22 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git

commit 4faad76941b29e1ac692a5dffc04ccae44657a0c
Author: Martijn Visser 
AuthorDate: Tue Nov 22 16:00:14 2022 +0100

Init repository
---
 .asf.yaml |  20 ++
 .github/boring-cyborg.yml |  87 +++
 .github/workflows/ci.yml  | 101 
 .github/workflows/push_pr.yml |  27 ++
 .github/workflows/weekly.yml  |  32 +++
 .gitignore|  38 +++
 README.md |  68 +
 tools/ci/log4j.properties |  43 
 tools/maven/checkstyle.xml| 562 ++
 tools/maven/suppressions.xml  |  26 ++
 10 files changed, 1004 insertions(+)

diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 000..35f0424
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,20 @@
+github:
+  enabled_merge_buttons:
+squash: true
+merge: false
+rebase: true
+  labels:
+- flink
+- hbase
+- connector
+- datastream
+- table
+- sql
+  collaborators:
+- flinkbot
+notifications:
+  commits:  commits@flink.apache.org
+  issues:   iss...@flink.apache.org
+  pullrequests: iss...@flink.apache.org
+  jobs: bui...@flink.apache.org
+  jira_options: link label
\ No newline at end of file
diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
new file mode 100644
index 000..e42d5d9
--- /dev/null
+++ b/.github/boring-cyborg.yml
@@ -0,0 +1,87 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+labelPRBasedOnFilePath:
+  component=BuildSystem:
+- .github/**/*
+- tools/maven/*
+
+  component=Documentation:
+- docs/**/*
+
+  component=Connectors/HBase:
+- flink-connector-hbase*/**/*
+- flink-sql-connector-hbase*/**/*
+
+## IssueLink Adder #################################################
+# Insert Issue (Jira/Github etc) link in PR description based on the Issue ID in PR title.
+insertIssueLinkInPrDescription:
+  # specify the placeholder for the issue link that should be present in the description
+  descriptionIssuePlaceholderRegexp: "^Issue link: (.*)$"
+  matchers:
+    # you can have several matches - for different types of issues
+    # only the first matching entry is replaced
+    jiraIssueMatch:
+      # specify the regexp of issue id that you can find in the title of the PR
+      # the match groups can be used to build the issue id (${1}, ${2}, etc.).
+      titleIssueIdRegexp: \[(FLINK-[0-9]+)\]
+      # the issue link to be added. ${1}, ${2} ... are replaced with the match groups from the
+      # title match (remember to use quotes)
+      descriptionIssueLink: "[${1}](https://issues.apache.org/jira/browse/${1}/)"
+    docOnlyIssueMatch:
+      titleIssueIdRegexp: \[hotfix\]
+      descriptionIssueLink: "`Documentation only change, no JIRA issue`"
+
+## Title Validator #################################################
+# Verifies if commit/PR titles match the regexp specified
+verifyTitles:
+  # Regular expression that should be matched by titles of commits or PR
+  titleRegexp: ^\[FLINK-[0-9]+\].*$|^\[FLINK-X\].*$|^\[hotfix].*$
+  # If set to true, it will always check the PR title (as opposed to the individual commits).
+  alwaysUsePrTitle: false
+  # If set to true, it will only check the commit in case there is a single commit.
+  # In case of multiple commits it will check PR title.
+  # This reflects the standard behaviour of Github that for `Squash & Merge` GitHub
+  # uses the PR title rather than commit messages for the squashed commit ¯\_(ツ)_/¯
+  # For single-commit PRs it takes the squashed commit message from the commit as expected.
+  #
+  # If set to false it will check all commit messages. This is useful when you do not squash commits at merge.
+  validateEitherPrOrSingleCommitTitle: true
+  # The title the GitHub 

[flink-connector-hbase] branch main created (now 4faad76)

2022-11-22 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git


  at 4faad76  Init repository

This branch includes the following new commits:

 new 4faad76  Init repository

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink-connector-kafka] 01/01: Init repository

2022-11-22 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 2727011674de9d8b6a560861639fb6b806fd
Author: Martijn Visser 
AuthorDate: Tue Nov 22 15:38:43 2022 +0100

Init repository
---
 .asf.yaml |  20 ++
 .github/boring-cyborg.yml |  87 +++
 .github/workflows/ci.yml  | 101 
 .github/workflows/push_pr.yml |  27 ++
 .github/workflows/weekly.yml  |  32 +++
 .gitignore|  38 +++
 README.md |  68 +
 tools/ci/log4j.properties |  43 
 tools/maven/checkstyle.xml| 562 ++
 tools/maven/suppressions.xml  |  26 ++
 10 files changed, 1004 insertions(+)

diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 000..0d166d3
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,20 @@
+github:
+  enabled_merge_buttons:
+squash: true
+merge: false
+rebase: true
+  labels:
+- flink
+- kafka
+- connector
+- datastream
+- table
+- sql
+  collaborators:
+- flinkbot
+notifications:
+  commits:  commits@flink.apache.org
+  issues:   iss...@flink.apache.org
+  pullrequests: iss...@flink.apache.org
+  jobs: bui...@flink.apache.org
+  jira_options: link label
\ No newline at end of file
diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
new file mode 100644
index 000..f2f0edd
--- /dev/null
+++ b/.github/boring-cyborg.yml
@@ -0,0 +1,87 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+labelPRBasedOnFilePath:
+  component=BuildSystem:
+- .github/**/*
+- tools/maven/*
+
+  component=Documentation:
+- docs/**/*
+
+  component=Connectors/Kafka:
+- flink-connector-kafka*/**/*
+- flink-sql-connector-kafka*/**/*
+
+## IssueLink Adder #################################################
+# Insert Issue (Jira/Github etc) link in PR description based on the Issue ID in PR title.
+insertIssueLinkInPrDescription:
+  # specify the placeholder for the issue link that should be present in the description
+  descriptionIssuePlaceholderRegexp: "^Issue link: (.*)$"
+  matchers:
+    # you can have several matches - for different types of issues
+    # only the first matching entry is replaced
+    jiraIssueMatch:
+      # specify the regexp of issue id that you can find in the title of the PR
+      # the match groups can be used to build the issue id (${1}, ${2}, etc.).
+      titleIssueIdRegexp: \[(FLINK-[0-9]+)\]
+      # the issue link to be added. ${1}, ${2} ... are replaced with the match groups from the
+      # title match (remember to use quotes)
+      descriptionIssueLink: "[${1}](https://issues.apache.org/jira/browse/${1}/)"
+    docOnlyIssueMatch:
+      titleIssueIdRegexp: \[hotfix\]
+      descriptionIssueLink: "`Documentation only change, no JIRA issue`"
+
+## Title Validator #################################################
+# Verifies if commit/PR titles match the regexp specified
+verifyTitles:
+  # Regular expression that should be matched by titles of commits or PR
+  titleRegexp: ^\[FLINK-[0-9]+\].*$|^\[FLINK-X\].*$|^\[hotfix].*$
+  # If set to true, it will always check the PR title (as opposed to the individual commits).
+  alwaysUsePrTitle: false
+  # If set to true, it will only check the commit in case there is a single commit.
+  # In case of multiple commits it will check PR title.
+  # This reflects the standard behaviour of Github that for `Squash & Merge` GitHub
+  # uses the PR title rather than commit messages for the squashed commit ¯\_(ツ)_/¯
+  # For single-commit PRs it takes the squashed commit message from the commit as expected.
+  #
+  # If set to false it will check all commit messages. This is useful when you do not squash commits at merge.
+  validateEitherPrOrSingleCommitTitle: true
+  # The title the GitHub 

[flink-connector-kafka] branch main created (now 2111172)

2022-11-22 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


  at 272  Init repository

This branch includes the following new commits:

 new 272  Init repository

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink] branch master updated (f21090b529d -> 06fcd34eb26)

2022-11-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from f21090b529d [FLINK-24742][table][docs] Add info about SQL client key strokes to docs (#17649)
 add 06fcd34eb26 [FLINK-30044][ci] Deduplicate plugin parser loops

No new revisions were added by this update.

Summary of changes:
 .../ci/utils/dependency/DependencyParser.java  | 40 +++-
 .../flink/tools/ci/utils/deploy/DeployParser.java  | 31 +
 .../flink/tools/ci/utils/shade/ShadeParser.java| 31 ++---
 .../flink/tools/ci/utils/shared/ParserUtils.java   | 73 ++
 .../tools/ci/utils/deploy/DeployParserTest.java| 23 +++
 5 files changed, 111 insertions(+), 87 deletions(-)
 create mode 100644 tools/ci/flink-ci-tools/src/main/java/org/apache/flink/tools/ci/utils/shared/ParserUtils.java
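
No diff body is shown here, but the diffstat tells the story: DependencyParser, DeployParser and ShadeParser each carried a near-identical loop over Maven output that now lives in the shared ParserUtils. A hedged sketch of the kind of helper such a deduplication introduces (the signature and names below are assumptions, not the committed API):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Function;

    // Hypothetical shared loop: skip lines until a block marker is seen,
    // then map each following line until the block ends.
    final class ParserUtilsSketch {
        static <T> List<T> parseBlock(
                Iterator<String> lines, String blockStart, Function<String, T> lineParser) {
            List<T> result = new ArrayList<>();
            boolean inBlock = false;
            while (lines.hasNext()) {
                String line = lines.next();
                if (!inBlock) {
                    inBlock = line.contains(blockStart);
                } else if (line.isEmpty()) {
                    break; // a blank line terminates the block
                } else {
                    result.add(lineParser.apply(line));
                }
            }
            return result;
        }
    }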



[flink] branch master updated (88a161101bb -> f21090b529d)

2022-11-22 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 88a161101bb [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
 add f21090b529d [FLINK-24742][table][docs] Add info about SQL client key strokes to docs (#17649)

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/table/sqlClient.md | 34 +
 docs/content/docs/dev/table/sqlClient.md| 34 +
 2 files changed, 68 insertions(+)



[flink-connector-aws] branch main updated: [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink

2022-11-22 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
 new b2606b6  [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink
b2606b6 is described below

commit b2606b634df44c1bc412e72c9494842703052998
Author: Hong Liang Teoh 
AuthorDate: Sun Nov 6 21:24:55 2022 +

[FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink
---
 flink-connector-dynamodb/pom.xml   |  44 +-
 .../dynamodb/table/DynamoDbConfiguration.java  |  44 ++
 .../dynamodb/table/DynamoDbConnectorOptions.java   |  52 ++
 .../dynamodb/table/DynamoDbDynamicSink.java| 182 +++
 .../dynamodb/table/DynamoDbDynamicSinkFactory.java |  84 +++
 .../dynamodb/table/RowDataElementConverter.java|  73 +++
 .../table/RowDataToAttributeValueConverter.java| 104 
 .../table/converter/ArrayAttributeConverter.java   |  69 +++
 .../converter/ArrayAttributeConverterProvider.java |  82 +++
 .../org.apache.flink.table.factories.Factory   |  16 +
 .../dynamodb/sink/examples/example_dynamodb.sql|  67 +++
 .../example_dynamodb_static_partitions.sql |  67 +++
 .../table/DynamoDbDynamicSinkFactoryTest.java  | 295 +++
 .../table/RowDataElementConverterTest.java | 148 ++
 .../RowDataToAttributeValueConverterTest.java  | 561 +
 .../converter/ArrayAttributeConverterTest.java |  90 
 flink-sql-connector-dynamodb/pom.xml   |   1 +
 .../src/main/resources/META-INF/NOTICE |  25 +-
 .../services/dynamodb/execution.interceptors   |   1 +
 19 files changed, 1984 insertions(+), 21 deletions(-)

diff --git a/flink-connector-dynamodb/pom.xml b/flink-connector-dynamodb/pom.xml
index 25f7756..932f069 100644
--- a/flink-connector-dynamodb/pom.xml
+++ b/flink-connector-dynamodb/pom.xml
@@ -18,19 +18,19 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
-<modelVersion>4.0.0</modelVersion>
+   <modelVersion>4.0.0</modelVersion>
 
-<parent>
-<groupId>org.apache.flink</groupId>
-<artifactId>flink-connector-aws-parent</artifactId>
+   <parent>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-connector-aws-parent</artifactId>
 <version>1.0.0-SNAPSHOT</version>
-</parent>
+   </parent>
 
-<artifactId>flink-connector-dynamodb</artifactId>
-<name>Flink : Connectors : Amazon DynamoDB</name>
+   <artifactId>flink-connector-dynamodb</artifactId>
+   <name>Flink : Connectors : Amazon DynamoDB</name>
 
 <packaging>jar</packaging>
 
@@ -76,6 +76,22 @@ under the License.
 <artifactId>dynamodb-enhanced</artifactId>
 </dependency>
 
+
+   <dependency>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-table-api-java-bridge</artifactId>
+   <version>${flink.version}</version>
+   <scope>provided</scope>
+   <optional>true</optional>
+   </dependency>
+   <dependency>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-table-runtime</artifactId>
+   <version>${flink.version}</version>
+   <scope>provided</scope>
+   <optional>true</optional>
+   </dependency>
+
 
 <dependency>
 <groupId>org.apache.flink</groupId>
@@ -106,5 +122,15 @@ under the License.
 <version>${flink.version}</version>
 <scope>test</scope>
 </dependency>
+
+
+   <dependency>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-table-common</artifactId>
+   <version>${flink.version}</version>
+   <type>test-jar</type>
+   <scope>test</scope>
+   </dependency>
+
 </dependencies>
 
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
new file mode 100644
index 0000000..ad4ca3c
--- /dev/null
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ 

[flink] 01/02: [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.

2022-11-22 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d521b030890c9c14749ff220548499fb92bd
Author: Weijie Guo 
AuthorDate: Wed Nov 16 21:05:24 2022 +0800

[FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.

(cherry picked from commit 97c9abf9791d8c08db27f0ef2a2a78488321b0a1)
---
 .../ResultPartitionDeploymentDescriptorTest.java   |  41 +++---
 .../netty/ClientTransportErrorHandlingTest.java| 106 +++-
 .../netty/PartitionRequestClientFactoryTest.java   | 141 -
 .../util/NettyShuffleDescriptorBuilder.java|   1 -
 4 files changed, 146 insertions(+), 143 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 39e91a45a1f..423512f7862 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -31,19 +31,16 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionC
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ResultPartitionDeploymentDescriptor}. */
-public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
+class ResultPartitionDeploymentDescriptorTest {
 private static final IntermediateDataSetID resultId = new IntermediateDataSetID();
 private static final int numberOfPartitions = 5;
 
@@ -73,18 +70,18 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 
 /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */
 @Test
-public void testSerializationOfUnknownShuffleDescriptor() throws IOException {
+void testSerializationOfUnknownShuffleDescriptor() throws IOException {
 ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID);
 ShuffleDescriptor shuffleDescriptorCopy =
 CommonTestUtils.createCopySerializable(shuffleDescriptor);
-assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class));
-assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-assertThat(shuffleDescriptorCopy.isUnknown(), is(true));
+assertThat(shuffleDescriptorCopy).isInstanceOf(UnknownShuffleDescriptor.class);
+assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+assertThat(shuffleDescriptorCopy.isUnknown()).isTrue();
 }
 
 /** Tests simple de/serialization with {@link NettyShuffleDescriptor}. */
 @Test
-public void testSerializationWithNettyShuffleDescriptor() throws IOException {
+void testSerializationWithNettyShuffleDescriptor() throws IOException {
 ShuffleDescriptor shuffleDescriptor =
 new NettyShuffleDescriptor(
 producerLocation,
@@ -94,13 +91,13 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 ResultPartitionDeploymentDescriptor copy =
 createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
 
-assertThat(copy.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class));
+assertThat(copy.getShuffleDescriptor()).isInstanceOf(NettyShuffleDescriptor.class);
 NettyShuffleDescriptor shuffleDescriptorCopy =
 (NettyShuffleDescriptor) copy.getShuffleDescriptor();
-assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-assertThat(shuffleDescriptorCopy.isUnknown(), is(false));
-assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation), is(true));
-assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID));
+assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+assertThat(shuffleDescriptorCopy.isUnknown()).isFalse();
+assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation)).isTrue();
+
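
The hunks above follow a small fixed set of rewrites, which the rest of the migration repeats:

    import static org.assertj.core.api.Assertions.assertThat;

    // Hamcrest form                          ->  AssertJ form
    // assertThat(x, is(y));                  ->  assertThat(x).isEqualTo(y);
    // assertThat(x, is(true));               ->  assertThat(x).isTrue();
    // assertThat(x, instanceOf(Foo.class));  ->  assertThat(x).isInstanceOf(Foo.class);
    // plus the JUnit5 housekeeping: org.junit.Test becomes
    // org.junit.jupiter.api.Test, test classes and methods drop their
    // public modifier, and the TestLogger base class is removed.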

[flink] 02/02: [FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception.

2022-11-22 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a4fb7c86fbb28bee63bba829ebdda7b886170e3
Author: Weijie Guo 
AuthorDate: Wed Nov 16 21:24:23 2022 +0800

[FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception.

This closes #21358

(cherry picked from commit 93c834be953f1336adb3ec5b5bf759a20e25eddf)
---
 .../flink/runtime/io/network/ConnectionID.java | 22 +
 .../runtime/io/network/NetworkClientHandler.java   |  2 ++
 .../CreditBasedPartitionRequestClientHandler.java  | 37 +++--
 .../network/netty/NettyPartitionRequestClient.java | 16 +++--
 .../netty/PartitionRequestClientFactory.java   |  4 +++
 .../runtime/shuffle/NettyShuffleDescriptor.java| 38 +++---
 .../ResultPartitionDeploymentDescriptorTest.java   |  5 +--
 .../runtime/deployment/ShuffleDescriptorTest.java  | 13 ++--
 .../netty/ClientTransportErrorHandlingTest.java| 13 +---
 ...editBasedPartitionRequestClientHandlerTest.java |  4 +++
 .../netty/NettyPartitionRequestClientTest.java |  4 ++-
 .../runtime/io/network/netty/NettyTestUtil.java|  4 ++-
 .../netty/PartitionRequestClientFactoryTest.java   |  2 +-
 .../partition/consumer/InputChannelBuilder.java|  3 +-
 .../util/NettyShuffleDescriptorBuilder.java|  7 ++--
 15 files changed, 134 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index 6cb0fa29f20..fb60340c673 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -43,18 +44,26 @@ public class ConnectionID implements Serializable {
 
 private final int connectionIndex;
 
+private final ResourceID resourceID;
+
 public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
 this(
+connectionInfo.getResourceID(),
 new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()),
 connectionIndex);
 }
 
-public ConnectionID(InetSocketAddress address, int connectionIndex) {
+public ConnectionID(ResourceID resourceID, InetSocketAddress address, int connectionIndex) {
+this.resourceID = checkNotNull(resourceID);
 this.address = checkNotNull(address);
 checkArgument(connectionIndex >= 0);
 this.connectionIndex = connectionIndex;
 }
 
+public ResourceID getResourceID() {
+return resourceID;
+}
+
 public InetSocketAddress getAddress() {
 return address;
 }
@@ -75,15 +84,14 @@ public class ConnectionID implements Serializable {
 }
 
 final ConnectionID ra = (ConnectionID) other;
-if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
-return false;
-}
-
-return true;
+return ra.getAddress().equals(address)
+&& ra.getConnectionIndex() == connectionIndex
+&& ra.getResourceID().equals(resourceID);
 }
 
 @Override
 public String toString() {
-return address + " [" + connectionIndex + "]";
+return String.format(
+"%s (%s) [%s]", address, resourceID.getStringWithMetadata(), connectionIndex);
 }
 }
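
The visible effect is in toString(): a transport failure can now be attributed to a concrete TaskManager rather than just an address. A small usage sketch (constructor shape taken from the hunk above; ResourceID.generate() is Flink's helper for random IDs, and the host name here is made up):

    import java.net.InetSocketAddress;

    import org.apache.flink.runtime.clusterframework.types.ResourceID;
    import org.apache.flink.runtime.io.network.ConnectionID;

    public class ConnectionIdLoggingSketch {
        public static void main(String[] args) {
            ConnectionID id =
                    new ConnectionID(
                            ResourceID.generate(),
                            new InetSocketAddress("taskmanager-1", 6121),
                            0);
            // Prints roughly "taskmanager-1:6121 (<resource-id>) [0]", so an
            // exception log line identifies the remote TaskManager.
            System.out.println(id);
        }
    }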
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
index 18ae9e6d9a9..354da6c3a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
@@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler {
 
 void cancelRequestFor(InputChannelID inputChannelId);
 
+void setConnectionId(ConnectionID connectionId);
+
 /**
  * Return whether there is channel error.
  *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index fe008c1afed..63efdf117c2 100644
--- 

[flink] branch release-1.15 updated (28a877b8880 -> 7a4fb7c86fb)

2022-11-22 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


from 28a877b8880 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
 new d521b030890 [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
 new 7a4fb7c86fb [FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/io/network/ConnectionID.java |  22 +++-
 .../runtime/io/network/NetworkClientHandler.java   |   2 +
 .../CreditBasedPartitionRequestClientHandler.java  |  37 +-
 .../network/netty/NettyPartitionRequestClient.java |  16 ++-
 .../netty/PartitionRequestClientFactory.java   |   4 +
 .../runtime/shuffle/NettyShuffleDescriptor.java|  38 --
 .../ResultPartitionDeploymentDescriptorTest.java   |  46 ---
 .../runtime/deployment/ShuffleDescriptorTest.java  |  13 +-
 .../netty/ClientTransportErrorHandlingTest.java| 119 -
 ...editBasedPartitionRequestClientHandlerTest.java |   4 +
 .../netty/NettyPartitionRequestClientTest.java |   4 +-
 .../runtime/io/network/netty/NettyTestUtil.java|   4 +-
 .../netty/PartitionRequestClientFactoryTest.java   | 143 -
 .../partition/consumer/InputChannelBuilder.java|   3 +-
 .../util/NettyShuffleDescriptorBuilder.java|   8 +-
 15 files changed, 280 insertions(+), 183 deletions(-)



[flink] 02/02: [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception.

2022-11-22 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df061627591a1f62942e0700d51bb79259fb8f54
Author: Weijie Guo 
AuthorDate: Wed Nov 16 21:24:23 2022 +0800

[FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception.

This closes #21361

(cherry picked from commit 93c834be953f1336adb3ec5b5bf759a20e25eddf)
---
 .../flink/runtime/io/network/ConnectionID.java | 22 +
 .../runtime/io/network/NetworkClientHandler.java   |  2 ++
 .../CreditBasedPartitionRequestClientHandler.java  | 37 +++--
 .../network/netty/NettyPartitionRequestClient.java | 16 +++--
 .../netty/PartitionRequestClientFactory.java   |  4 +++
 .../runtime/shuffle/NettyShuffleDescriptor.java| 38 +++---
 .../ResultPartitionDeploymentDescriptorTest.java   |  5 +--
 .../runtime/deployment/ShuffleDescriptorTest.java  | 13 ++--
 .../netty/ClientTransportErrorHandlingTest.java| 13 +---
 ...editBasedPartitionRequestClientHandlerTest.java |  4 +++
 .../netty/NettyPartitionRequestClientTest.java |  4 ++-
 .../runtime/io/network/netty/NettyTestUtil.java|  4 ++-
 .../netty/PartitionRequestClientFactoryTest.java   | 24 +-
 .../partition/consumer/InputChannelBuilder.java|  3 +-
 .../util/NettyShuffleDescriptorBuilder.java|  8 ++---
 15 files changed, 149 insertions(+), 48 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index 6cb0fa29f20..fb60340c673 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -43,18 +44,26 @@ public class ConnectionID implements Serializable {
 
 private final int connectionIndex;
 
+private final ResourceID resourceID;
+
 public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
 this(
+connectionInfo.getResourceID(),
 new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()),
 connectionIndex);
 }
 
-public ConnectionID(InetSocketAddress address, int connectionIndex) {
+public ConnectionID(ResourceID resourceID, InetSocketAddress address, int connectionIndex) {
+this.resourceID = checkNotNull(resourceID);
 this.address = checkNotNull(address);
 checkArgument(connectionIndex >= 0);
 this.connectionIndex = connectionIndex;
 }
 
+public ResourceID getResourceID() {
+return resourceID;
+}
+
 public InetSocketAddress getAddress() {
 return address;
 }
@@ -75,15 +84,14 @@ public class ConnectionID implements Serializable {
 }
 
 final ConnectionID ra = (ConnectionID) other;
-if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
-return false;
-}
-
-return true;
+return ra.getAddress().equals(address)
+&& ra.getConnectionIndex() == connectionIndex
+&& ra.getResourceID().equals(resourceID);
 }
 
 @Override
 public String toString() {
-return address + " [" + connectionIndex + "]";
+return String.format(
+"%s (%s) [%s]", address, resourceID.getStringWithMetadata(), connectionIndex);
 }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
index 18ae9e6d9a9..354da6c3a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
@@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler {
 
 void cancelRequestFor(InputChannelID inputChannelId);
 
+void setConnectionId(ConnectionID connectionId);
+
 /**
  * Return whether there is channel error.
  *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index fe008c1afed..63efdf117c2 100644
--- 

[flink] 01/02: [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.

2022-11-22 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1c75638af65210c2f780996431dbc38f47517a0
Author: Weijie Guo 
AuthorDate: Wed Nov 16 21:05:24 2022 +0800

[hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.

(cherry picked from commit 97c9abf9791d8c08db27f0ef2a2a78488321b0a1)
---
 .../ResultPartitionDeploymentDescriptorTest.java   |  39 
 .../netty/ClientTransportErrorHandlingTest.java| 106 ++---
 .../netty/PartitionRequestClientFactoryTest.java   |  99 ++-
 3 files changed, 122 insertions(+), 122 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 33c79a1be4a..1fe6b9e329e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -31,20 +31,17 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionC
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ResultPartitionDeploymentDescriptor}. */
-public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
+class ResultPartitionDeploymentDescriptorTest {
 private static final IntermediateDataSetID resultId = new IntermediateDataSetID();
 private static final int numberOfPartitions = 5;
 
@@ -78,18 +75,18 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 
 /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */
 @Test
-public void testSerializationOfUnknownShuffleDescriptor() throws IOException {
+void testSerializationOfUnknownShuffleDescriptor() throws IOException {
 ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID);
 ShuffleDescriptor shuffleDescriptorCopy =
 CommonTestUtils.createCopySerializable(shuffleDescriptor);
-assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class));
-assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-assertThat(shuffleDescriptorCopy.isUnknown(), is(true));
+assertThat(shuffleDescriptorCopy).isInstanceOf(UnknownShuffleDescriptor.class);
+assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+assertThat(shuffleDescriptorCopy.isUnknown()).isTrue();
 }
 
 /** Tests simple de/serialization with {@link NettyShuffleDescriptor}. */
 @Test
-public void testSerializationWithNettyShuffleDescriptor() throws IOException {
+void testSerializationWithNettyShuffleDescriptor() throws IOException {
 ShuffleDescriptor shuffleDescriptor =
 new NettyShuffleDescriptor(
 producerLocation,
@@ -99,13 +96,13 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 ResultPartitionDeploymentDescriptor copy =
 createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
 
-assertThat(copy.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class));
+assertThat(copy.getShuffleDescriptor()).isInstanceOf(NettyShuffleDescriptor.class);
 NettyShuffleDescriptor shuffleDescriptorCopy =
 (NettyShuffleDescriptor) copy.getShuffleDescriptor();
-assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-assertThat(shuffleDescriptorCopy.isUnknown(), is(false));
-assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation), is(true));
-assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID));
+assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+assertThat(shuffleDescriptorCopy.isUnknown()).isFalse();
+

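The pattern applied across these migrated tests reduces to the following stand-alone sketch (illustrative only; the class name and values are hypothetical, not code from the commit):

    import static org.assertj.core.api.Assertions.assertThat;

    import org.junit.jupiter.api.Test;

    class AssertjMigrationSketch {

        @Test
        void beforeAndAfterStylesAgree() {
            String copy = "partition-7";
            // JUnit4/Hamcrest (removed):
            //     assertThat(copy, is("partition-7"));
            //     assertThat(copy, instanceOf(String.class));
            // JUnit5/AssertJ (introduced):
            assertThat(copy).isEqualTo("partition-7");
            assertThat(copy).isInstanceOf(String.class);
            assertThat(copy.isEmpty()).isFalse();
        }
    }

Note also that the migrated classes drop the TestLogger base class and become package-private, as the hunks above show for ResultPartitionDeploymentDescriptorTest.
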
[flink] branch release-1.16 updated (d8f7777f321 -> df061627591)

2022-11-22 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from d8ff321 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
 new e1c75638af6 [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
 new df061627591 [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/io/network/ConnectionID.java |  22 ++--
 .../runtime/io/network/NetworkClientHandler.java   |   2 +
 .../CreditBasedPartitionRequestClientHandler.java  |  37 ++-
 .../network/netty/NettyPartitionRequestClient.java |  16 ++-
 .../netty/PartitionRequestClientFactory.java   |   4 +
 .../runtime/shuffle/NettyShuffleDescriptor.java|  38 +--
 .../ResultPartitionDeploymentDescriptorTest.java   |  44 
 .../runtime/deployment/ShuffleDescriptorTest.java  |  13 ++-
 .../netty/ClientTransportErrorHandlingTest.java| 119 ++--
 ...editBasedPartitionRequestClientHandlerTest.java |   4 +
 .../netty/NettyPartitionRequestClientTest.java |   4 +-
 .../runtime/io/network/netty/NettyTestUtil.java|   4 +-
 .../netty/PartitionRequestClientFactoryTest.java   | 121 -
 .../partition/consumer/InputChannelBuilder.java|   3 +-
 .../util/NettyShuffleDescriptorBuilder.java|   8 +-
 15 files changed, 270 insertions(+), 169 deletions(-)
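Taken together, the FLINK-29639 change threads the remote TaskManager's ResourceID through ConnectionID so that transport errors can name the failing peer. A minimal sketch of the resulting toString() output, assuming the patched constructor takes (ResourceID, InetSocketAddress, int) as the diff suggests; the address and index here are made up:

    import java.net.InetSocketAddress;

    import org.apache.flink.runtime.clusterframework.types.ResourceID;
    import org.apache.flink.runtime.io.network.ConnectionID;

    public class ConnectionIdToStringSketch {
        public static void main(String[] args) {
            // A generated id stands in for the real remote TaskManager's ResourceID.
            ResourceID remoteTaskManagerId = ResourceID.generate();
            ConnectionID id =
                    new ConnectionID(remoteTaskManagerId, new InetSocketAddress("10.0.0.7", 6121), 0);
            // Before the patch: "/10.0.0.7:6121 [0]"
            // After the patch:  "/10.0.0.7:6121 (<resourceId + metadata>) [0]"
            System.out.println(id);
        }
    }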



[flink] branch release-1.15 updated (6b565055beb -> 28a877b8880)

2022-11-22 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


from 6b565055beb [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
 add 28a877b8880 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest

No new revisions were added by this update.

Summary of changes:
 .../io/network/netty/NettyBufferPoolTest.java  | 58 --
 1 file changed, 44 insertions(+), 14 deletions(-)



[flink] branch release-1.16 updated: [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest

2022-11-22 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
 new d8ff321 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
d8ff321 is described below

commit d8ff3216101fd31dbcdc4a725d2e7ead4113
Author: Yun Gao 
AuthorDate: Mon Dec 7 18:06:34 2020 +0800

[FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
---
 .../io/network/netty/NettyBufferPoolTest.java  | 58 --
 1 file changed, 44 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
index 3797d025e77..0aaa11ab64c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
@@ -18,36 +18,61 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.junit.After;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /** Tests for the {@link NettyBufferPool} wrapper. */
 public class NettyBufferPoolTest {
 
+private final List<ByteBuf> needReleasing = new ArrayList<>();
+
+@After
+public void tearDown() {
+try {
+// Release all of the buffers.
+for (ByteBuf buf : needReleasing) {
+buf.release();
+}
+
+// Checks in a separate loop in case we have sliced buffers.
+for (ByteBuf buf : needReleasing) {
+assertEquals(0, buf.refCnt());
+}
+} finally {
+needReleasing.clear();
+}
+}
+
 @Test
 public void testNoHeapAllocations() throws Exception {
-NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
+final NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
 
 // Buffers should prefer to be direct
-assertTrue(nettyBufferPool.buffer().isDirect());
-assertTrue(nettyBufferPool.buffer(128).isDirect());
-assertTrue(nettyBufferPool.buffer(128, 256).isDirect());
+assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect());
+assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect());
+assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect());
 
 // IO buffers should prefer to be direct
-assertTrue(nettyBufferPool.ioBuffer().isDirect());
-assertTrue(nettyBufferPool.ioBuffer(128).isDirect());
-assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect());
+assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect());
+assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect());
+assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect());
 
 // Currently we fake the heap buffer allocation with direct buffers
-assertTrue(nettyBufferPool.heapBuffer().isDirect());
-assertTrue(nettyBufferPool.heapBuffer(128).isDirect());
-assertTrue(nettyBufferPool.heapBuffer(128, 256).isDirect());
+assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect());
+assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect());
+assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect());
 
 // Composite buffers allocate the corresponding type of buffers when extending their capacity
-assertTrue(nettyBufferPool.compositeHeapBuffer().capacity(1024).isDirect());
-assertTrue(nettyBufferPool.compositeHeapBuffer(10).capacity(1024).isDirect());
+assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect());
+assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect());
 
 // Is direct buffer pooled!
 assertTrue(nettyBufferPool.isDirectBufferPooled());
@@ -60,16 +85,21 @@ public class NettyBufferPoolTest {
 
 {
 // Single large buffer allocates one chunk
-nettyBufferPool.directBuffer(chunkSize - 64);
+releaseLater(nettyBufferPool.directBuffer(chunkSize - 64));
 long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
 assertEquals(chunkSize, allocated);
 }
 
 {
 // Allocate a little more (one more chunk required)
-nettyBufferPool.directBuffer(128);
+releaseLater(nettyBufferPool.directBuffer(128));
 long allocated = 

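The releaseLater helper itself falls outside the quoted hunks. Given the tearDown above, a plausible shape is the usual register-and-return idiom; this is a hypothetical reconstruction, not the committed code:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

    // Hypothetical reconstruction of the helper the test calls.
    class ReleaseLaterSketch {
        private final List<ByteBuf> needReleasing = new ArrayList<>();

        private ByteBuf releaseLater(ByteBuf buf) {
            needReleasing.add(buf); // tracked so tearDown() can release it and check refCnt == 0
            return buf;             // returned unchanged so call sites stay single expressions
        }
    }
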
[flink] branch master updated: [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest

2022-11-22 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 88a161101bb [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
88a161101bb is described below

commit 88a161101bb33b4c088325788bd11d41f9369355
Author: Yun Gao 
AuthorDate: Mon Dec 7 18:06:34 2020 +0800

[FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest

This closes #14319.
---
 .../io/network/netty/NettyBufferPoolTest.java  | 58 --
 1 file changed, 44 insertions(+), 14 deletions(-)


[flink] branch master updated: [FLINK-29423][rest] Remove custom JobDetails serializer

2022-11-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new ec89c17da5d [FLINK-29423][rest] Remove custom JobDetails serializer
ec89c17da5d is described below

commit ec89c17da5d555ca44f89d3f8739fa9ae00734b7
Author: Chesnay Schepler 
AuthorDate: Fri Nov 11 13:55:40 2022 +0100

[FLINK-29423][rest] Remove custom JobDetails serializer
---
 .../shortcodes/generated/rest_v1_dispatcher.html   |  33 +++-
 docs/static/generated/rest_v1_dispatcher.yml   |  41 ++---
 .../src/test/resources/rest_api_v1.snapshot|  33 +++-
 .../runtime/messages/webmonitor/JobDetails.java| 167 +
 4 files changed, 141 insertions(+), 133 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index ac1c58c1299..baa681d261b 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1096,7 +1096,38 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
 "jobs" : {
   "type" : "array",
   "items" : {
-"type" : "any"
+"type" : "object",
+"id" : 
"urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+"properties" : {
+  "duration" : {
+"type" : "integer"
+  },
+  "end-time" : {
+"type" : "integer"
+  },
+  "jid" : {
+"type" : "any"
+  },
+  "last-modification" : {
+"type" : "integer"
+  },
+  "name" : {
+"type" : "string"
+  },
+  "start-time" : {
+"type" : "integer"
+  },
+  "state" : {
+"type" : "string",
+"enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", 
"FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", 
"RECONCILING" ]
+  },
+  "tasks" : {
+"type" : "object",
+"additionalProperties" : {
+  "type" : "integer"
+}
+  }
+}
   }
 }
   }
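Read against the schema above, one element of the "jobs" array now serializes along these lines (values and task-state keys are illustrative only, assumed from the schema rather than taken from the commit):

    {
      "jid" : "5e20cb6b0f661c0f0b4e7b6a0e1f9a44",
      "name" : "WordCount",
      "state" : "RUNNING",
      "start-time" : 1669104000000,
      "end-time" : -1,
      "duration" : 421337,
      "last-modification" : 1669104421337,
      "tasks" : { "RUNNING" : 4, "FINISHED" : 0 }
    }
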
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index c1dcc55d097..146b66469f3 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1886,18 +1886,6 @@ components:
 total:
   type: integer
   format: int64
-CurrentAttempts:
-  type: object
-  properties:
-currentAttempts:
-  uniqueItems: true
-  type: array
-  items:
-type: integer
-format: int32
-representativeAttempt:
-  type: integer
-  format: int32
 DashboardConfiguration:
   type: object
   properties:
@@ -2218,36 +2206,27 @@ components:
 JobDetails:
   type: object
   properties:
-currentExecutionAttempts:
-  type: object
-  additionalProperties:
-type: object
-additionalProperties:
-  $ref: '#/components/schemas/CurrentAttempts'
 duration:
   type: integer
   format: int64
-endTime:
+end-time:
   type: integer
   format: int64
-jobId:
+jid:
   $ref: '#/components/schemas/JobID'
-jobName:
-  type: string
-lastUpdateTime:
+last-modification:
   type: integer
   format: int64
-numTasks:
-  type: integer
-  format: int32
-startTime:
+name:
+  type: string
+start-time:
   type: integer
   format: int64
-status:
+state:
   $ref: '#/components/schemas/JobStatus'
-tasksPerState:
-  type: array
-  items:
+tasks:
+  type: object
+  additionalProperties:
 type: integer
 format: int32
 JobDetailsInfo:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index ea31ca3d138..0ae17a6ce11 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -786,7 +786,38 @@
 "jobs" : {
   "type" : "array",
   "items" : {
-"type" : "any"
+"type" : "object",
+"id" : 
"urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+"properties" : {
+  "jid" : {
+"type" : "any"
+  },
+  "name" : {
+"type" : "string"
+  },
+  "start-time" : {
+