(flink) branch master updated: [FLINK-33714][doc] Update the documentation about the usage of RuntimeContext#getExecutionConfig.
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 3531998adad [FLINK-33714][doc] Update the documentation about the usage of RuntimeContext#getExecutionConfig.
3531998adad is described below

commit 3531998adad20f3904d1421a35a69fef44b2b69f
Author: JunRuiLee
AuthorDate: Mon Dec 11 17:36:59 2023 +0800

    [FLINK-33714][doc] Update the documentation about the usage of RuntimeContext#getExecutionConfig.
---
 docs/content.zh/docs/dev/datastream/application_parameters.md | 3 +--
 .../fault-tolerance/serialization/types_serialization.md | 10 +++---
 docs/content/docs/dev/datastream/application_parameters.md | 3 +--
 .../fault-tolerance/serialization/types_serialization.md | 10 +++---
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/application_parameters.md b/docs/content.zh/docs/dev/datastream/application_parameters.md
index a9ad2c17739..98b567429fd 100644
--- a/docs/content.zh/docs/dev/datastream/application_parameters.md
+++ b/docs/content.zh/docs/dev/datastream/application_parameters.md
@@ -125,8 +125,7 @@ public static final class Tokenizer extends RichFlatMapFunction> out) {
-ParameterTool parameters = (ParameterTool)
- getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ParameterTool parameters = ParameterTool.fromMap(getRuntimeContext().getGlobalJobParameters());
 parameters.getRequired("input");
 // .. do more ..
 ```
diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 4d4f1456c2b..70679c5888d 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -355,12 +355,16 @@ You can still use the same method as in Java as a fallback.
 {{< /tab >}}
 {{< /tabs >}}
-To create a `TypeSerializer`, simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
+There are two ways to create a TypeSerializer.
+The first is to simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
 The `config` parameter is of type `ExecutionConfig` and holds the information about the program's registered custom serializers. Where ever possibly, try to pass the programs proper ExecutionConfig. You can usually
-obtain it from `DataStream` via calling `getExecutionConfig()`. Inside functions (like `MapFunction`), you can
-get it by making the function a [Rich Function]() and calling `getRuntimeContext().getExecutionConfig()`.
+obtain it from `DataStream` via calling `getExecutionConfig()`.
+
+The second is to use getRuntimeContext().createSerializer(typeInfo) within a function. Inside functions
+(like `MapFunction`), you can get it by making the function a [Rich Function]() and calling
+`getRuntimeContext().createSerializer(typeInfo)`.
diff --git a/docs/content/docs/dev/datastream/application_parameters.md b/docs/content/docs/dev/datastream/application_parameters.md
index 43d3e31bf09..6583c5c8a71 100644
--- a/docs/content/docs/dev/datastream/application_parameters.md
+++ b/docs/content/docs/dev/datastream/application_parameters.md
@@ -132,8 +132,7 @@ public static final class Tokenizer extends RichFlatMapFunction> out) {
- ParameterTool parameters = (ParameterTool)
- getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ ParameterTool parameters = ParameterTool.fromMap(getRuntimeContext().getGlobalJobParameters());
 parameters.getRequired("input");
 // .. do more ..
 ```
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 34d21b5e211..43062151733 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -356,12 +356,16 @@ You can still use the same method as in Java as a fallback.
 {{< /tab >}}
 {{< /tabs >}}
-To create a `TypeSerializer`, simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
+There are two ways to create a TypeSerializer.
+The first is to simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
 The `config` parameter is of type `ExecutionConfig` and holds the information about the program's registered custom serializers. Where ever possibly, try to pass the programs proper ExecutionConfig. You can usually
(flink-connector-pulsar) branch v4.1 updated: [hotfix][build] Development branch should use SNAPSHOT version
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch v4.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

The following commit(s) were added to refs/heads/v4.1 by this push:
     new e1ee6b4 [hotfix][build] Development branch should use SNAPSHOT version
e1ee6b4 is described below

commit e1ee6b49d5d6222555241fac7fd8ba603d05bfe8
Author: Leonard Xu
AuthorDate: Tue Dec 12 12:27:55 2023 +0800

    [hotfix][build] Development branch should use SNAPSHOT version

    This closes #66.

    Co-authored-by: tison
---
 flink-connector-pulsar-e2e-tests/pom.xml | 2 +-
 flink-connector-pulsar/pom.xml | 2 +-
 flink-sql-connector-pulsar/pom.xml | 2 +-
 pom.xml | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-connector-pulsar-e2e-tests/pom.xml b/flink-connector-pulsar-e2e-tests/pom.xml
index 4643099..c4554b9 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -23,7 +23,7 @@ under the License.
 org.apache.flink
 flink-connector-pulsar-parent
- 4.1.0
+ 4.1-SNAPSHOT
 4.0.0
diff --git a/flink-connector-pulsar/pom.xml b/flink-connector-pulsar/pom.xml
index 709cbfd..9e23fa2 100644
--- a/flink-connector-pulsar/pom.xml
+++ b/flink-connector-pulsar/pom.xml
@@ -26,7 +26,7 @@ under the License.
 org.apache.flink
 flink-connector-pulsar-parent
- 4.1.0
+ 4.1-SNAPSHOT
 flink-connector-pulsar
diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml
index 09babe6..d6242dd 100644
--- a/flink-sql-connector-pulsar/pom.xml
+++ b/flink-sql-connector-pulsar/pom.xml
@@ -26,7 +26,7 @@ under the License.
 org.apache.flink
 flink-connector-pulsar-parent
- 4.1.0
+ 4.1-SNAPSHOT
 flink-sql-connector-pulsar
diff --git a/pom.xml b/pom.xml
index 02fa673..a2e68f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@ under the License.
 org.apache.flink
 flink-connector-pulsar-parent
-4.1.0
+4.1-SNAPSHOT
 Flink : Connectors : Pulsar : Parent
 pom
 2022
(flink) branch master updated: [hotfix][test] Migrate JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and Assertj (#23882)
This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e454fc07192 [hotfix][test] Migrate JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and Assertj (#23882) e454fc07192 is described below commit e454fc0719276d2d54590989b754e88b9adb4455 Author: yunhong <337361...@qq.com> AuthorDate: Tue Dec 12 10:19:08 2023 +0800 [hotfix][test] Migrate JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and Assertj (#23882) Co-authored-by: zhengyunhong.zyh --- .../json/JsonRowDeserializationSchemaTest.java | 99 -- .../json/JsonRowSerializationSchemaTest.java | 46 +- 2 files changed, 78 insertions(+), 67 deletions(-) diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java index 81e370c0fd3..4712768f291 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java @@ -26,10 +26,7 @@ import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.junit.jupiter.api.Test; import javax.annotation.Nullable; @@ -44,19 +41,14 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.formats.utils.DeserializationSchemaMatcher.whenDeserializedWith; -import static 
org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertThat; -import static org.junit.internal.matchers.ThrowableCauseMatcher.hasCause; -import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link JsonRowDeserializationSchema}. */ public class JsonRowDeserializationSchemaTest { private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); -@Rule public ExpectedException thrown = ExpectedException.none(); - /** Tests simple deserialization using type information. */ @Test public void testTypeInfoDeserialization() throws Exception { @@ -135,7 +127,11 @@ public class JsonRowDeserializationSchemaTest { row.setField(9, map); row.setField(10, nestedMap); -assertThat(serializedJson, whenDeserializedWith(deserializationSchema).equalsTo(row)); +assertThat( +whenDeserializedWith(deserializationSchema) +.equalsTo(row) +.matches(serializedJson)) +.isTrue(); } @Test @@ -205,7 +201,11 @@ public class JsonRowDeserializationSchemaTest { nestedRow.setField(1, BigDecimal.valueOf(12)); expected.setField(9, nestedRow); -assertThat(serializedJson, whenDeserializedWith(deserializationSchema).equalsTo(expected)); +assertThat( +whenDeserializedWith(deserializationSchema) +.equalsTo(expected) +.matches(serializedJson)) +.isTrue(); } /** Tests deserialization with non-existing field name. 
*/ @@ -223,45 +223,55 @@ public class JsonRowDeserializationSchemaTest { new JsonRowDeserializationSchema.Builder(rowTypeInformation).build(); Row row = new Row(1); -assertThat(serializedJson, whenDeserializedWith(deserializationSchema).equalsTo(row)); +assertThat( +whenDeserializedWith(deserializationSchema) +.equalsTo(row) +.matches(serializedJson)) +.isTrue(); deserializationSchema = new JsonRowDeserializationSchema.Builder(rowTypeInformation) .failOnMissingField() .build(); - -assertThat( -serializedJson, -whenDeserializedWith(deserializationSchema) - .failsWithException(hasCause(instanceOf(IllegalStateException.class; +final JsonRowDeserializationSchema errorDs = deserializationSchema; +assertThatThrownBy(() ->
(flink) branch master updated: [FLINK-33672][table-runtime] Use MapState.entries() instead of keys() and get() in over window (#23855)
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 080119cca53 [FLINK-33672][table-runtime] Use MapState.entries() instead of keys() and get() in over window (#23855)
080119cca53 is described below

commit 080119cca53d9890257982b6a74a7d6f913253c2
Author: Zakelly
AuthorDate: Tue Dec 12 10:14:04 2023 +0800

    [FLINK-33672][table-runtime] Use MapState.entries() instead of keys() and get() in over window (#23855)
---
 .../operators/over/ProcTimeRangeBoundedPrecedingFunction.java | 8 +---
 .../operators/over/RowTimeRangeBoundedPrecedingFunction.java | 10 ++
 .../operators/over/RowTimeRowsBoundedPrecedingFunction.java| 6 --
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
index 39405ec8a05..15c5f0e4141 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 /**
  * Process Function used for the aggregate in bounded proc-time OVER window.
@@ -197,13 +198,14 @@ public class ProcTimeRangeBoundedPrecedingFunction
 // when we find timestamps that are out of interest, we retrieve corresponding elements
 // and eliminate them. Multiple elements could have been received at the same timestamp
 // the removal of old elements happens only once per proctime as onTimer is called only once
-Iterator iter = inputState.keys().iterator();
+Iterator>> iter = inputState.entries().iterator();
 List markToRemove = new ArrayList();
 while (iter.hasNext()) {
-Long elementKey = iter.next();
+Map.Entry> element = iter.next();
+Long elementKey = element.getKey();
 if (elementKey < limit) {
 // element key outside of window. Retract values
-List elementsRemove = inputState.get(elementKey);
+List elementsRemove = element.getValue();
 if (elementsRemove != null) {
 int iRemove = 0;
 while (iRemove < elementsRemove.size()) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
index bf5a3d5ca7f..285b8de6f1b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 /**
  * Process Function for RANGE clause event-time bounded OVER window.
@@ -227,12 +228,13 @@ public class RowTimeRangeBoundedPrecedingFunction
 List retractTsList = new ArrayList();
 // do retraction
-Iterator dataTimestampIt = inputState.keys().iterator();
-while (dataTimestampIt.hasNext()) {
-Long dataTs = dataTimestampIt.next();
+Iterator>> iter = inputState.entries().iterator();
+while (iter.hasNext()) {
+Map.Entry> data = iter.next();
+Long dataTs = data.getKey();
 Long offset = timestamp - dataTs;
 if (offset > precedingOffset) {
-List retractDataList = inputState.get(dataTs);
+List retractDataList = data.getValue();
 if (retractDataList != null) {
 dataListIndex = 0;
 while (dataListIndex < retractDataList.size()) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsBoundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsBoundedPrecedingFunction.java
index 74ffb428c2d..df897eef9a0 100644
---
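Editorial sketch for the FLINK-33672 change above, under stated assumptions: it uses a plain `java.util.Map` as a stand-in for Flink's `MapState`, strings in place of the row type, and hypothetical class and method names. It only illustrates the access pattern the commit title describes (one pass over entries rather than a key scan followed by a `get()` per key); the commit itself does not state the motivation, which is presumably to avoid the second lookup per key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in: Map replaces MapState, String replaces the row type.
// All names here are hypothetical and are not part of Flink.
final class OverWindowScanSketch {

    // Old pattern: iterate the keys, then look each value up separately.
    static List<String> expiredViaKeys(Map<Long, List<String>> state, long limit) {
        List<String> expired = new ArrayList<>();
        for (Long ts : state.keySet()) {
            if (ts < limit) {
                List<String> rows = state.get(ts); // one extra lookup per matching key
                if (rows != null) {
                    expired.addAll(rows);
                }
            }
        }
        return expired;
    }

    // New pattern: iterate the entries so key and value arrive together.
    static List<String> expiredViaEntries(Map<Long, List<String>> state, long limit) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : state.entrySet()) {
            if (entry.getKey() < limit) {
                List<String> rows = entry.getValue(); // no second lookup
                if (rows != null) {
                    expired.addAll(rows);
                }
            }
        }
        return expired;
    }

    // Small sorted sample: timestamps 1 and 5 fall below a limit of 10.
    static Map<Long, List<String>> sampleState() {
        Map<Long, List<String>> state = new TreeMap<>();
        state.put(1L, Arrays.asList("a", "b"));
        state.put(5L, Arrays.asList("c"));
        state.put(20L, Arrays.asList("d"));
        return state;
    }
}
```

Both methods return the same rows for the same input; only the number of lookups against the underlying map differs.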
(flink) branch master updated: [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 9cac80be18c [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan
9cac80be18c is described below

commit 9cac80be18c6aff0cebdfe706327c1693822e884
Author: Yuxin Tan
AuthorDate: Wed Nov 22 14:22:45 2023 +0800

    [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan
---
 .../utils/InputPriorityConflictResolver.java | 6 +-
 .../planner/utils/StreamExchangeModeUtils.java | 4 +
 .../physical/batch/BatchPhysicalExchange.scala | 10 +-
 .../utils/InputPriorityConflictResolverTest.java | 72 +++-
 .../plan/optimize/ShuffleModePlanOptimizeTest.java | 127 ++
 .../planner/utils/StreamExchangeModeUtilsTest.java | 11 +
 .../plan/optimize/ShuffleModePlanOptimizeTest.xml | 445 +
 7 files changed, 655 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index 144b527c520..0479993b5bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -199,7 +199,11 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
 }
 private InputProperty.DamBehavior getDamBehavior() {
-if (getBatchStreamExchangeMode(tableConfig, exchangeMode) == StreamExchangeMode.BATCH) {
+StreamExchangeMode streamExchangeMode =
+getBatchStreamExchangeMode(tableConfig, exchangeMode);
+if (streamExchangeMode == StreamExchangeMode.BATCH
+|| streamExchangeMode == StreamExchangeMode.HYBRID_FULL
+|| streamExchangeMode == StreamExchangeMode.HYBRID_SELECTIVE) {
 return InputProperty.DamBehavior.BLOCKING;
 } else {
 return InputProperty.DamBehavior.PIPELINED;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
index bd722a07726..312e59bee34 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
@@ -52,6 +52,10 @@ public class StreamExchangeModeUtils {
 final BatchShuffleMode shuffleMode = config.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
 if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
 return StreamExchangeMode.BATCH;
+} else if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL) {
+return StreamExchangeMode.HYBRID_FULL;
+} else if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE) {
+return StreamExchangeMode.HYBRID_SELECTIVE;
 }
 return StreamExchangeMode.UNDEFINED;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
index 3fbddc658e1..0a57f8087f8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
@@ -60,10 +60,12 @@ class BatchPhysicalExchange(
 val exchangeMode =
 getBatchStreamExchangeMode(unwrapTableConfig(this), StreamExchangeMode.UNDEFINED)
-val damBehavior = if (exchangeMode eq StreamExchangeMode.BATCH) {
- InputProperty.DamBehavior.BLOCKING
-} else {
- InputProperty.DamBehavior.PIPELINED
+val damBehavior = exchangeMode match {
+ case StreamExchangeMode.BATCH | StreamExchangeMode.HYBRID_FULL |
+ StreamExchangeMode.HYBRID_SELECTIVE =>
+InputProperty.DamBehavior.BLOCKING
+ case _ =>
+InputProperty.DamBehavior.PIPELINED
 }
 InputProperty.builder
diff --git
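Editorial sketch for the FLINK-33612 change above, under stated assumptions: the two enums below are hypothetical local copies that only borrow the constant names visible in the diff, and `damBehaviorFor` is an illustrative name. It restates the decision the diff introduces in both the Java and Scala files, namely that the two hybrid exchange modes are treated like `BATCH` when deriving the dam behavior, and everything else stays pipelined.

```java
// Hypothetical local enums that mirror names from the diff; these are not Flink's classes.
final class DamBehaviorSketch {

    enum ExchangeMode { BATCH, HYBRID_FULL, HYBRID_SELECTIVE, PIPELINED, UNDEFINED }

    enum DamBehavior { BLOCKING, PIPELINED }

    // BATCH and both hybrid modes map to BLOCKING; any other mode maps to PIPELINED.
    static DamBehavior damBehaviorFor(ExchangeMode mode) {
        switch (mode) {
            case BATCH:
            case HYBRID_FULL:
            case HYBRID_SELECTIVE:
                return DamBehavior.BLOCKING;
            default:
                return DamBehavior.PIPELINED;
        }
    }
}
```

Before the change only `BATCH` produced `BLOCKING`, so in this sketch the old behavior corresponds to removing the two hybrid cases from the `switch`.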
(flink-connector-pulsar) branch dependabot/maven/org.apache.commons-commons-compress-1.24.0 deleted (was ea918c8)
This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch dependabot/maven/org.apache.commons-commons-compress-1.24.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

     was ea918c8 Bump org.apache.commons:commons-compress from 1.22 to 1.24.0

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.
(flink-connector-pulsar) branch main updated: Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 (#65)
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

The following commit(s) were added to refs/heads/main by this push:
     new 74a4696 Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 (#65)
74a4696 is described below

commit 74a46963fd8eb05de9dff4c6b302f37d1a3faf0e
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 12 09:20:35 2023 +0800

    Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 (#65)

    Bumps org.apache.commons:commons-compress from 1.22 to 1.24.0.

    ---
    updated-dependencies:
    - dependency-name: org.apache.commons:commons-compress
      dependency-type: direct:production
    ...

    Signed-off-by: dependabot[bot]
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 2292706..768a891 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@ under the License.
 1.7.36
 2.17.2
 2.3.1
-1.22
+1.24.0
 1.12.20
 2.24.0
 3.3
(flink-connector-pulsar) branch main updated (d958d15 -> 29f1e92)
This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

    from d958d15 Update version to 4.1-SNAPSHOT
     add 29f1e92 [hotfix][build] Bump version to 4.2-SNAPSHOT (#67)

No new revisions were added by this update.

Summary of changes:
 flink-connector-pulsar-e2e-tests/pom.xml | 2 +-
 flink-connector-pulsar/pom.xml | 2 +-
 flink-sql-connector-pulsar/pom.xml | 2 +-
 pom.xml | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
(flink-connector-kafka) branch main updated (c38a0406 -> 825052f5)
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

    from c38a0406 [FLINK-33559] Externalize Kafka Python connector code
     add 825052f5 [FLINK-33361][connectors/kafka] Add Java 17 compatibility to Flink Kafka connector

No new revisions were added by this update.

Summary of changes:
 .github/workflows/push_pr.yml | 6 +-
 flink-connector-kafka/pom.xml | 8
 pom.xml | 7 +++
 3 files changed, 20 insertions(+), 1 deletion(-)
(flink) branch master updated: [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2. This closes #22502
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new ea4cdc28651 [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2. This closes #22502
ea4cdc28651 is described below

commit ea4cdc28651ad91defd4fc7b371a1f520ea7a262
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Mon Dec 11 16:36:44 2023 +0100

    [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2. This closes #22502
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 18c8b4b3f49..0304712e2ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1714,7 +1714,7 @@ under the License.
 org.apache.maven.plugins
 maven-surefire-plugin
- 3.0.0-M5
+ 3.2.2
(flink) 05/06: [FLINK-33709][checkpointing] Report CheckpointStats as Spans
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f5039a8824e9f78066fa56e2050daa18f69cc715 Author: Piotr Nowojski AuthorDate: Tue Sep 26 17:44:46 2023 +0200 [FLINK-33709][checkpointing] Report CheckpointStats as Spans --- docs/content.zh/docs/ops/traces.md | 57 ++ docs/content/docs/ops/traces.md| 57 ++ .../runtime/checkpoint/CheckpointStatsTracker.java | 11 + .../checkpoint/CheckpointStatsTrackerTest.java | 40 +++ 4 files changed, 165 insertions(+) diff --git a/docs/content.zh/docs/ops/traces.md b/docs/content.zh/docs/ops/traces.md index 427ab8d6cc3..eff102bb58a 100644 --- a/docs/content.zh/docs/ops/traces.md +++ b/docs/content.zh/docs/ops/traces.md @@ -68,4 +68,61 @@ Currently reporting Spans from Python is not supported. For information on how to set up Flink's trace reporters please take a look at the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" >}}). +## System traces + +Flink reports traces listed below. + +The tables below generally feature 5 columns: + +* The "Scope" column describes what is that trace reported scope. + +* The "Name" column describes the name of the reported trace. + +* The "Attributes" column lists the names of all attributes that are reported with the given trace. + +* The "Description" column provides information as to what a given attribute is reporting. + +### Checkpointing + +Flink reports a single span trace for the whole checkpoint once checkpoint reaches a terminal state: COMPLETED or FAILED. + + + + + Scope + Name + Attributes + Description + + + + + org.apache.flink.runtime.checkpoint.CheckpointStatsTracker + Checkpoint + startTs + Timestamp when the checkpoint has started. + + + endTs + Timestamp when the checkpoint has finished. + + + checkpointId + Id of the checkpoint. + + + checkpointedSize + Size in bytes of checkpointed state during this checkpoint. 
Might be smaller than fullSize if incremental checkpoints are used. + + + fullSize + Full size in bytes of the referenced state by this checkpoint. Might be larger than checkpointSize if incremental checkpoints are used. + + + checkpointStatus + What was the state of this checkpoint: FAILED or COMPLETED. + + + + {{< top >}} diff --git a/docs/content/docs/ops/traces.md b/docs/content/docs/ops/traces.md index 427ab8d6cc3..eff102bb58a 100644 --- a/docs/content/docs/ops/traces.md +++ b/docs/content/docs/ops/traces.md @@ -68,4 +68,61 @@ Currently reporting Spans from Python is not supported. For information on how to set up Flink's trace reporters please take a look at the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" >}}). +## System traces + +Flink reports traces listed below. + +The tables below generally feature 5 columns: + +* The "Scope" column describes what is that trace reported scope. + +* The "Name" column describes the name of the reported trace. + +* The "Attributes" column lists the names of all attributes that are reported with the given trace. + +* The "Description" column provides information as to what a given attribute is reporting. + +### Checkpointing + +Flink reports a single span trace for the whole checkpoint once checkpoint reaches a terminal state: COMPLETED or FAILED. + + + + + Scope + Name + Attributes + Description + + + + + org.apache.flink.runtime.checkpoint.CheckpointStatsTracker + Checkpoint + startTs + Timestamp when the checkpoint has started. + + + endTs + Timestamp when the checkpoint has finished. + + + checkpointId + Id of the checkpoint. + + + checkpointedSize + Size in bytes of checkpointed state during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used. + + + fullSize + Full size in bytes of the referenced state by this checkpoint. Might be larger than checkpointSize if incremental checkpoints are used. 
+ + + checkpointStatus + What was the state of this checkpoint: FAILED or COMPLETED. + + + + {{< top >}} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index f868f3fb4ba..47a6c3cf12c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import
(flink) 04/06: [FLINK-33708][docs] Document Span and TraceReporter concepts
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ddfb5a3a0292c29a5d8df3e3ab408893ab363e63 Author: Piotr Nowojski AuthorDate: Tue Oct 24 17:36:15 2023 +0200 [FLINK-33708][docs] Document Span and TraceReporter concepts --- docs/content.zh/docs/deployment/config.md | 9 +++ docs/content.zh/docs/deployment/security/_index.md | 4 +- docs/content.zh/docs/deployment/trace_reporters.md | 75 ++ docs/content.zh/docs/ops/metrics.md| 2 +- docs/content.zh/docs/ops/traces.md | 71 docs/content/docs/deployment/config.md | 9 +++ docs/content/docs/deployment/security/_index.md| 4 +- docs/content/docs/deployment/trace_reporters.md| 75 ++ docs/content/docs/ops/metrics.md | 2 +- docs/content/docs/ops/traces.md| 71 .../shortcodes/generated/trace_configuration.html | 36 +++ .../generated/trace_reporters_section.html | 30 + 12 files changed, 382 insertions(+), 6 deletions(-) diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index b68d87afecf..34d04f733e5 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -287,6 +287,15 @@ Enabling RocksDB's native metrics may cause degraded performance and should be s +# Traces + +Please refer to the [tracing system documentation]({{< ref "docs/ops/traces" >}}) for background on Flink's tracing infrastructure. + +{{< generated/trace_configuration >}} + + + + # History Server The history server keeps the information of completed jobs (graphs, runtimes, statistics). To enable it, you have to enable "job archiving" in the JobManager (`jobmanager.archive.fs.dir`). 
diff --git a/docs/content.zh/docs/deployment/security/_index.md b/docs/content.zh/docs/deployment/security/_index.md index d098fefc680..8a2a87a398e 100644 --- a/docs/content.zh/docs/deployment/security/_index.md +++ b/docs/content.zh/docs/deployment/security/_index.md @@ -1,7 +1,7 @@ --- title: Security bookCollapseSection: true -weight: 8 +weight: 9 --- \ No newline at end of file +--> diff --git a/docs/content.zh/docs/deployment/trace_reporters.md b/docs/content.zh/docs/deployment/trace_reporters.md new file mode 100644 index 000..eaab7d4ce3d --- /dev/null +++ b/docs/content.zh/docs/deployment/trace_reporters.md @@ -0,0 +1,75 @@ +--- +title: "Trace Reporters" +weight: 7 +type: docs +aliases: + - /deployment/trace_reporters.html +--- + + +# Trace Reporters + +Flink allows reporting traces to external systems. +For more information about Flink's tracing system go to the [tracing system documentation]({{< ref "docs/ops/traces" >}}). + +Traces can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These +reporters will be instantiated on each job and task manager when they are started. + +Below is a list of parameters that are generally applicable to all reporters. +All properties are configured by setting `traces.reporter..` in the configuration. +Reporters may additionally offer implementation-specific parameters, which are documented in the respective reporter's section. + +{{< include_reporter_config "layouts/shortcodes/generated/trace_reporters_section.html" >}} + +All reporter configurations must contain the `factory.class` property. 
+ +Example reporter configuration that specifies multiple reporters: + +```yaml +traces.reporters: otel,my_other_otel + +traces.reporter.otel.factory.class: org.apache.flink.common.metrics.OpenTelemetryTraceReporterFactory +traces.reporter.otel.exporter.endpoint: http://127.0.0.1:1337 +traces.reporter.otel.scope.variables.additional: region:eu-west-1,environment:local-pnowojski-test,flink_runtime:1.17.1 + +traces.reporter.my_other_otel.factory.class: org.apache.flink.common.metrics.OpenTelemetryTraceReporterFactory +traces.reporter.my_other_otel.exporter.endpoint: http://196.168.0.1:31337 +``` + +**Important:** The jar containing the reporter must be accessible when Flink is started. + Reporters are loaded as [plugins]({{< ref "docs/deployment/filesystems/plugins" >}}). + All reporters documented on this page are available by default. + +You can write your own `Reporter` by implementing the `org.apache.flink.traces.reporter.TraceReporter` and `org.apache.flink.traces.reporter.TraceReporterFactory` interfaces. +Be careful that all the method must not block for a significant amount of time, and any reporter needing more time should instead run the operation asynchronously. + +## Reporters + +The following sections list the supported reporters. + +### Slf4j + (org.apache.flink.traces.slf4j.Slf4jTraceReporter) + +Example configuration: +
(flink) 06/06: [hotfix][docs] Make lvl 1 headers visible in the docs TOC
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 589ea2e752495fd724bb85541e6566be907db0ba
Author: Piotr Nowojski
AuthorDate: Thu Dec 7 11:01:33 2023 +0100

    [hotfix][docs] Make lvl 1 headers visible in the docs TOC

    https://github.com/apache/flink/pull/23845#discussion_r1417918412

    There are many pages that have more than one lvl 1 header. In that case,
    our current TOC doesn't make sense and is mostly unreadable, as by default
    TOC start from lvl 2 headers only.

    For example: docs/content/docs/deployment/config.md
---
 docs/config.toml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/config.toml b/docs/config.toml
index 8969a081bd5..3776b6f895d 100644
--- a/docs/config.toml
+++ b/docs/config.toml
@@ -100,6 +100,8 @@ pygmentsUseClasses = true
 [markup]
 [markup.goldmark.renderer]
   unsafe = true
+[markup.tableOfContents]
+  startLevel = 1
 
 [languages]
 [languages.en]
(flink) branch master updated (548e4b5188b -> 589ea2e7524)
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 548e4b5188b [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900) new 066f34fde30 [hotfix][metrics] Reduce indentation level in #addMetric new f9fa5498514 [FLINK-33708][metrics] Add Span and TraceReporter new 7db2ecad165 [FLINK-33696][metrics] Add Slf4jTraceReporter new ddfb5a3a029 [FLINK-33708][docs] Document Span and TraceReporter concepts new f5039a8824e [FLINK-33709][checkpointing] Report CheckpointStats as Spans new 589ea2e7524 [hotfix][docs] Make lvl 1 headers visible in the docs TOC The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/config.toml | 2 + docs/content.zh/docs/deployment/config.md | 9 + docs/content.zh/docs/deployment/security/_index.md | 4 +- docs/content.zh/docs/deployment/trace_reporters.md | 75 + docs/content.zh/docs/ops/metrics.md| 2 +- docs/content.zh/docs/ops/traces.md | 128 + docs/content/docs/deployment/config.md | 9 + docs/content/docs/deployment/security/_index.md| 4 +- docs/content/docs/deployment/trace_reporters.md| 75 + docs/content/docs/ops/metrics.md | 2 +- docs/content/docs/ops/traces.md| 128 + .../shortcodes/generated/trace_configuration.html | 36 +++ .../generated/trace_reporters_section.html | 30 ++ .../flink/annotation/docs/Documentation.java | 2 + .../flink/configuration/ConfigConstants.java | 9 +- .../apache/flink/configuration/TraceOptions.java | 108 +++ .../java/org/apache/flink/metrics/MetricGroup.java | 9 + .../flink/metrics/reporter/MetricReporter.java | 4 +- .../metrics/reporter/MetricReporterFactory.java| 8 +- .../java/org/apache/flink/traces/SimpleSpan.java | 92 ++ .../main/java/org/apache/flink/traces/Span.java| 52 
.../java/org/apache/flink/traces/SpanBuilder.java | 98 +++ .../reporter/TraceReporter.java} | 37 +-- .../reporter/TraceReporterFactory.java}| 20 +- .../flink/traces/slf4j/Slf4jTraceReporter.java}| 38 +-- .../traces/slf4j/Slf4jTraceReporterFactory.java| 32 +++ ...ache.flink.traces.reporter.TraceReporterFactory | 16 ++ .../runtime/checkpoint/CheckpointStatsTracker.java | 11 + .../runtime/entrypoint/ClusterEntrypoint.java | 4 +- .../flink/runtime/metrics/MetricRegistry.java | 5 + .../flink/runtime/metrics/MetricRegistryImpl.java | 197 + .../flink/runtime/metrics/NoOpMetricRegistry.java | 4 + .../flink/runtime/metrics/ReporterSetup.java | 51 ++-- .../flink/runtime/metrics/TraceReporterSetup.java | 314 + .../metrics/groups/AbstractMetricGroup.java| 78 +++-- .../runtime/metrics/groups/JobMetricGroup.java | 6 + .../runtime/metrics/groups/ProxyMetricGroup.java | 6 + .../flink/runtime/minicluster/MiniCluster.java | 3 + .../runtime/taskexecutor/TaskManagerRunner.java| 4 +- .../checkpoint/CheckpointStatsTrackerTest.java | 40 +++ .../metrics/util/TestingMetricRegistry.java| 25 +- .../state/RocksDBNativeMetricMonitorTest.java | 4 + 42 files changed, 1606 insertions(+), 175 deletions(-) create mode 100644 docs/content.zh/docs/deployment/trace_reporters.md create mode 100644 docs/content.zh/docs/ops/traces.md create mode 100644 docs/content/docs/deployment/trace_reporters.md create mode 100644 docs/content/docs/ops/traces.md create mode 100644 docs/layouts/shortcodes/generated/trace_configuration.html create mode 100644 docs/layouts/shortcodes/generated/trace_reporters_section.html create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java create mode 100644 flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java create mode 100644 flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java create mode 100644 flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java 
copy flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/{metrics/reporter/MetricReporter.java => traces/reporter/TraceReporter.java} (59%) copy flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/{metrics/reporter/MetricReporterFactory.java => traces/reporter/TraceReporterFactory.java} (64%) copy flink-metrics/{flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java =>
(flink) 01/06: [hotfix][metrics] Reduce indentation level in #addMetric
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 066f34fde304b90e4ee3f12627b84ec1dd636ffd
Author: Piotr Nowojski
AuthorDate: Tue Sep 26 15:27:57 2023 +0200

    [hotfix][metrics] Reduce indentation level in #addMetric
---
 .../metrics/groups/AbstractMetricGroup.java | 58 +++---
 1 file changed, 30 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 44eb8c2b251..59537604c77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -390,41 +390,43 @@ public abstract class AbstractMetricGroup> impl
         }
         // add the metric only if the group is still open
         synchronized (this) {
-            if (!closed) {
-                // immediately put without a 'contains' check to optimize the common case (no
-                // collision)
-                // collisions are resolved later
-                Metric prior = metrics.put(name, metric);
-
-                // check for collisions with other metric names
-                if (prior == null) {
-                    // no other metric with this name yet
-
-                    if (groups.containsKey(name)) {
-                        // we warn here, rather than failing, because metrics are tools that should
-                        // not fail the
-                        // program when used incorrectly
-                        LOG.warn(
-                                "Name collision: Adding a metric with the same name as a metric subgroup: '"
-                                        + name
-                                        + "'. Metric might not get properly reported. "
-                                        + Arrays.toString(scopeComponents));
-                    }
+            if (closed) {
+                return;
+            }
 
-                    registry.register(metric, name, this);
-                } else {
-                    // we had a collision. put back the original value
-                    metrics.put(name, prior);
+            // immediately put without a 'contains' check to optimize the common case (no
+            // collision)
+            // collisions are resolved later
+            Metric prior = metrics.put(name, metric);
+
+            // check for collisions with other metric names
+            if (prior == null) {
+                // no other metric with this name yet
 
-                    // we warn here, rather than failing, because metrics are tools that should not
-                    // fail the
+                if (groups.containsKey(name)) {
+                    // we warn here, rather than failing, because metrics are tools that should
+                    // not fail the
                     // program when used incorrectly
                     LOG.warn(
-                            "Name collision: Group already contains a Metric with the name '"
+                            "Name collision: Adding a metric with the same name as a metric subgroup: '"
                             + name
-                            + "'. Metric will not be reported."
+                            + "'. Metric might not get properly reported. "
                             + Arrays.toString(scopeComponents));
                 }
+
+                registry.register(metric, name, this);
+            } else {
+                // we had a collision. put back the original value
+                metrics.put(name, prior);
+
+                // we warn here, rather than failing, because metrics are tools that should not
+                // fail the
+                // program when used incorrectly
+                LOG.warn(
+                        "Name collision: Group already contains a Metric with the name '"
+                                + name
+                                + "'. Metric will not be reported."
+                                + Arrays.toString(scopeComponents));
             }
         }
     }
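
Editor's note: the hotfix replaces a nested `if (!closed) { ... }` with an early return, leaving the put-then-resolve collision handling unchanged. The sketch below shows the same shape in isolation. `SimpleGroup` and its boolean return value are hypothetical and exist only for illustration; this is not Flink's `AbstractMetricGroup`, and registry interaction and logging are omitted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a metric group; not Flink's AbstractMetricGroup. */
final class SimpleGroup {
    private final Map<String, Object> metrics = new HashMap<>();
    private boolean closed;

    /** Returns true if the metric was added, false if the group is closed or the name collides. */
    boolean addMetric(String name, Object metric) {
        synchronized (this) {
            // guard clause: leave early instead of nesting the whole body in if (!closed)
            if (closed) {
                return false;
            }

            // put immediately without a 'contains' check (common case: no collision)
            Object prior = metrics.put(name, metric);
            if (prior == null) {
                return true;
            }

            // collision: put back the original value
            metrics.put(name, prior);
            return false;
        }
    }

    synchronized void close() {
        closed = true;
    }

    synchronized Object get(String name) {
        return metrics.get(name);
    }
}
```

With the guard clause, the main path sits one indentation level shallower, which is the whole point of the commit; behavior is intended to be identical to the nested form.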
(flink) 03/06: [FLINK-33696][metrics] Add Slf4jTraceReporter
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7db2ecad165009312dfbd35c15f42b61d98ad2fe Author: Piotr Nowojski AuthorDate: Wed Dec 6 16:22:10 2023 +0100 [FLINK-33696][metrics] Add Slf4jTraceReporter --- .../flink/traces/slf4j/Slf4jTraceReporter.java | 45 ++ .../traces/slf4j/Slf4jTraceReporterFactory.java| 32 +++ ...ache.flink.traces.reporter.TraceReporterFactory | 16 3 files changed, 93 insertions(+) diff --git a/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporter.java b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporter.java new file mode 100644 index 000..d867a58cbbd --- /dev/null +++ b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.traces.slf4j; + +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.traces.Span; +import org.apache.flink.traces.reporter.TraceReporter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link TraceReporter} that exports {@link org.apache.flink.traces.Span Spans} via SLF4J {@link + * Logger}. + */ +public class Slf4jTraceReporter implements TraceReporter { +private static final Logger LOG = LoggerFactory.getLogger(Slf4jTraceReporter.class); + +@Override +public void notifyOfAddedSpan(Span span) { +LOG.info("Reported span: {}", span); +} + +@Override +public void open(MetricConfig metricConfig) {} + +@Override +public void close() {} +} diff --git a/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporterFactory.java b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporterFactory.java new file mode 100644 index 000..dd2a7ddb15b --- /dev/null +++ b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporterFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.traces.slf4j; + +import org.apache.flink.traces.reporter.TraceReporter; +import org.apache.flink.traces.reporter.TraceReporterFactory; + +import java.util.Properties; + +/** {@link TraceReporterFactory} for {@link Slf4jTraceReporter}. */ +public class Slf4jTraceReporterFactory implements TraceReporterFactory { + +@Override +public TraceReporter createTraceReporter(Properties properties) { +return new Slf4jTraceReporter(); +} +} diff --git a/flink-metrics/flink-metrics-slf4j/src/main/resources/META-INF/services/org.apache.flink.traces.reporter.TraceReporterFactory b/flink-metrics/flink-metrics-slf4j/src/main/resources/META-INF/services/org.apache.flink.traces.reporter.TraceReporterFactory new file mode 100644 index 000..5716cdd1da3 --- /dev/null +++ b/flink-metrics/flink-metrics-slf4j/src/main/resources/META-INF/services/org.apache.flink.traces.reporter.TraceReporterFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding
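
Editor's note: `Slf4jTraceReporter` above only logs, so `notifyOfAddedSpan` returns quickly. The documentation added under FLINK-33708 asks that reporter methods not block for a significant amount of time and that slower reporters run the operation asynchronously. The sketch below is one possible way to do that, under loud assumptions: `AsyncReporterSketch` is hypothetical, spans are plain `String`s, and the class does not implement Flink's `TraceReporter` interface (only the method names `notifyOfAddedSpan` and `close` mirror it).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: spans are plain Strings and this class does not implement
 * Flink's TraceReporter; only the method names mirror that interface.
 */
final class AsyncReporterSketch {
    // a single worker thread keeps spans in submission order
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // stands in for an external system that receives the spans
    final List<String> exported = new CopyOnWriteArrayList<>();

    /** Returns immediately; the (potentially slow) export runs on the executor thread. */
    void notifyOfAddedSpan(String span) {
        executor.execute(() -> exported.add(span));
    }

    /** Stops accepting spans and waits briefly for queued exports to finish. */
    void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // preserve the interrupt for the caller
            Thread.currentThread().interrupt();
        }
    }
}
```

A real reporter would also need a bounded queue or a drop policy so that a slow backend cannot accumulate spans without limit; that is left out here.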
(flink) 02/06: [FLINK-33708][metrics] Add Span and TraceReporter
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f9fa54985148b313aa8964c2233d7714e8bc7d45 Author: Piotr Nowojski AuthorDate: Tue Sep 26 17:40:29 2023 +0200 [FLINK-33708][metrics] Add Span and TraceReporter --- .../flink/annotation/docs/Documentation.java | 2 + .../flink/configuration/ConfigConstants.java | 9 +- .../apache/flink/configuration/TraceOptions.java | 108 +++ .../java/org/apache/flink/metrics/MetricGroup.java | 9 + .../flink/metrics/reporter/MetricReporter.java | 4 +- .../metrics/reporter/MetricReporterFactory.java| 8 +- .../java/org/apache/flink/traces/SimpleSpan.java | 92 ++ .../main/java/org/apache/flink/traces/Span.java| 52 .../java/org/apache/flink/traces/SpanBuilder.java | 98 +++ .../reporter/TraceReporter.java} | 37 +-- .../reporter/TraceReporterFactory.java}| 20 +- .../runtime/entrypoint/ClusterEntrypoint.java | 4 +- .../flink/runtime/metrics/MetricRegistry.java | 5 + .../flink/runtime/metrics/MetricRegistryImpl.java | 197 + .../flink/runtime/metrics/NoOpMetricRegistry.java | 4 + .../flink/runtime/metrics/ReporterSetup.java | 51 ++-- .../flink/runtime/metrics/TraceReporterSetup.java | 314 + .../metrics/groups/AbstractMetricGroup.java| 20 ++ .../runtime/metrics/groups/JobMetricGroup.java | 6 + .../runtime/metrics/groups/ProxyMetricGroup.java | 6 + .../flink/runtime/minicluster/MiniCluster.java | 3 + .../runtime/taskexecutor/TaskManagerRunner.java| 4 +- .../metrics/util/TestingMetricRegistry.java| 25 +- .../state/RocksDBNativeMetricMonitorTest.java | 4 + 24 files changed, 959 insertions(+), 123 deletions(-) diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java index 7bab129bcba..466f09ba4ce 100644 --- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java +++ 
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java @@ -106,6 +106,8 @@ public final class Documentation { public static final String METRIC_REPORTERS = "metric_reporters"; +public static final String TRACE_REPORTERS = "trace_reporters"; + private Sections() {} } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a4968db4550..37e081e47cf 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1108,10 +1108,15 @@ public final class ConfigConstants { @Deprecated public static final String METRICS_REPORTERS_LIST = "metrics.reporters"; /** - * The prefix for per-reporter configs. Has to be combined with a reporter name and the configs - * mentioned below. + * The prefix for per-metric reporter configs. Has to be combined with a reporter name and the + * configs mentioned below. */ public static final String METRICS_REPORTER_PREFIX = "metrics.reporter."; +/** + * The prefix for per-trace reporter configs. Has to be combined with a reporter name and the + * configs mentioned below. + */ +public static final String TRACES_REPORTER_PREFIX = "traces.reporter."; /** @deprecated use {@link MetricOptions#REPORTER_CLASS} */ @Deprecated public static final String METRICS_REPORTER_CLASS_SUFFIX = "class"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java new file mode 100644 index 000..1aee746e210 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package
(flink) branch master updated: [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 548e4b5188b [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900) 548e4b5188b is described below commit 548e4b5188bb3f092206182d779a909756408660 Author: Thomas Weise AuthorDate: Mon Dec 11 08:05:53 2023 -0500 [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900) --- docs/content/docs/connectors/table/formats/parquet.md | 5 +++-- .../flink/formats/parquet/vector/reader/TimestampColumnReader.java | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md index 75c524f238f..0e53f4b919e 100644 --- a/docs/content/docs/connectors/table/formats/parquet.md +++ b/docs/content/docs/connectors/table/formats/parquet.md @@ -107,9 +107,10 @@ For example, you can configure `parquet.compression=GZIP` to enable gzip compres Data Type Mapping -Currently, Parquet format type mapping is compatible with Apache Hive, but different with Apache Spark: +Currently, Parquet format type mapping is compatible with Apache Hive, but by default not with Apache Spark: - Timestamp: mapping timestamp type to int96 whatever the precision is. +- Spark compatibility requires int64 via config option `timestamp.time.unit` (see above). - Decimal: mapping decimal type to fixed length byte array according to the precision. The following table lists the type mapping from Flink type to Parquet type. @@ -185,7 +186,7 @@ The following table lists the type mapping from Flink type to Parquet type. 
TIMESTAMP - INT96 + INT96 (or INT64) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java index aa544f4e91c..7a36edc573b 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java @@ -159,8 +159,8 @@ public class TimestampColumnReader extends AbstractColumnReader
(flink-kubernetes-operator) branch main updated: [FLINK-33763] Support savepoint redeployment through a nonce
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 9e8dcd97 [FLINK-33763] Support savepoint redeployment through a nonce 9e8dcd97 is described below commit 9e8dcd97cd07d4b769997ce40e43047301886c41 Author: Gyula Fora AuthorDate: Mon Dec 11 12:42:45 2023 +0100 [FLINK-33763] Support savepoint redeployment through a nonce --- .../content/docs/custom-resource/job-management.md | 33 +--- docs/content/docs/custom-resource/reference.md | 11 +-- .../kubernetes/operator/api/diff/DiffType.java | 18 - .../kubernetes/operator/api/diff/SpecDiff.java | 2 + .../operator/api/spec/AbstractFlinkSpec.java | 3 +- .../kubernetes/operator/api/spec/JobSpec.java | 16 +++- .../AbstractFlinkResourceReconciler.java | 15 +++- .../deployment/AbstractJobReconciler.java | 43 +- .../reconciler/deployment/SessionReconciler.java | 2 + .../operator/reconciler/diff/DiffResult.java | 15 +--- .../reconciler/diff/ReflectiveDiffBuilder.java | 12 +-- .../operator/validation/DefaultValidator.java | 8 ++ .../deployment/ApplicationReconcilerTest.java | 92 ++ .../operator/reconciler/diff/SpecDiffTest.java | 59 ++ .../operator/validation/DefaultValidatorTest.java | 88 + .../crds/flinkdeployments.flink.apache.org-v1.yml | 2 + .../crds/flinksessionjobs.flink.apache.org-v1.yml | 2 + 17 files changed, 377 insertions(+), 44 deletions(-) diff --git a/docs/content/docs/custom-resource/job-management.md b/docs/content/docs/custom-resource/job-management.md index 67664069..7a75bb19 100644 --- a/docs/content/docs/custom-resource/job-management.md +++ b/docs/content/docs/custom-resource/job-management.md @@ -288,12 +288,34 @@ Rollback is currently only supported for `FlinkDeployments`. ## Manual Recovery -There are cases when manual intervention is required from the user to recover a Flink application deployment. 
+There are cases when manual intervention is required from the user to recover a Flink application deployment or to restore to a user specified state. In most of these situations the main reason for this is that the deployment got into a state where the operator cannot determine the health of the application or the latest checkpoint information to be used for recovery. While these cases are not common, we need to be prepared to handle them. -Fortunately almost any issue can be recovered by the user manually by using the following steps: +Users have two options to restore a job from a target savepoint / checkpoint + +### Redeploy using the savepointRedeployNonce + +It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `initialSavepointPath` in the job spec: + +```yaml + job: + initialSavepointPath: file://redeploy-target-savepoint + # If not set previously, set to 1, otherwise increment, e.g. 2 + savepointRedeployNonce: 1 +``` + +When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `initialSavepointPath`. The savepoint path must not be empty. + +{{< hint warning >}} +Rollbacks are not supported after redeployments. +{{< /hint >}} + +### Delete and recreate the custom resource + +Alternatively you can completely delete and recreate the custom resources to solve almost any issues. This will fully reset the status information to start from a clean slate. +However, this also means that savepoint history is lost and the operator won't clean up past periodic savepoints taken before the deletion. 1. Locate the latest checkpoint/savepoint metafile in your configured checkpoint/savepoint directory. 2. 
Delete the `FlinkDeployment` resource for your application @@ -303,10 +325,3 @@ Fortunately almost any issue can be recovered by the user manually by using the These steps ensure that the operator will start completely fresh from the user defined savepoint path and can hopefully fully recover. Keep an eye on your job to see what could have cause the problem in the first place. - -{{< hint info >}} -The main idea behind the recovery process is that the user needs to manually override the target checkpoint/savepoint location because it is not known to the operator. -The only way to do this is to delete the previous deployment resource fully, and recreate it with `initialSavepointPath` set. - -The `initialSavepointPath` setting only takes effect on the first deployment and the operator takes over checkpoint management after that. -{{< /hint >}} diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 4fc1ea25..dfe6e0d7
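
Editor's note: the documentation in this commit says that changing `savepointRedeployNonce` makes the operator redeploy the job from `initialSavepointPath`, and that the path must not be empty. The sketch below is a hedged illustration of that rule only. `RedeployCheck` and `shouldRedeploy` are hypothetical names, and treating an unset new nonce as "no redeploy" is an assumption, not something the diff confirms; the operator's actual logic lives in `AbstractJobReconciler` and `DefaultValidator`.

```java
/** Hypothetical illustration of the nonce rule; not the operator's implementation. */
final class RedeployCheck {
    private RedeployCheck() {}

    /**
     * @param lastNonce nonce from the last reconciled spec, or null if never set
     * @param newNonce nonce from the current spec, or null if not set
     * @param initialSavepointPath savepoint to redeploy from
     * @return true if the nonce changed and a redeploy should be triggered
     */
    static boolean shouldRedeploy(Long lastNonce, Long newNonce, String initialSavepointPath) {
        // assumption: an unset nonce never triggers a redeploy
        if (newNonce == null || newNonce.equals(lastNonce)) {
            return false;
        }
        // documented rule: the savepoint path must not be empty
        if (initialSavepointPath == null || initialSavepointPath.isEmpty()) {
            throw new IllegalArgumentException(
                    "initialSavepointPath must not be empty when savepointRedeployNonce changes");
        }
        return true;
    }
}
```

For example, going from no nonce to `1`, or from `1` to `2`, with a non-empty path would trigger a redeploy in this sketch, while resubmitting the same nonce would not.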
(flink-kubernetes-operator) branch main updated: [FLINK-33770] Promote deprecated autoscaler keys to fallback keys (#725)
This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new ef250a0c [FLINK-33770] Promote deprecated autoscaler keys to fallback keys (#725) ef250a0c is described below commit ef250a0ccc4429b2573c424ff5b35cad9134dfb0 Author: Maximilian Michels AuthorDate: Mon Dec 11 10:31:04 2023 +0100 [FLINK-33770] Promote deprecated autoscaler keys to fallback keys (#725) We moved all autoscaler configuration from `kubernetes.operator.job.autoscaler.` to `job.autoscaler.` With the latest release, the logs are full with logs like this: ``` level: WARN logger: org.apache.flink.configuration.Configuration message: Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key 'job.autoscaler.target.utilization' ``` The reason is that the configuration is loaded for every reconciliation. This configuration is already widely adopted across hundreds of pipelines. I propose to remove the deprecation from the config keys and make them "fallback" keys instead which removes the deprecation warning. --- .../flink/autoscaler/config/AutoScalerOptions.java | 70 ++-- .../autoscaler/config/AutoScalerOptionsTest.java | 77 ++ 2 files changed, 111 insertions(+), 36 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 1436c91e..d8db0fed 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -27,11 +27,11 @@ import java.util.List; /** Config options related to the autoscaler module. 
*/ public class AutoScalerOptions { -public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator."; +public static final String OLD_K8S_OP_CONF_PREFIX = "kubernetes.operator."; public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler."; -private static String deprecatedOperatorConfigKey(String key) { -return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key; +private static String oldOperatorConfigKey(String key) { +return OLD_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key; } private static String autoScalerConfigKey(String key) { @@ -46,14 +46,14 @@ public class AutoScalerOptions { autoScalerConfig("enabled") .booleanType() .defaultValue(false) -.withDeprecatedKeys(deprecatedOperatorConfigKey("enabled")) +.withFallbackKeys(oldOperatorConfigKey("enabled")) .withDescription("Enable job autoscaler module."); public static final ConfigOption SCALING_ENABLED = autoScalerConfig("scaling.enabled") .booleanType() .defaultValue(true) - .withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.enabled")) +.withFallbackKeys(oldOperatorConfigKey("scaling.enabled")) .withDescription( "Enable vertex scaling execution by the autoscaler. 
If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs."); @@ -61,14 +61,14 @@ public class AutoScalerOptions { autoScalerConfig("metrics.window") .durationType() .defaultValue(Duration.ofMinutes(15)) - .withDeprecatedKeys(deprecatedOperatorConfigKey("metrics.window")) +.withFallbackKeys(oldOperatorConfigKey("metrics.window")) .withDescription("Scaling metrics aggregation window size."); public static final ConfigOption STABILIZATION_INTERVAL = autoScalerConfig("stabilization.interval") .durationType() .defaultValue(Duration.ofMinutes(5)) - .withDeprecatedKeys(deprecatedOperatorConfigKey("stabilization.interval")) + .withFallbackKeys(oldOperatorConfigKey("stabilization.interval")) .withDescription( "Stabilization period in which no new scaling will be executed"); @@ -76,14 +76,14 @@ public class AutoScalerOptions { autoScalerConfig("target.utilization") .doubleType() .defaultValue(0.7) - .withDeprecatedKeys(deprecatedOperatorConfigKey("target.utilization")) +
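
Editor's note: the commit swaps `withDeprecatedKeys` for `withFallbackKeys` so that the old `kubernetes.operator.job.autoscaler.*` keys keep working without a deprecation warning on every reconciliation. The sketch below is a deliberately simplified model of that difference. `KeyLookup` and its `warnings` list are hypothetical and are not Flink's `Configuration` class, whose real lookup and logging behavior is more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of key resolution; not Flink's Configuration class. */
final class KeyLookup {
    // collected instead of logged so the effect is easy to inspect
    final List<String> warnings = new ArrayList<>();

    /**
     * Looks up {@code key} first and falls back to {@code oldKey}. A warning is
     * recorded only when the old key is used and is marked as deprecated.
     */
    String resolve(Map<String, String> conf, String key, String oldKey, boolean oldKeyDeprecated) {
        String value = conf.get(key);
        if (value != null) {
            return value;
        }
        value = conf.get(oldKey);
        if (value != null && oldKeyDeprecated) {
            warnings.add("Config uses deprecated configuration key '" + oldKey
                    + "' instead of proper key '" + key + "'");
        }
        return value;
    }
}
```

In this model both variants return the same value for a configuration that only sets the old key; the only difference is whether a warning is recorded, which mirrors the motivation given in the commit message.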
(flink-connector-kafka) branch dependabot/maven/org.apache.commons-commons-compress-1.24.0 created (now f701ac25)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/org.apache.commons-commons-compress-1.24.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

      at f701ac25 Bump org.apache.commons:commons-compress from 1.22 to 1.24.0

No new revisions were added by this update.
(flink-connector-kafka) branch dependabot/maven/org.apache.zookeeper-zookeeper-3.7.2 created (now d66ab6e7)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/org.apache.zookeeper-zookeeper-3.7.2
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

      at d66ab6e7 Bump org.apache.zookeeper:zookeeper from 3.5.9 to 3.7.2

No new revisions were added by this update.
(flink-connector-kafka) branch main updated: [FLINK-33559] Externalize Kafka Python connector code
This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

The following commit(s) were added to refs/heads/main by this push:
     new c38a0406 [FLINK-33559] Externalize Kafka Python connector code
c38a0406 is described below

commit c38a0406104646a7ea8199bb64244310e344ce2b
Author: pvary
AuthorDate: Mon Dec 11 09:40:50 2023 +0100

    [FLINK-33559] Externalize Kafka Python connector code
---
 .github/workflows/push_pr.yml                  |    8 +
 .gitignore                                     |   18 +-
 .../push_pr.yml => flink-python/MANIFEST.in    |   16 +-
 flink-python/README.txt                        |   14 +
 flink-python/dev/integration_test.sh           |   54 +
 flink-python/pom.xml                           |  222
 .../pyflink/datastream/connectors/kafka.py     | 1163
 .../datastream/connectors/tests/test_kafka.py  |  669 +++
 flink-python/pyflink/pyflink_gateway_server.py |  288 +
 flink-python/setup.py                          |  158 +++
 flink-python/tox.ini                           |   51 +
 pom.xml                                        |    1 +
 12 files changed, 2648 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index ddc50ab8..8f53a5bd 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,3 +29,11 @@ jobs:
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
+
+  python_test:
+    strategy:
+      matrix:
+        flink: [ 1.17.1, 1.18.0 ]
+    uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+    with:
+      flink_version: ${{ matrix.flink }}
diff --git a/.gitignore b/.gitignore
index 5f0068cd..901fd674 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,20 @@ out/
 tools/flink
 tools/flink-*
 tools/releasing/release
-tools/japicmp-output
\ No newline at end of file
+tools/japicmp-output
+
+# Generated file, do not store in git
+flink-python/pyflink/datastream/connectors/kafka_connector_version.py
+flink-python/apache_flink_connectors_kafka.egg-info/
+flink-python/.tox/
+flink-python/build
+flink-python/dist
+flink-python/dev/download
+flink-python/dev/.conda/
+flink-python/dev/log/
+flink-python/dev/.stage.txt
+flink-python/dev/install_command.sh
+flink-python/dev/lint-python.sh
+flink-python/dev/build-wheels.sh
+flink-python/dev/glibc_version_fix.h
+flink-python/dev/dev-requirements.txt
diff --git a/.github/workflows/push_pr.yml b/flink-python/MANIFEST.in
similarity index 73%
copy from .github/workflows/push_pr.yml
copy to flink-python/MANIFEST.in
index ddc50ab8..3578d2df 100644
--- a/.github/workflows/push_pr.yml
+++ b/flink-python/MANIFEST.in
@@ -16,16 +16,6 @@
 # limitations under the License.
-name: CI
-on: [push, pull_request]
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
-  cancel-in-progress: true
-jobs:
-  compile_and_test:
-    strategy:
-      matrix:
-        flink: [ 1.17.1, 1.18.0 ]
-    uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
-    with:
-      flink_version: ${{ matrix.flink }}
+graft pyflink
+global-exclude *.py[cod] __pycache__ .DS_Store
+
diff --git a/flink-python/README.txt b/flink-python/README.txt
new file mode 100644
index ..a12c13e5
--- /dev/null
+++ b/flink-python/README.txt
@@ -0,0 +1,14 @@
+This is official Apache Flink Kafka Python connector.
+
+For the latest information about Flink connector, please visit our website at:
+
+   https://flink.apache.org
+
+and our GitHub Account for Kafka connector
+
+   https://github.com/apache/flink-connector-kafka
+
+If you have any questions, ask on our Mailing lists:
+
+   u...@flink.apache.org
+   d...@flink.apache.org
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
new file mode 100755
index ..19816725
--- /dev/null
+++ b/flink-python/dev/integration_test.sh
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
(flink) branch master updated: [FLINK-33713][core] Deprecate RuntimeContext#getExecutionConfig
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 3aa70df4e7d [FLINK-33713][core] Deprecate RuntimeContext#getExecutionConfig
3aa70df4e7d is described below

commit 3aa70df4e7da93ed32c26cfabdaeb606233419b1
Author: JunRuiLee
AuthorDate: Wed Nov 8 11:48:45 2023 +0800

    [FLINK-33713][core] Deprecate RuntimeContext#getExecutionConfig

    This closes #23848.
---
 .../mapred/HadoopReduceCombineFunction.java        |  3 +-
 .../mapred/HadoopReduceFunction.java               |  3 +-
 .../flink/api/common/functions/RuntimeContext.java | 33 +
 .../api/common/functions/SerializerFactory.java    | 38
 .../functions/util/AbstractRuntimeUDFContext.java  | 19 ++
 .../flink/api/common/state/StateDescriptor.java    | 15 +++-
 .../flink/cep/operator/CepRuntimeContext.java      | 19 ++
 .../flink/cep/operator/CepRuntimeContextTest.java  | 12 ---
 .../flink/state/api/EvictingWindowReader.java      |  3 +-
 .../state/api/EvictingWindowSavepointReader.java   |  3 +-
 .../apache/flink/state/api/ExistingSavepoint.java  | 39
 .../apache/flink/state/api/SavepointReader.java    | 12 ---
 .../org/apache/flink/state/api/WindowReader.java   |  3 +-
 .../flink/state/api/WindowSavepointReader.java     |  3 +-
 .../state/api/input/BroadcastStateInputFormat.java |  8 +++--
 .../state/api/input/KeyedStateInputFormat.java     | 41 ++
 .../state/api/input/ListStateInputFormat.java      |  9 +++--
 .../state/api/input/OperatorStateInputFormat.java  | 25 +++--
 .../api/input/StreamOperatorContextBuilder.java    |  9 +++--
 .../state/api/input/UnionStateInputFormat.java     |  9 +++--
 .../api/input/operator/StateReaderOperator.java    | 14
 .../api/input/operator/WindowReaderOperator.java   |  4 +--
 .../api/output/BootstrapStreamTaskRunner.java      |  3 +-
 .../output/BoundedOneInputStreamTaskRunner.java    |  5 ++-
 .../state/api/runtime/SavepointEnvironment.java    | 13 +--
 .../state/api/runtime/SavepointRuntimeContext.java | 19 ++
 .../api/input/BroadcastStateInputFormatTest.java   |  4 ++-
 .../state/api/input/KeyedStateInputFormatTest.java | 22
 .../state/api/input/ListStateInputFormatTest.java  | 13 +--
 .../input/StreamOperatorContextBuilderTest.java    |  6 ++--
 .../state/api/input/UnionStateInputFormatTest.java |  4 ++-
 .../flink/state/api/input/WindowReaderTest.java    | 16 ++---
 .../org/apache/flink/python/util/ProtoUtils.java   |  6 ++--
 .../AbstractPythonStreamAggregateOperator.java     |  3 +-
 ...wPythonOverWindowAggregateFunctionOperator.java |  3 +-
 .../runtime/state/DefaultKeyedStateStore.java      | 18 +-
 .../scala/operators/ScalaAggregateOperator.java    |  3 +-
 .../api/functions/async/RichAsyncFunction.java     | 19 ++
 .../source/ContinuousFileReaderOperator.java       |  2 +-
 .../source/InputFormatSourceFunction.java          |  2 +-
 .../api/operators/StreamOperatorStateHandler.java  | 13 ++-
 .../streaming/api/operators/StreamSource.java      |  3 +-
 .../api/operators/StreamingRuntimeContext.java     | 10 +++---
 .../runtime/operators/sink/SinkWriterOperator.java |  2 +-
 .../operators/windowing/WindowOperator.java        | 11 +-
 .../api/functions/async/RichAsyncFunctionTest.java | 14 +---
 .../api/operators/StreamingRuntimeContextTest.java | 32 +++--
 .../flink/table/functions/FunctionContext.java     |  7 +---
 .../org/apache/flink/test/operators/MapITCase.java | 22 ++--
 49 files changed, 461 insertions(+), 138 deletions(-)

diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index 4db71d76d7b..fc9981379f8 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -117,8 +117,7 @@ public final class HadoopReduceCombineFunction
         Class inKeyClass =
                 (Class) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
         TypeSerializer keySerializer =
-                TypeExtractor.getForClass(inKeyClass)
-                        .createSerializer(getRuntimeContext().getExecutionConfig());
+                getRuntimeContext().createSerializer(TypeExtractor.getForClass(inKeyClass));
         this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
         this.combineCollector =