[flink] tag release-1.16.0-ok.1 created (now db5f54a3d07)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to tag release-1.16.0-ok.1 in repository https://gitbox.apache.org/repos/asf/flink.git at db5f54a3d07 (commit) This tag includes the following new commits: new db5f54a3d07 Update version to 1.16.0-ok.1 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink] 01/01: Update version to 1.16.0-ok.1
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to tag release-1.16.0-ok.1 in repository https://gitbox.apache.org/repos/asf/flink.git commit db5f54a3d07b09c08dc5e632b42b7d33edad5aff Author: Dawid Wysakowicz AuthorDate: Wed Nov 23 08:21:47 2022 +0100 Update version to 1.16.0-ok.1 --- docs/config.toml | 4 ++-- flink-annotations/pom.xml | 2 +- flink-architecture-tests/flink-architecture-tests-base/pom.xml| 2 +- flink-architecture-tests/flink-architecture-tests-production/pom.xml | 2 +- flink-architecture-tests/flink-architecture-tests-test/pom.xml| 2 +- flink-architecture-tests/pom.xml | 2 +- flink-clients/pom.xml | 2 +- flink-connectors/flink-connector-aws-base/pom.xml | 2 +- flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml | 2 +- flink-connectors/flink-connector-aws-kinesis-streams/pom.xml | 2 +- flink-connectors/flink-connector-base/pom.xml | 2 +- flink-connectors/flink-connector-cassandra/pom.xml| 2 +- flink-connectors/flink-connector-elasticsearch-base/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch6/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-connector-files/pom.xml| 2 +- flink-connectors/flink-connector-gcp-pubsub/pom.xml | 2 +- flink-connectors/flink-connector-hbase-1.4/pom.xml| 2 +- flink-connectors/flink-connector-hbase-2.2/pom.xml| 2 +- flink-connectors/flink-connector-hbase-base/pom.xml | 2 +- flink-connectors/flink-connector-hive/pom.xml | 2 +- flink-connectors/flink-connector-jdbc/pom.xml | 2 +- flink-connectors/flink-connector-kafka/pom.xml| 2 +- flink-connectors/flink-connector-kinesis/pom.xml | 2 +- flink-connectors/flink-connector-pulsar/pom.xml | 2 +- flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +- flink-connectors/flink-file-sink-common/pom.xml | 2 +- flink-connectors/flink-hadoop-compatibility/pom.xml | 2 +- flink-connectors/flink-hcatalog/pom.xml | 2 +- 
flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml | 2 +- flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml | 2 +- flink-connectors/flink-sql-connector-elasticsearch6/pom.xml | 2 +- flink-connectors/flink-sql-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-sql-connector-hbase-1.4/pom.xml| 2 +- flink-connectors/flink-sql-connector-hbase-2.2/pom.xml| 2 +- flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml | 2 +- flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml | 2 +- flink-connectors/flink-sql-connector-kafka/pom.xml| 2 +- flink-connectors/flink-sql-connector-kinesis/pom.xml | 2 +- flink-connectors/flink-sql-connector-pulsar/pom.xml | 2 +- flink-connectors/flink-sql-connector-rabbitmq/pom.xml | 2 +- flink-connectors/pom.xml | 2 +- flink-container/pom.xml | 2 +- flink-contrib/flink-connector-wikiedits/pom.xml | 2 +- flink-contrib/pom.xml | 2 +- flink-core/pom.xml| 2 +- flink-dist-scala/pom.xml | 2 +- flink-dist/pom.xml| 2 +- flink-docs/pom.xml| 2 +- flink-dstl/flink-dstl-dfs/pom.xml | 2 +- flink-dstl/pom.xml| 2 +- flink-end-to-end-tests/flink-batch-sql-test/pom.xml | 2 +- flink-end-to-end-tests/flink-cli-test/pom.xml | 2 +- flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +- .../flink-connector-gcp-pubsub-emulator-tests/pom.xml | 2 +- flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +- .../flink-dataset-fine-grained-recovery-test/pom.xml | 2 +- flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +- flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml | 2 +-
[flink-ml] branch master updated: [FLINK-30144] Enable object reuse in Flink ML tests
This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git The following commit(s) were added to refs/heads/master by this push: new 24eb3ec [FLINK-30144] Enable object reuse in Flink ML tests 24eb3ec is described below commit 24eb3ec84b72c0cfb170ea381095b25a8fb8a830 Author: yunfengzhou-hub AuthorDate: Wed Nov 23 14:54:05 2022 +0800 [FLINK-30144] Enable object reuse in Flink ML tests This closes #179. --- .../src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java | 1 + .../test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java | 1 + flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java | 1 + .../src/test/java/org/apache/flink/ml/api/PipelineTest.java | 1 + .../org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java| 1 + .../org/apache/flink/ml/common/datastream/AllReduceImplTest.java| 4 .../org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java | 1 + .../test/java/org/apache/flink/ml/common/window/WindowsTest.java| 1 + .../src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java | 1 + .../java/org/apache/flink/iteration/IterationConstructionTest.java | 5 + .../compile/DraftExecutionEnvironmentSwitchWrapperTest.java | 1 + .../progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java | 6 ++ flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java | 1 + .../src/test/java/org/apache/flink/ml/classification/KnnTest.java | 1 + .../test/java/org/apache/flink/ml/classification/LinearSVCTest.java | 1 + .../org/apache/flink/ml/classification/LogisticRegressionTest.java | 1 + .../java/org/apache/flink/ml/classification/NaiveBayesTest.java | 1 + .../flink/ml/classification/OnlineLogisticRegressionTest.java | 1 + .../org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java | 1 + .../src/test/java/org/apache/flink/ml/clustering/KMeansTest.java| 1 + 
.../test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java | 1 + .../flink/ml/evaluation/BinaryClassificationEvaluatorTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/BinarizerTest.java| 2 +- .../src/test/java/org/apache/flink/ml/feature/BucketizerTest.java | 1 + flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java | 1 + .../java/org/apache/flink/ml/feature/ElementwiseProductTest.java| 1 + .../test/java/org/apache/flink/ml/feature/FeatureHasherTest.java| 1 + .../src/test/java/org/apache/flink/ml/feature/HashingTFTest.java| 1 + flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/ImputerTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/InteractionTest.java | 2 +- .../test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/NGramTest.java| 1 + .../src/test/java/org/apache/flink/ml/feature/NormalizerTest.java | 2 +- .../test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java| 1 + .../java/org/apache/flink/ml/feature/PolynomialExpansionTest.java | 2 +- .../test/java/org/apache/flink/ml/feature/RandomSplitterTest.java | 1 + .../test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java | 1 + .../test/java/org/apache/flink/ml/feature/StandardScalerTest.java | 1 + .../src/test/java/org/apache/flink/ml/feature/TokenizerTest.java| 1 + .../org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java | 1 + .../test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java | 1 + .../test/java/org/apache/flink/ml/feature/VectorIndexerTest.java| 1 + .../src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java | 1 + 
.../flink/ml/feature/stringindexer/IndexToStringModelTest.java | 1 + .../apache/flink/ml/feature/stringindexer/StringIndexerTest.java| 1 + .../java/org/apache/flink/ml/regression/LinearRegressionTest.java | 1 + .../src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java | 1 + flink-ml-python/pyflink/ml/tests/test_utils.py | 1 + .../flink/test/iteration/BoundedAllRoundCheckpointITCase.java | 1 + .../flink/test/iteration/BoundedAllRoundStreamIterationITCase.java | 2 ++ .../test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java | 1 + .../flink/test/iteration/BoundedPerRoundCheckpointITCase.java | 1 + .../flink/test/iteration/BoundedPerRoundStreamIterationITCase.java | 1 + .../apache/flink/test/iteration/UnboundedStreamIterationITCase.java | 2 ++ 58 files changed, 72 insertions(+), 4 deletions(-) diff --git
[flink-table-store] branch master updated: [FLINK-30139] CodeGenLoader fails when temporary directory is a symlink
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new ae96b335 [FLINK-30139] CodeGenLoader fails when temporary directory is a symlink ae96b335 is described below commit ae96b335e470a068a7e87f3d1a22c58bf2d82256 Author: Jingsong Lee AuthorDate: Wed Nov 23 10:42:42 2022 +0800 [FLINK-30139] CodeGenLoader fails when temporary directory is a symlink This closes #397 --- flink-table-store-codegen-loader/pom.xml | 7 ++ .../flink/table/store/codegen/CodeGenLoader.java | 16 ++-- .../flink/table/store/utils/LocalFileUtils.java| 50 +++ .../table/store/utils/LocalFileUtilsTest.java | 80 + .../flink/table/store/utils/TempDirUtils.java | 100 + 5 files changed, 247 insertions(+), 6 deletions(-) diff --git a/flink-table-store-codegen-loader/pom.xml b/flink-table-store-codegen-loader/pom.xml index b968b507..1cbe9bd7 100644 --- a/flink-table-store-codegen-loader/pom.xml +++ b/flink-table-store-codegen-loader/pom.xml @@ -39,6 +39,13 @@ under the License. 
provided + +org.apache.flink +flink-table-store-common +${project.version} +provided + + org.apache.flink diff --git a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java index 938e4cec..6bba6a4c 100644 --- a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java +++ b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java @@ -20,6 +20,7 @@ package org.apache.flink.table.store.codegen; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.table.store.utils.LocalFileUtils; import org.apache.flink.util.IOUtils; import java.io.IOException; @@ -70,7 +71,8 @@ public class CodeGenLoader { final ClassLoader flinkClassLoader = CodeGenLoader.class.getClassLoader(); final Path tmpDirectory = Paths.get(ConfigurationUtils.parseTempDirectories(new Configuration())[0]); -Files.createDirectories(tmpDirectory); +Files.createDirectories( + LocalFileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory)); Path delegateJar = extractResource( FLINK_TABLE_STORE_CODEGEN_FAT_JAR, @@ -110,12 +112,14 @@ public class CodeGenLoader { // Singleton lazy initialization -private static class CodegenLoaderHolder { -private static final CodeGenLoader INSTANCE = new CodeGenLoader(); -} +private static CodeGenLoader instance; -public static CodeGenLoader getInstance() { -return CodeGenLoader.CodegenLoaderHolder.INSTANCE; +public static synchronized CodeGenLoader getInstance() { +if (instance == null) { +// Avoid NoClassDefFoundError without cause by exception +instance = new CodeGenLoader(); +} +return instance; } public T discover(Class clazz) { diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/LocalFileUtils.java 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/LocalFileUtils.java new file mode 100644 index ..a3930ea7 --- /dev/null +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/LocalFileUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.utils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** Utils for local file. */ +public class LocalFileUtils { + +/** + * Get a
[flink-kubernetes-operator] branch main updated: [FLINK-29475] Add error checker for the operator in e2e tests
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new af00c99d [FLINK-29475] Add error checker for the operator in e2e tests af00c99d is described below commit af00c99defbe49c84dbd8a3ac4341136ca3efac9 Author: Gabor Somogyi AuthorDate: Mon Nov 21 10:15:15 2022 +0100 [FLINK-29475] Add error checker for the operator in e2e tests --- e2e-tests/test_application_kubernetes_ha.sh | 2 ++ e2e-tests/test_application_operations.sh| 2 ++ e2e-tests/test_multi_sessionjob.sh | 2 ++ e2e-tests/test_sessionjob_kubernetes_ha.sh | 2 ++ e2e-tests/test_sessionjob_operations.sh | 2 ++ e2e-tests/utils.sh | 29 + 6 files changed, 39 insertions(+) diff --git a/e2e-tests/test_application_kubernetes_ha.sh b/e2e-tests/test_application_kubernetes_ha.sh index 1797b29a..eda15bc6 100755 --- a/e2e-tests/test_application_kubernetes_ha.sh +++ b/e2e-tests/test_application_kubernetes_ha.sh @@ -47,5 +47,7 @@ wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || e wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 +check_operator_log_for_errors || exit 1 + echo "Successfully run the Flink Kubernetes application HA test" diff --git a/e2e-tests/test_application_operations.sh b/e2e-tests/test_application_operations.sh index 457972c8..f6d1ace3 100755 --- a/e2e-tests/test_application_operations.sh +++ b/e2e-tests/test_application_operations.sh @@ -67,4 +67,6 @@ wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymen wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 assert_available_slots 1 $CLUSTER_ID +check_operator_log_for_errors || 
exit 1 + echo "Successfully run the last-state upgrade test" diff --git a/e2e-tests/test_multi_sessionjob.sh b/e2e-tests/test_multi_sessionjob.sh index 59990870..09862db5 100755 --- a/e2e-tests/test_multi_sessionjob.sh +++ b/e2e-tests/test_multi_sessionjob.sh @@ -38,6 +38,7 @@ jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 +check_operator_log_for_errors || exit 1 echo "Flink Session Job is running properly" # Current namespace: flink @@ -48,4 +49,5 @@ jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 +check_operator_log_for_errors || exit 1 echo "Flink Session Job is running properly" diff --git a/e2e-tests/test_sessionjob_kubernetes_ha.sh b/e2e-tests/test_sessionjob_kubernetes_ha.sh index 0ad55b12..7a0fa813 100755 --- a/e2e-tests/test_sessionjob_kubernetes_ha.sh +++ b/e2e-tests/test_sessionjob_kubernetes_ha.sh @@ -48,5 +48,7 @@ wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || e wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 +check_operator_log_for_errors || exit 1 + echo "Successfully run the Flink Session Job HA test" diff --git a/e2e-tests/test_sessionjob_operations.sh b/e2e-tests/test_sessionjob_operations.sh index b1c88fc2..c230af8c 100755 --- a/e2e-tests/test_sessionjob_operations.sh +++ 
b/e2e-tests/test_sessionjob_operations.sh @@ -79,3 +79,5 @@ wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 + +check_operator_log_for_errors || exit 1 diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh index b8df6a42..447f038c 100755 --- a/e2e-tests/utils.sh +++ b/e2e-tests/utils.sh @@ -83,6 +83,11 @@ function wait_for_jobmanager_running() { wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1 } +function get_operator_pod_name() { + operator_pod_name=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o
[flink] branch master updated: [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 91c4d865eab [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265) 91c4d865eab is described below commit 91c4d865eabad0f7f1b8c7426d87e86afa06d6f6 Author: Danny Cranmer AuthorDate: Tue Nov 22 17:13:02 2022 + [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265) --- .../sink/KinesisFirehoseSinkElementConverter.java | 49 .../KinesisFirehoseSinkElementConverterTest.java | 28 + .../sink/KinesisStreamsSinkElementConverter.java | 50 .../KinesisStreamsSinkElementConverterTest.java| 57 +++ .../base/sink/writer/AsyncSinkWriter.java | 1 + .../base/sink/writer/ElementConverter.java | 5 ++ .../base/sink/writer/AsyncSinkWriterTest.java | 66 +- .../base/sink/writer/TestElementConverter.java | 43 ++ 8 files changed, 222 insertions(+), 77 deletions(-) diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java index b90db3387b9..f6e83fef2be 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.firehose.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; import 
org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.metrics.MetricGroup; @@ -39,7 +40,8 @@ import software.amazon.awssdk.services.firehose.model.Record; @Internal public class KinesisFirehoseSinkElementConverter implements ElementConverter { -private boolean schemaOpened = false; + +private static final long serialVersionUID = 1L; /** A serialization schema to specify how the input element should be serialized. */ private final SerializationSchema serializationSchema; @@ -48,37 +50,34 @@ public class KinesisFirehoseSinkElementConverter this.serializationSchema = serializationSchema; } +@Override +public void open(Sink.InitContext context) { +try { +serializationSchema.open( +new SerializationSchema.InitializationContext() { +@Override +public MetricGroup getMetricGroup() { +return new UnregisteredMetricsGroup(); +} + +@Override +public UserCodeClassLoader getUserCodeClassLoader() { +return SimpleUserCodeClassLoader.create( + KinesisFirehoseSinkElementConverter.class.getClassLoader()); +} +}); +} catch (Exception e) { +throw new FlinkRuntimeException("Failed to initialize serialization schema.", e); +} +} + @Override public Record apply(InputT element, SinkWriter.Context context) { -checkOpened(); return Record.builder() .data(SdkBytes.fromByteArray(serializationSchema.serialize(element))) .build(); } -private void checkOpened() { -if (!schemaOpened) { -try { -serializationSchema.open( -new SerializationSchema.InitializationContext() { -@Override -public MetricGroup getMetricGroup() { -return new UnregisteredMetricsGroup(); -} - -@Override -public UserCodeClassLoader getUserCodeClassLoader() { -return SimpleUserCodeClassLoader.create( - KinesisFirehoseSinkElementConverter.class.getClassLoader()); -} -}); -schemaOpened = true; -} catch (Exception e) { -throw new FlinkRuntimeException("Failed to initialize serialization schema.", e); -} -} -} - public
[flink-connector-hbase] 01/01: Init repository
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git commit 4faad76941b29e1ac692a5dffc04ccae44657a0c Author: Martijn Visser AuthorDate: Tue Nov 22 16:00:14 2022 +0100 Init repository --- .asf.yaml | 20 ++ .github/boring-cyborg.yml | 87 +++ .github/workflows/ci.yml | 101 .github/workflows/push_pr.yml | 27 ++ .github/workflows/weekly.yml | 32 +++ .gitignore| 38 +++ README.md | 68 + tools/ci/log4j.properties | 43 tools/maven/checkstyle.xml| 562 ++ tools/maven/suppressions.xml | 26 ++ 10 files changed, 1004 insertions(+) diff --git a/.asf.yaml b/.asf.yaml new file mode 100644 index 0000000..35f0424 --- /dev/null +++ b/.asf.yaml @@ -0,0 +1,20 @@ +github: + enabled_merge_buttons: +squash: true +merge: false +rebase: true + labels: +- flink +- hbase +- connector +- datastream +- table +- sql + collaborators: +- flinkbot +notifications: + commits: commits@flink.apache.org + issues: iss...@flink.apache.org + pullrequests: iss...@flink.apache.org + jobs: bui...@flink.apache.org + jira_options: link label \ No newline at end of file diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml new file mode 100644 index 0000000..e42d5d9 --- /dev/null +++ b/.github/boring-cyborg.yml @@ -0,0 +1,87 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +labelPRBasedOnFilePath: + component=BuildSystem: +- .github/**/* +- tools/maven/* + + component=Documentation: +- docs/**/* + + component=Connectors/HBase: +- flink-connector-hbase*/**/* +- flink-sql-connector-hbase*/**/* + +## IssueLink Adder # +# Insert Issue (Jira/Github etc) link in PR description based on the Issue ID in PR title. +insertIssueLinkInPrDescription: + # specify the placeholder for the issue link that should be present in the description + descriptionIssuePlaceholderRegexp: "^Issue link: (.*)$" + matchers: +# you can have several matches - for different types of issues +# only the first matching entry is replaced +jiraIssueMatch: + # specify the regexp of issue id that you can find in the title of the PR + # the match groups can be used to build the issue id (${1}, ${2}, etc.). + titleIssueIdRegexp: \[(FLINK-[0-9]+)\] + # the issue link to be added. ${1}, ${2} ... are replaced with the match groups from the + # title match (remember to use quotes) + descriptionIssueLink: "[${1}](https://issues.apache.org/jira/browse/${1}/)" +docOnlyIssueMatch: + titleIssueIdRegexp: \[hotfix\] + descriptionIssueLink: "`Documentation only change, no JIRA issue`" + +## Title Validator # +# Verifies if commit/PR titles match the regexp specified +verifyTitles: + # Regular expression that should be matched by titles of commits or PR + titleRegexp: ^\[FLINK-[0-9]+\].*$|^\[FLINK-X\].*$|^\[hotfix].*$ + # If set to true, it will always check the PR title (as opposed to the individual commits). 
+ alwaysUsePrTitle: false + # If set to true, it will only check the commit in case there is a single commit. + # In case of multiple commits it will check PR title. + # This reflects the standard behaviour of Github that for `Squash & Merge` GitHub + # uses the PR title rather than commit messages for the squashed commit ¯\_(ツ)_/¯ + # For single-commit PRs it takes the squashed commit message from the commit as expected. + # + # If set to false it will check all commit messages. This is useful when you do not squash commits at merge. + validateEitherPrOrSingleCommitTitle: true + # The title the GitHub
[flink-connector-hbase] branch main created (now 4faad76)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git at 4faad76 Init repository This branch includes the following new commits: new 4faad76 Init repository The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink-connector-kafka] 01/01: Init repository
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git commit 21111727011674de9d8b6a560861639fb6b806fd Author: Martijn Visser AuthorDate: Tue Nov 22 15:38:43 2022 +0100 Init repository --- .asf.yaml | 20 ++ .github/boring-cyborg.yml | 87 +++ .github/workflows/ci.yml | 101 .github/workflows/push_pr.yml | 27 ++ .github/workflows/weekly.yml | 32 +++ .gitignore| 38 +++ README.md | 68 + tools/ci/log4j.properties | 43 tools/maven/checkstyle.xml| 562 ++ tools/maven/suppressions.xml | 26 ++ 10 files changed, 1004 insertions(+) diff --git a/.asf.yaml b/.asf.yaml new file mode 100644 index 0000000..0d166d3 --- /dev/null +++ b/.asf.yaml @@ -0,0 +1,20 @@ +github: + enabled_merge_buttons: +squash: true +merge: false +rebase: true + labels: +- flink +- kafka +- connector +- datastream +- table +- sql + collaborators: +- flinkbot +notifications: + commits: commits@flink.apache.org + issues: iss...@flink.apache.org + pullrequests: iss...@flink.apache.org + jobs: bui...@flink.apache.org + jira_options: link label \ No newline at end of file diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml new file mode 100644 index 0000000..f2f0edd --- /dev/null +++ b/.github/boring-cyborg.yml @@ -0,0 +1,87 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +labelPRBasedOnFilePath: + component=BuildSystem: +- .github/**/* +- tools/maven/* + + component=Documentation: +- docs/**/* + + component=Connectors/Kafka: +- flink-connector-kafka*/**/* +- flink-sql-connector-kafka*/**/* + +## IssueLink Adder # +# Insert Issue (Jira/Github etc) link in PR description based on the Issue ID in PR title. +insertIssueLinkInPrDescription: + # specify the placeholder for the issue link that should be present in the description + descriptionIssuePlaceholderRegexp: "^Issue link: (.*)$" + matchers: +# you can have several matches - for different types of issues +# only the first matching entry is replaced +jiraIssueMatch: + # specify the regexp of issue id that you can find in the title of the PR + # the match groups can be used to build the issue id (${1}, ${2}, etc.). + titleIssueIdRegexp: \[(FLINK-[0-9]+)\] + # the issue link to be added. ${1}, ${2} ... are replaced with the match groups from the + # title match (remember to use quotes) + descriptionIssueLink: "[${1}](https://issues.apache.org/jira/browse/${1}/)" +docOnlyIssueMatch: + titleIssueIdRegexp: \[hotfix\] + descriptionIssueLink: "`Documentation only change, no JIRA issue`" + +## Title Validator # +# Verifies if commit/PR titles match the regexp specified +verifyTitles: + # Regular expression that should be matched by titles of commits or PR + titleRegexp: ^\[FLINK-[0-9]+\].*$|^\[FLINK-X\].*$|^\[hotfix].*$ + # If set to true, it will always check the PR title (as opposed to the individual commits). 
+ alwaysUsePrTitle: false + # If set to true, it will only check the commit in case there is a single commit. + # In case of multiple commits it will check PR title. + # This reflects the standard behaviour of Github that for `Squash & Merge` GitHub + # uses the PR title rather than commit messages for the squashed commit ¯\_(ツ)_/¯ + # For single-commit PRs it takes the squashed commit message from the commit as expected. + # + # If set to false it will check all commit messages. This is useful when you do not squash commits at merge. + validateEitherPrOrSingleCommitTitle: true + # The title the GitHub
[flink-connector-kafka] branch main created (now 2111172)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git at 2111172 Init repository This branch includes the following new commits: new 2111172 Init repository The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink] branch master updated (f21090b529d -> 06fcd34eb26)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from f21090b529d [FLINK-24742][table][docs] Add info about SQL client key strokes to docs (#17649) add 06fcd34eb26 [FLINK-30044][ci] Deduplicate plugin parser loops No new revisions were added by this update. Summary of changes: .../ci/utils/dependency/DependencyParser.java | 40 +++- .../flink/tools/ci/utils/deploy/DeployParser.java | 31 + .../flink/tools/ci/utils/shade/ShadeParser.java| 31 ++--- .../flink/tools/ci/utils/shared/ParserUtils.java | 73 ++ .../tools/ci/utils/deploy/DeployParserTest.java| 23 +++ 5 files changed, 111 insertions(+), 87 deletions(-) create mode 100644 tools/ci/flink-ci-tools/src/main/java/org/apache/flink/tools/ci/utils/shared/ParserUtils.java
[flink] branch master updated (88a161101bb -> f21090b529d)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 88a161101bb [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest add f21090b529d [FLINK-24742][table][docs] Add info about SQL client key strokes to docs (#17649) No new revisions were added by this update. Summary of changes: docs/content.zh/docs/dev/table/sqlClient.md | 34 + docs/content/docs/dev/table/sqlClient.md| 34 + 2 files changed, 68 insertions(+)
[flink-connector-aws] branch main updated: [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git The following commit(s) were added to refs/heads/main by this push: new b2606b6 [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink b2606b6 is described below commit b2606b634df44c1bc412e72c9494842703052998 Author: Hong Liang Teoh AuthorDate: Sun Nov 6 21:24:55 2022 + [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink --- flink-connector-dynamodb/pom.xml | 44 +- .../dynamodb/table/DynamoDbConfiguration.java | 44 ++ .../dynamodb/table/DynamoDbConnectorOptions.java | 52 ++ .../dynamodb/table/DynamoDbDynamicSink.java| 182 +++ .../dynamodb/table/DynamoDbDynamicSinkFactory.java | 84 +++ .../dynamodb/table/RowDataElementConverter.java| 73 +++ .../table/RowDataToAttributeValueConverter.java| 104 .../table/converter/ArrayAttributeConverter.java | 69 +++ .../converter/ArrayAttributeConverterProvider.java | 82 +++ .../org.apache.flink.table.factories.Factory | 16 + .../dynamodb/sink/examples/example_dynamodb.sql| 67 +++ .../example_dynamodb_static_partitions.sql | 67 +++ .../table/DynamoDbDynamicSinkFactoryTest.java | 295 +++ .../table/RowDataElementConverterTest.java | 148 ++ .../RowDataToAttributeValueConverterTest.java | 561 + .../converter/ArrayAttributeConverterTest.java | 90 flink-sql-connector-dynamodb/pom.xml | 1 + .../src/main/resources/META-INF/NOTICE | 25 +- .../services/dynamodb/execution.interceptors | 1 + 19 files changed, 1984 insertions(+), 21 deletions(-) diff --git a/flink-connector-dynamodb/pom.xml b/flink-connector-dynamodb/pom.xml index 25f7756..932f069 100644 --- a/flink-connector-dynamodb/pom.xml +++ b/flink-connector-dynamodb/pom.xml @@ -18,19 +18,19 @@ specific language governing permissions and limitations under the License. 
--> http://maven.apache.org/POM/4.0.0; - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> -4.0.0 + 4.0.0 - -org.apache.flink -flink-connector-aws-parent + + org.apache.flink + flink-connector-aws-parent 1.0.0-SNAPSHOT - + -flink-connector-dynamodb -Flink : Connectors : Amazon DynamoDB + flink-connector-dynamodb + Flink : Connectors : Amazon DynamoDB jar @@ -76,6 +76,22 @@ under the License. dynamodb-enhanced + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + true + + org.apache.flink @@ -106,5 +122,15 @@ under the License. ${flink.version} test + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java new file mode 100644 index 000..ad4ca3c --- /dev/null +++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, +
[flink] 01/02: [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git commit d521b030890c9c14749ff220548499fb92bd Author: Weijie Guo AuthorDate: Wed Nov 16 21:05:24 2022 +0800 [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ. (cherry picked from commit 97c9abf9791d8c08db27f0ef2a2a78488321b0a1) --- .../ResultPartitionDeploymentDescriptorTest.java | 41 +++--- .../netty/ClientTransportErrorHandlingTest.java| 106 +++- .../netty/PartitionRequestClientFactoryTest.java | 141 - .../util/NettyShuffleDescriptorBuilder.java| 1 - 4 files changed, 146 insertions(+), 143 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 39e91a45a1f..423512f7862 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -31,19 +31,16 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionC import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; -import org.apache.flink.util.TestLogger; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.InetSocketAddress; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link 
ResultPartitionDeploymentDescriptor}. */ -public class ResultPartitionDeploymentDescriptorTest extends TestLogger { +class ResultPartitionDeploymentDescriptorTest { private static final IntermediateDataSetID resultId = new IntermediateDataSetID(); private static final int numberOfPartitions = 5; @@ -73,18 +70,18 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger { /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */ @Test -public void testSerializationOfUnknownShuffleDescriptor() throws IOException { +void testSerializationOfUnknownShuffleDescriptor() throws IOException { ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID); ShuffleDescriptor shuffleDescriptorCopy = CommonTestUtils.createCopySerializable(shuffleDescriptor); -assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class)); -assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID)); -assertThat(shuffleDescriptorCopy.isUnknown(), is(true)); + assertThat(shuffleDescriptorCopy).isInstanceOf(UnknownShuffleDescriptor.class); + assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID()); +assertThat(shuffleDescriptorCopy.isUnknown()).isTrue(); } /** Tests simple de/serialization with {@link NettyShuffleDescriptor}. 
*/ @Test -public void testSerializationWithNettyShuffleDescriptor() throws IOException { +void testSerializationWithNettyShuffleDescriptor() throws IOException { ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor( producerLocation, @@ -94,13 +91,13 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger { ResultPartitionDeploymentDescriptor copy = createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor); -assertThat(copy.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class)); + assertThat(copy.getShuffleDescriptor()).isInstanceOf(NettyShuffleDescriptor.class); NettyShuffleDescriptor shuffleDescriptorCopy = (NettyShuffleDescriptor) copy.getShuffleDescriptor(); -assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID)); -assertThat(shuffleDescriptorCopy.isUnknown(), is(false)); -assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation), is(true)); -assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID)); + assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID()); +assertThat(shuffleDescriptorCopy.isUnknown()).isFalse(); +assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation)).isTrue(); +
[flink] 02/02: [FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git commit 7a4fb7c86fbb28bee63bba829ebdda7b886170e3 Author: Weijie Guo AuthorDate: Wed Nov 16 21:24:23 2022 +0800 [FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception. This closes #21358 (cherry picked from commit 93c834be953f1336adb3ec5b5bf759a20e25eddf) --- .../flink/runtime/io/network/ConnectionID.java | 22 + .../runtime/io/network/NetworkClientHandler.java | 2 ++ .../CreditBasedPartitionRequestClientHandler.java | 37 +++-- .../network/netty/NettyPartitionRequestClient.java | 16 +++-- .../netty/PartitionRequestClientFactory.java | 4 +++ .../runtime/shuffle/NettyShuffleDescriptor.java| 38 +++--- .../ResultPartitionDeploymentDescriptorTest.java | 5 +-- .../runtime/deployment/ShuffleDescriptorTest.java | 13 ++-- .../netty/ClientTransportErrorHandlingTest.java| 13 +--- ...editBasedPartitionRequestClientHandlerTest.java | 4 +++ .../netty/NettyPartitionRequestClientTest.java | 4 ++- .../runtime/io/network/netty/NettyTestUtil.java| 4 ++- .../netty/PartitionRequestClientFactoryTest.java | 2 +- .../partition/consumer/InputChannelBuilder.java| 3 +- .../util/NettyShuffleDescriptorBuilder.java| 7 ++-- 15 files changed, 134 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java index 6cb0fa29f20..fb60340c673 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.IntermediateResult; import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -43,18 +44,26 @@ public class ConnectionID implements Serializable { private final int connectionIndex; +private final ResourceID resourceID; + public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) { this( +connectionInfo.getResourceID(), new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex); } -public ConnectionID(InetSocketAddress address, int connectionIndex) { +public ConnectionID(ResourceID resourceID, InetSocketAddress address, int connectionIndex) { +this.resourceID = checkNotNull(resourceID); this.address = checkNotNull(address); checkArgument(connectionIndex >= 0); this.connectionIndex = connectionIndex; } +public ResourceID getResourceID() { +return resourceID; +} + public InetSocketAddress getAddress() { return address; } @@ -75,15 +84,14 @@ public class ConnectionID implements Serializable { } final ConnectionID ra = (ConnectionID) other; -if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) { -return false; -} - -return true; +return ra.getAddress().equals(address) +&& ra.getConnectionIndex() == connectionIndex +&& ra.getResourceID().equals(resourceID); } @Override public String toString() { -return address + " [" + connectionIndex + "]"; +return String.format( +"%s (%s) [%s]", address, resourceID.getStringWithMetadata(), connectionIndex); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java index 18ae9e6d9a9..354da6c3a32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java @@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler { void cancelRequestFor(InputChannelID inputChannelId); +void 
setConnectionId(ConnectionID connectionId); + /** * Return whether there is channel error. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index fe008c1afed..63efdf117c2 100644 ---
[flink] branch release-1.15 updated (28a877b8880 -> 7a4fb7c86fb)
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a change to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git from 28a877b8880 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest new d521b030890 [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ. new 7a4fb7c86fb [FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/runtime/io/network/ConnectionID.java | 22 +++- .../runtime/io/network/NetworkClientHandler.java | 2 + .../CreditBasedPartitionRequestClientHandler.java | 37 +- .../network/netty/NettyPartitionRequestClient.java | 16 ++- .../netty/PartitionRequestClientFactory.java | 4 + .../runtime/shuffle/NettyShuffleDescriptor.java| 38 -- .../ResultPartitionDeploymentDescriptorTest.java | 46 --- .../runtime/deployment/ShuffleDescriptorTest.java | 13 +- .../netty/ClientTransportErrorHandlingTest.java| 119 - ...editBasedPartitionRequestClientHandlerTest.java | 4 + .../netty/NettyPartitionRequestClientTest.java | 4 +- .../runtime/io/network/netty/NettyTestUtil.java| 4 +- .../netty/PartitionRequestClientFactoryTest.java | 143 - .../partition/consumer/InputChannelBuilder.java| 3 +- .../util/NettyShuffleDescriptorBuilder.java| 8 +- 15 files changed, 280 insertions(+), 183 deletions(-)
[flink] 02/02: [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit df061627591a1f62942e0700d51bb79259fb8f54 Author: Weijie Guo AuthorDate: Wed Nov 16 21:24:23 2022 +0800 [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception. This closes #21361 (cherry picked from commit 93c834be953f1336adb3ec5b5bf759a20e25eddf) --- .../flink/runtime/io/network/ConnectionID.java | 22 + .../runtime/io/network/NetworkClientHandler.java | 2 ++ .../CreditBasedPartitionRequestClientHandler.java | 37 +++-- .../network/netty/NettyPartitionRequestClient.java | 16 +++-- .../netty/PartitionRequestClientFactory.java | 4 +++ .../runtime/shuffle/NettyShuffleDescriptor.java| 38 +++--- .../ResultPartitionDeploymentDescriptorTest.java | 5 +-- .../runtime/deployment/ShuffleDescriptorTest.java | 13 ++-- .../netty/ClientTransportErrorHandlingTest.java| 13 +--- ...editBasedPartitionRequestClientHandlerTest.java | 4 +++ .../netty/NettyPartitionRequestClientTest.java | 4 ++- .../runtime/io/network/netty/NettyTestUtil.java| 4 ++- .../netty/PartitionRequestClientFactoryTest.java | 24 +- .../partition/consumer/InputChannelBuilder.java| 3 +- .../util/NettyShuffleDescriptorBuilder.java| 8 ++--- 15 files changed, 149 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java index 6cb0fa29f20..fb60340c673 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.IntermediateResult; import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -43,18 +44,26 @@ public class ConnectionID implements Serializable { private final int connectionIndex; +private final ResourceID resourceID; + public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) { this( +connectionInfo.getResourceID(), new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex); } -public ConnectionID(InetSocketAddress address, int connectionIndex) { +public ConnectionID(ResourceID resourceID, InetSocketAddress address, int connectionIndex) { +this.resourceID = checkNotNull(resourceID); this.address = checkNotNull(address); checkArgument(connectionIndex >= 0); this.connectionIndex = connectionIndex; } +public ResourceID getResourceID() { +return resourceID; +} + public InetSocketAddress getAddress() { return address; } @@ -75,15 +84,14 @@ public class ConnectionID implements Serializable { } final ConnectionID ra = (ConnectionID) other; -if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) { -return false; -} - -return true; +return ra.getAddress().equals(address) +&& ra.getConnectionIndex() == connectionIndex +&& ra.getResourceID().equals(resourceID); } @Override public String toString() { -return address + " [" + connectionIndex + "]"; +return String.format( +"%s (%s) [%s]", address, resourceID.getStringWithMetadata(), connectionIndex); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java index 18ae9e6d9a9..354da6c3a32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java @@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler { void cancelRequestFor(InputChannelID inputChannelId); +void 
setConnectionId(ConnectionID connectionId); + /** * Return whether there is channel error. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index fe008c1afed..63efdf117c2 100644 ---
[flink] 01/02: [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit e1c75638af65210c2f780996431dbc38f47517a0 Author: Weijie Guo AuthorDate: Wed Nov 16 21:05:24 2022 +0800 [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ. (cherry picked from commit 97c9abf9791d8c08db27f0ef2a2a78488321b0a1) --- .../ResultPartitionDeploymentDescriptorTest.java | 39 .../netty/ClientTransportErrorHandlingTest.java| 106 ++--- .../netty/PartitionRequestClientFactoryTest.java | 99 ++- 3 files changed, 122 insertions(+), 122 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 33c79a1be4a..1fe6b9e329e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -31,20 +31,17 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionC import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; -import org.apache.flink.util.TestLogger; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.InetSocketAddress; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the 
{@link ResultPartitionDeploymentDescriptor}. */ -public class ResultPartitionDeploymentDescriptorTest extends TestLogger { +class ResultPartitionDeploymentDescriptorTest { private static final IntermediateDataSetID resultId = new IntermediateDataSetID(); private static final int numberOfPartitions = 5; @@ -78,18 +75,18 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger { /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */ @Test -public void testSerializationOfUnknownShuffleDescriptor() throws IOException { +void testSerializationOfUnknownShuffleDescriptor() throws IOException { ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID); ShuffleDescriptor shuffleDescriptorCopy = CommonTestUtils.createCopySerializable(shuffleDescriptor); -assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class)); -assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID)); -assertThat(shuffleDescriptorCopy.isUnknown(), is(true)); + assertThat(shuffleDescriptorCopy).isInstanceOf(UnknownShuffleDescriptor.class); + assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID()); +assertThat(shuffleDescriptorCopy.isUnknown()).isTrue(); } /** Tests simple de/serialization with {@link NettyShuffleDescriptor}. 
*/ @Test -public void testSerializationWithNettyShuffleDescriptor() throws IOException { +void testSerializationWithNettyShuffleDescriptor() throws IOException { ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor( producerLocation, @@ -99,13 +96,13 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger { ResultPartitionDeploymentDescriptor copy = createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor); -assertThat(copy.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class)); + assertThat(copy.getShuffleDescriptor()).isInstanceOf(NettyShuffleDescriptor.class); NettyShuffleDescriptor shuffleDescriptorCopy = (NettyShuffleDescriptor) copy.getShuffleDescriptor(); -assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID)); -assertThat(shuffleDescriptorCopy.isUnknown(), is(false)); -assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation), is(true)); -assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID)); + assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID()); +assertThat(shuffleDescriptorCopy.isUnknown()).isFalse(); +
[flink] branch release-1.16 updated (d8f7777f321 -> df061627591)
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from d8f7777f321 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest new e1c75638af6 [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ. new df061627591 [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/runtime/io/network/ConnectionID.java | 22 ++-- .../runtime/io/network/NetworkClientHandler.java | 2 + .../CreditBasedPartitionRequestClientHandler.java | 37 ++- .../network/netty/NettyPartitionRequestClient.java | 16 ++- .../netty/PartitionRequestClientFactory.java | 4 + .../runtime/shuffle/NettyShuffleDescriptor.java| 38 +-- .../ResultPartitionDeploymentDescriptorTest.java | 44 .../runtime/deployment/ShuffleDescriptorTest.java | 13 ++- .../netty/ClientTransportErrorHandlingTest.java| 119 ++-- ...editBasedPartitionRequestClientHandlerTest.java | 4 + .../netty/NettyPartitionRequestClientTest.java | 4 +- .../runtime/io/network/netty/NettyTestUtil.java| 4 +- .../netty/PartitionRequestClientFactoryTest.java | 121 - .../partition/consumer/InputChannelBuilder.java| 3 +- .../util/NettyShuffleDescriptorBuilder.java| 8 +- 15 files changed, 270 insertions(+), 169 deletions(-)
[flink] branch release-1.15 updated (6b565055beb -> 28a877b8880)
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a change to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git from 6b565055beb [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel add 28a877b8880 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest No new revisions were added by this update. Summary of changes: .../io/network/netty/NettyBufferPoolTest.java | 58 -- 1 file changed, 44 insertions(+), 14 deletions(-)
[flink] branch release-1.16 updated: [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.16 by this push: new d8ff321 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest d8ff321 is described below commit d8ff3216101fd31dbcdc4a725d2e7ead4113 Author: Yun Gao AuthorDate: Mon Dec 7 18:06:34 2020 +0800 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest --- .../io/network/netty/NettyBufferPoolTest.java | 58 -- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java index 3797d025e77..0aaa11ab64c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java @@ -18,36 +18,61 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import org.junit.After; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** Tests for the {@link NettyBufferPool} wrapper. */ public class NettyBufferPoolTest { +private final List needReleasing = new ArrayList<>(); + +@After +public void tearDown() { +try { +// Release all of the buffers. +for (ByteBuf buf : needReleasing) { +buf.release(); +} + +// Checks in a separate loop in case we have sliced buffers. 
+for (ByteBuf buf : needReleasing) { +assertEquals(0, buf.refCnt()); +} +} finally { +needReleasing.clear(); +} +} + @Test public void testNoHeapAllocations() throws Exception { -NettyBufferPool nettyBufferPool = new NettyBufferPool(1); +final NettyBufferPool nettyBufferPool = new NettyBufferPool(1); // Buffers should prefer to be direct -assertTrue(nettyBufferPool.buffer().isDirect()); -assertTrue(nettyBufferPool.buffer(128).isDirect()); -assertTrue(nettyBufferPool.buffer(128, 256).isDirect()); +assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect()); +assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect()); +assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect()); // IO buffers should prefer to be direct -assertTrue(nettyBufferPool.ioBuffer().isDirect()); -assertTrue(nettyBufferPool.ioBuffer(128).isDirect()); -assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect()); +assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect()); +assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect()); +assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect()); // Currently we fakes the heap buffer allocation with direct buffers -assertTrue(nettyBufferPool.heapBuffer().isDirect()); -assertTrue(nettyBufferPool.heapBuffer(128).isDirect()); -assertTrue(nettyBufferPool.heapBuffer(128, 256).isDirect()); +assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect()); +assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect()); +assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect()); // Composite buffers allocates the corresponding type of buffers when extending its capacity - assertTrue(nettyBufferPool.compositeHeapBuffer().capacity(1024).isDirect()); - assertTrue(nettyBufferPool.compositeHeapBuffer(10).capacity(1024).isDirect()); + assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect()); + 
assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect()); // Is direct buffer pooled! assertTrue(nettyBufferPool.isDirectBufferPooled()); @@ -60,16 +85,21 @@ public class NettyBufferPoolTest { { // Single large buffer allocates one chunk -nettyBufferPool.directBuffer(chunkSize - 64); +releaseLater(nettyBufferPool.directBuffer(chunkSize - 64)); long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); assertEquals(chunkSize, allocated); } { // Allocate a little more (one more chunk required) -nettyBufferPool.directBuffer(128); +releaseLater(nettyBufferPool.directBuffer(128)); long allocated =
[flink] branch master updated: [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 88a161101bb [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest 88a161101bb is described below commit 88a161101bb33b4c088325788bd11d41f9369355 Author: Yun Gao AuthorDate: Mon Dec 7 18:06:34 2020 +0800 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest This closes #14319. --- .../io/network/netty/NettyBufferPoolTest.java | 58 -- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java index 3797d025e77..0aaa11ab64c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java @@ -18,36 +18,61 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import org.junit.After; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** Tests for the {@link NettyBufferPool} wrapper. */ public class NettyBufferPoolTest { +private final List needReleasing = new ArrayList<>(); + +@After +public void tearDown() { +try { +// Release all of the buffers. +for (ByteBuf buf : needReleasing) { +buf.release(); +} + +// Checks in a separate loop in case we have sliced buffers. 
+for (ByteBuf buf : needReleasing) { +assertEquals(0, buf.refCnt()); +} +} finally { +needReleasing.clear(); +} +} + @Test public void testNoHeapAllocations() throws Exception { -NettyBufferPool nettyBufferPool = new NettyBufferPool(1); +final NettyBufferPool nettyBufferPool = new NettyBufferPool(1); // Buffers should prefer to be direct -assertTrue(nettyBufferPool.buffer().isDirect()); -assertTrue(nettyBufferPool.buffer(128).isDirect()); -assertTrue(nettyBufferPool.buffer(128, 256).isDirect()); +assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect()); +assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect()); +assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect()); // IO buffers should prefer to be direct -assertTrue(nettyBufferPool.ioBuffer().isDirect()); -assertTrue(nettyBufferPool.ioBuffer(128).isDirect()); -assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect()); +assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect()); +assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect()); +assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect()); // Currently we fakes the heap buffer allocation with direct buffers -assertTrue(nettyBufferPool.heapBuffer().isDirect()); -assertTrue(nettyBufferPool.heapBuffer(128).isDirect()); -assertTrue(nettyBufferPool.heapBuffer(128, 256).isDirect()); +assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect()); +assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect()); +assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect()); // Composite buffers allocates the corresponding type of buffers when extending its capacity - assertTrue(nettyBufferPool.compositeHeapBuffer().capacity(1024).isDirect()); - assertTrue(nettyBufferPool.compositeHeapBuffer(10).capacity(1024).isDirect()); + assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect()); + 
assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect()); // Is direct buffer pooled! assertTrue(nettyBufferPool.isDirectBufferPooled()); @@ -60,16 +85,21 @@ public class NettyBufferPoolTest { { // Single large buffer allocates one chunk -nettyBufferPool.directBuffer(chunkSize - 64); +releaseLater(nettyBufferPool.directBuffer(chunkSize - 64)); long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); assertEquals(chunkSize, allocated); } { // Allocate a little more (one more chunk required) -nettyBufferPool.directBuffer(128); +releaseLater(nettyBufferPool.directBuffer(128)); long allocated =
[flink] branch master updated: [FLINK-29423][rest] Remove custom JobDetails serializer
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ec89c17da5d [FLINK-29423][rest] Remove custom JobDetails serializer ec89c17da5d is described below commit ec89c17da5d555ca44f89d3f8739fa9ae00734b7 Author: Chesnay Schepler AuthorDate: Fri Nov 11 13:55:40 2022 +0100 [FLINK-29423][rest] Remove custom JobDetails serializer --- .../shortcodes/generated/rest_v1_dispatcher.html | 33 +++- docs/static/generated/rest_v1_dispatcher.yml | 41 ++--- .../src/test/resources/rest_api_v1.snapshot| 33 +++- .../runtime/messages/webmonitor/JobDetails.java| 167 + 4 files changed, 141 insertions(+), 133 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index ac1c58c1299..baa681d261b 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -1096,7 +1096,38 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "jobs" : { "type" : "array", "items" : { -"type" : "any" +"type" : "object", +"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails", +"properties" : { + "duration" : { +"type" : "integer" + }, + "end-time" : { +"type" : "integer" + }, + "jid" : { +"type" : "any" + }, + "last-modification" : { +"type" : "integer" + }, + "name" : { +"type" : "string" + }, + "start-time" : { +"type" : "integer" + }, + "state" : { +"type" : "string", +"enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ] + }, + "tasks" : { +"type" : "object", +"additionalProperties" : { + "type" : "integer" +} + } +} } } } diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml index c1dcc55d097..146b66469f3 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -1886,18 +1886,6 @@ components: total: type: integer format: int64 -CurrentAttempts: - type: object - properties: -currentAttempts: - uniqueItems: true - type: array - items: -type: integer -format: int32 -representativeAttempt: - type: integer - format: int32 DashboardConfiguration: type: object properties: @@ -2218,36 +2206,27 @@ components: JobDetails: type: object properties: -currentExecutionAttempts: - type: object - additionalProperties: -type: object -additionalProperties: - $ref: '#/components/schemas/CurrentAttempts' duration: type: integer format: int64 -endTime: +end-time: type: integer format: int64 -jobId: +jid: $ref: '#/components/schemas/JobID' -jobName: - type: string -lastUpdateTime: +last-modification: type: integer format: int64 -numTasks: - type: integer - format: int32 -startTime: +name: + type: string +start-time: type: integer format: int64 -status: +state: $ref: '#/components/schemas/JobStatus' -tasksPerState: - type: array - items: +tasks: + type: object + additionalProperties: type: integer format: int32 JobDetailsInfo: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index ea31ca3d138..0ae17a6ce11 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -786,7 +786,38 @@ "jobs" : { "type" : "array", "items" : { -"type" : "any" +"type" : "object", +"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails", +"properties" : { + "jid" : { +"type" : "any" + }, + "name" : { +"type" : "string" + }, + "start-time" : { +