(flink) branch master updated: [FLINK-33714][doc] Update the documentation about the usage of RuntimeContext#getExecutionConfig.

2023-12-11 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3531998adad [FLINK-33714][doc] Update the documentation about the 
usage of RuntimeContext#getExecutionConfig.
3531998adad is described below

commit 3531998adad20f3904d1421a35a69fef44b2b69f
Author: JunRuiLee 
AuthorDate: Mon Dec 11 17:36:59 2023 +0800

[FLINK-33714][doc] Update the documentation about the usage of 
RuntimeContext#getExecutionConfig.
---
 docs/content.zh/docs/dev/datastream/application_parameters.md  |  3 +--
 .../fault-tolerance/serialization/types_serialization.md   | 10 +++---
 docs/content/docs/dev/datastream/application_parameters.md |  3 +--
 .../fault-tolerance/serialization/types_serialization.md   | 10 +++---
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/application_parameters.md 
b/docs/content.zh/docs/dev/datastream/application_parameters.md
index a9ad2c17739..98b567429fd 100644
--- a/docs/content.zh/docs/dev/datastream/application_parameters.md
+++ b/docs/content.zh/docs/dev/datastream/application_parameters.md
@@ -125,8 +125,7 @@ public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
     @Override
     public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-        ParameterTool parameters = (ParameterTool)
-                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+        ParameterTool parameters = ParameterTool.fromMap(getRuntimeContext().getGlobalJobParameters());
         parameters.getRequired("input");
         // .. do more ..
 ```
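
Assembled for readability, the updated snippet reads as follows (a sketch: the imports and the shape of the surrounding class are filled in here and are not part of the patch):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // New style: global job parameters come back as a plain Map and are
        // wrapped with ParameterTool.fromMap, replacing the old cast through
        // getExecutionConfig().getGlobalJobParameters().
        ParameterTool parameters =
                ParameterTool.fromMap(getRuntimeContext().getGlobalJobParameters());
        parameters.getRequired("input");
        // .. do more ..
    }
}
```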
diff --git 
a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
 
b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 4d4f1456c2b..70679c5888d 100644
--- 
a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ 
b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -355,12 +355,16 @@ You can still use the same method as in Java as a 
fallback.
 {{< /tab >}}
 {{< /tabs >}}
 
-To create a `TypeSerializer`, simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
+There are two ways to create a `TypeSerializer`.
 
+The first is to simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
 The `config` parameter is of type `ExecutionConfig` and holds the information about the program's registered
 custom serializers. Wherever possible, try to pass the program's proper `ExecutionConfig`. You can usually
-obtain it from `DataStream` via calling `getExecutionConfig()`. Inside functions (like `MapFunction`), you can
-get it by making the function a [Rich Function]() and calling `getRuntimeContext().getExecutionConfig()`.
+obtain it from `DataStream` via calling `getExecutionConfig()`.
+
+The second is to use `getRuntimeContext().createSerializer(typeInfo)` within a function. Inside functions
+(like `MapFunction`), you can get it by making the function a [Rich Function]() and calling
+`getRuntimeContext().createSerializer(typeInfo)`.
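
As a quick illustration of the two approaches (a minimal sketch; the class, method, and field names are illustrative, and `RuntimeContext#createSerializer` is the new API referenced by this patch):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;

public class SerializerCreation {

    // (1) Outside of a function: pass the program's ExecutionConfig explicitly.
    public static <T> TypeSerializer<T> create(TypeInformation<T> typeInfo, ExecutionConfig config) {
        return typeInfo.createSerializer(config);
    }

    // (2) Inside a rich function: ask the runtime context to create the serializer.
    public static final class MyMapper extends RichMapFunction<String, String> {
        private transient TypeSerializer<String> serializer;

        @Override
        public void open(Configuration parameters) {
            serializer = getRuntimeContext().createSerializer(TypeInformation.of(String.class));
        }

        @Override
        public String map(String value) {
            return value; // the serializer would typically back custom state or raw-bytes handling
        }
    }
}
```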
 
 
 
diff --git a/docs/content/docs/dev/datastream/application_parameters.md 
b/docs/content/docs/dev/datastream/application_parameters.md
index 43d3e31bf09..6583c5c8a71 100644
--- a/docs/content/docs/dev/datastream/application_parameters.md
+++ b/docs/content/docs/dev/datastream/application_parameters.md
@@ -132,8 +132,7 @@ public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
     @Override
     public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-        ParameterTool parameters = (ParameterTool)
-                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+        ParameterTool parameters = ParameterTool.fromMap(getRuntimeContext().getGlobalJobParameters());
         parameters.getRequired("input");
         // .. do more ..
 ```
diff --git 
a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
 
b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 34d21b5e211..43062151733 100644
--- 
a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ 
b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -356,12 +356,16 @@ You can still use the same method as in Java as a 
fallback.
 {{< /tab >}}
 {{< /tabs >}}
 
-To create a `TypeSerializer`, simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
+There are two ways to create a `TypeSerializer`.
 
+The first is to simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
 The `config` parameter is of type `ExecutionConfig` and holds the information about the program's registered
 custom serializers. Wherever possible, try to pass the program's proper `ExecutionConfig`. You can usually

(flink-connector-pulsar) branch v4.1 updated: [hotfix][build] Development branch should use SNAPSHOT version

2023-12-11 Thread leonard
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch v4.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/v4.1 by this push:
 new e1ee6b4  [hotfix][build] Development branch should use SNAPSHOT version
e1ee6b4 is described below

commit e1ee6b49d5d6222555241fac7fd8ba603d05bfe8
Author: Leonard Xu 
AuthorDate: Tue Dec 12 12:27:55 2023 +0800

[hotfix][build] Development branch should use SNAPSHOT version

This closes #66.

Co-authored-by: tison 
---
 flink-connector-pulsar-e2e-tests/pom.xml | 2 +-
 flink-connector-pulsar/pom.xml   | 2 +-
 flink-sql-connector-pulsar/pom.xml   | 2 +-
 pom.xml  | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-connector-pulsar-e2e-tests/pom.xml 
b/flink-connector-pulsar-e2e-tests/pom.xml
index 4643099..c4554b9 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -23,7 +23,7 @@ under the License.
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connector-pulsar-parent</artifactId>
-		<version>4.1.0</version>
+		<version>4.1-SNAPSHOT</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 
diff --git a/flink-connector-pulsar/pom.xml b/flink-connector-pulsar/pom.xml
index 709cbfd..9e23fa2 100644
--- a/flink-connector-pulsar/pom.xml
+++ b/flink-connector-pulsar/pom.xml
@@ -26,7 +26,7 @@ under the License.
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connector-pulsar-parent</artifactId>
-		<version>4.1.0</version>
+		<version>4.1-SNAPSHOT</version>
 	</parent>
 
 	<artifactId>flink-connector-pulsar</artifactId>
diff --git a/flink-sql-connector-pulsar/pom.xml 
b/flink-sql-connector-pulsar/pom.xml
index 09babe6..d6242dd 100644
--- a/flink-sql-connector-pulsar/pom.xml
+++ b/flink-sql-connector-pulsar/pom.xml
@@ -26,7 +26,7 @@ under the License.
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connector-pulsar-parent</artifactId>
-		<version>4.1.0</version>
+		<version>4.1-SNAPSHOT</version>
 	</parent>
 
 	<artifactId>flink-sql-connector-pulsar</artifactId>
diff --git a/pom.xml b/pom.xml
index 02fa673..a2e68f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@ under the License.
 
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-pulsar-parent</artifactId>
-    <version>4.1.0</version>
+    <version>4.1-SNAPSHOT</version>
     <name>Flink : Connectors : Pulsar : Parent</name>
    <packaging>pom</packaging>
    <inceptionYear>2022</inceptionYear>



(flink) branch master updated: [hotfix][test] Migrate JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and Assertj (#23882)

2023-12-11 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e454fc07192 [hotfix][test] Migrate 
JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and 
Assertj (#23882)
e454fc07192 is described below

commit e454fc0719276d2d54590989b754e88b9adb4455
Author: yunhong <337361...@qq.com>
AuthorDate: Tue Dec 12 10:19:08 2023 +0800

[hotfix][test] Migrate 
JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and 
Assertj (#23882)

Co-authored-by: zhengyunhong.zyh 
---
 .../json/JsonRowDeserializationSchemaTest.java | 99 --
 .../json/JsonRowSerializationSchemaTest.java   | 46 +-
 2 files changed, 78 insertions(+), 67 deletions(-)

diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index 81e370c0fd3..4712768f291 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -26,10 +26,7 @@ import org.apache.flink.util.jackson.JacksonMapperFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
@@ -44,19 +41,14 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.formats.utils.DeserializationSchemaMatcher.whenDeserializedWith;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import static org.junit.internal.matchers.ThrowableCauseMatcher.hasCause;
-import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link JsonRowDeserializationSchema}. */
 public class JsonRowDeserializationSchemaTest {
 
 private static final ObjectMapper OBJECT_MAPPER = 
JacksonMapperFactory.createObjectMapper();
 
-@Rule public ExpectedException thrown = ExpectedException.none();
-
 /** Tests simple deserialization using type information. */
 @Test
 public void testTypeInfoDeserialization() throws Exception {
@@ -135,7 +127,11 @@ public class JsonRowDeserializationSchemaTest {
 row.setField(9, map);
 row.setField(10, nestedMap);
 
-assertThat(serializedJson, 
whenDeserializedWith(deserializationSchema).equalsTo(row));
+assertThat(
+whenDeserializedWith(deserializationSchema)
+.equalsTo(row)
+.matches(serializedJson))
+.isTrue();
 }
 
 @Test
@@ -205,7 +201,11 @@ public class JsonRowDeserializationSchemaTest {
 nestedRow.setField(1, BigDecimal.valueOf(12));
 expected.setField(9, nestedRow);
 
-assertThat(serializedJson, 
whenDeserializedWith(deserializationSchema).equalsTo(expected));
+assertThat(
+whenDeserializedWith(deserializationSchema)
+.equalsTo(expected)
+.matches(serializedJson))
+.isTrue();
 }
 
 /** Tests deserialization with non-existing field name. */
@@ -223,45 +223,55 @@ public class JsonRowDeserializationSchemaTest {
 new 
JsonRowDeserializationSchema.Builder(rowTypeInformation).build();
 
 Row row = new Row(1);
-assertThat(serializedJson, 
whenDeserializedWith(deserializationSchema).equalsTo(row));
+assertThat(
+whenDeserializedWith(deserializationSchema)
+.equalsTo(row)
+.matches(serializedJson))
+.isTrue();
 
 deserializationSchema =
 new JsonRowDeserializationSchema.Builder(rowTypeInformation)
 .failOnMissingField()
 .build();
-
-assertThat(
-        serializedJson,
-        whenDeserializedWith(deserializationSchema)
-                .failsWithException(hasCause(instanceOf(IllegalStateException.class))));
+final JsonRowDeserializationSchema errorDs = deserializationSchema;
+assertThatThrownBy(() -> 
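
Independent of the truncated hunk above, the migration pattern being applied is the standard AssertJ exception idiom (a self-contained sketch, not the test's actual code):

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.jupiter.api.Test;

class ExceptionAssertionSketch {

    @Test
    void causeChecksMoveIntoTheLambda() {
        // JUnit 4 used an ExpectedException @Rule plus hamcrest matchers such as
        // hasCause(instanceOf(...)); with AssertJ the throwing call is wrapped in
        // a lambda and the cause is asserted fluently on the returned assert.
        assertThatThrownBy(
                        () -> {
                            throw new RuntimeException(new IllegalStateException("boom"));
                        })
                .hasCauseInstanceOf(IllegalStateException.class);
    }
}
```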

(flink) branch master updated: [FLINK-33672][table-runtime] Use MapState.entries() instead of keys() and get() in over window (#23855)

2023-12-11 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 080119cca53 [FLINK-33672][table-runtime] Use MapState.entries() 
instead of keys() and get() in over window (#23855)
080119cca53 is described below

commit 080119cca53d9890257982b6a74a7d6f913253c2
Author: Zakelly 
AuthorDate: Tue Dec 12 10:14:04 2023 +0800

[FLINK-33672][table-runtime] Use MapState.entries() instead of keys() and 
get() in over window (#23855)
---
 .../operators/over/ProcTimeRangeBoundedPrecedingFunction.java  |  8 +---
 .../operators/over/RowTimeRangeBoundedPrecedingFunction.java   | 10 ++
 .../operators/over/RowTimeRowsBoundedPrecedingFunction.java|  6 --
 3 files changed, 15 insertions(+), 9 deletions(-)
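
The change applies one pattern in all three functions: iterate `MapState#entries()` once instead of iterating `keys()` and calling `get()` per key. A minimal before/after sketch (method and variable names are illustrative):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.table.data.RowData;

import java.util.List;
import java.util.Map;

class OverWindowRetractionSketch {

    // Before: keys() plus a get() per key, i.e. two state accesses per entry.
    void retractViaKeys(MapState<Long, List<RowData>> inputState, long limit) throws Exception {
        for (Long key : inputState.keys()) {
            if (key < limit) {
                List<RowData> rows = inputState.get(key); // second state access
                // ... retract rows ...
            }
        }
    }

    // After: a single entries() traversal yields key and value together.
    void retractViaEntries(MapState<Long, List<RowData>> inputState, long limit) throws Exception {
        for (Map.Entry<Long, List<RowData>> entry : inputState.entries()) {
            if (entry.getKey() < limit) {
                List<RowData> rows = entry.getValue(); // no extra lookup
                // ... retract rows ...
            }
        }
    }
}
```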

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
index 39405ec8a05..15c5f0e4141 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Process Function used for the aggregate in bounded proc-time OVER window.
@@ -197,13 +198,14 @@ public class ProcTimeRangeBoundedPrecedingFunction
 // when we find timestamps that are out of interest, we retrieve 
corresponding elements
 // and eliminate them. Multiple elements could have been received at 
the same timestamp
 // the removal of old elements happens only once per proctime as 
onTimer is called only once
-        Iterator<Long> iter = inputState.keys().iterator();
+        Iterator<Map.Entry<Long, List<RowData>>> iter = inputState.entries().iterator();
         List<Long> markToRemove = new ArrayList<Long>();
         while (iter.hasNext()) {
-            Long elementKey = iter.next();
+            Map.Entry<Long, List<RowData>> element = iter.next();
+            Long elementKey = element.getKey();
             if (elementKey < limit) {
                 // element key outside of window. Retract values
-                List<RowData> elementsRemove = inputState.get(elementKey);
+                List<RowData> elementsRemove = element.getValue();
                 if (elementsRemove != null) {
                     int iRemove = 0;
                     while (iRemove < elementsRemove.size()) {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
index bf5a3d5ca7f..285b8de6f1b 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Process Function for RANGE clause event-time bounded OVER window.
@@ -227,12 +228,13 @@ public class RowTimeRangeBoundedPrecedingFunction
         List<Long> retractTsList = new ArrayList<Long>();
 
         // do retraction
-        Iterator<Long> dataTimestampIt = inputState.keys().iterator();
-        while (dataTimestampIt.hasNext()) {
-            Long dataTs = dataTimestampIt.next();
+        Iterator<Map.Entry<Long, List<RowData>>> iter = inputState.entries().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<Long, List<RowData>> data = iter.next();
+            Long dataTs = data.getKey();
             Long offset = timestamp - dataTs;
             if (offset > precedingOffset) {
-                List<RowData> retractDataList = inputState.get(dataTs);
+                List<RowData> retractDataList = data.getValue();
                 if (retractDataList != null) {
                     dataListIndex = 0;
                     while (dataListIndex < retractDataList.size()) {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsBoundedPrecedingFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsBoundedPrecedingFunction.java
index 74ffb428c2d..df897eef9a0 100644
--- 

(flink) branch master updated: [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan

2023-12-11 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9cac80be18c [FLINK-33612][table-planner] Hybrid shuffle mode avoids 
unnecessary blocking edges in the plan
9cac80be18c is described below

commit 9cac80be18c6aff0cebdfe706327c1693822e884
Author: Yuxin Tan 
AuthorDate: Wed Nov 22 14:22:45 2023 +0800

[FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary 
blocking edges in the plan
---
 .../utils/InputPriorityConflictResolver.java   |   6 +-
 .../planner/utils/StreamExchangeModeUtils.java |   4 +
 .../physical/batch/BatchPhysicalExchange.scala |  10 +-
 .../utils/InputPriorityConflictResolverTest.java   |  72 +++-
 .../plan/optimize/ShuffleModePlanOptimizeTest.java | 127 ++
 .../planner/utils/StreamExchangeModeUtilsTest.java |  11 +
 .../plan/optimize/ShuffleModePlanOptimizeTest.xml  | 445 +
 7 files changed, 655 insertions(+), 20 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index 144b527c520..0479993b5bd 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -199,7 +199,11 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
 }
 
 private InputProperty.DamBehavior getDamBehavior() {
-if (getBatchStreamExchangeMode(tableConfig, exchangeMode) == 
StreamExchangeMode.BATCH) {
+StreamExchangeMode streamExchangeMode =
+getBatchStreamExchangeMode(tableConfig, exchangeMode);
+if (streamExchangeMode == StreamExchangeMode.BATCH
+|| streamExchangeMode == StreamExchangeMode.HYBRID_FULL
+|| streamExchangeMode == StreamExchangeMode.HYBRID_SELECTIVE) {
 return InputProperty.DamBehavior.BLOCKING;
 } else {
 return InputProperty.DamBehavior.PIPELINED;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
index bd722a07726..312e59bee34 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
@@ -52,6 +52,10 @@ public class StreamExchangeModeUtils {
 final BatchShuffleMode shuffleMode = 
config.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
 if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
 return StreamExchangeMode.BATCH;
+} else if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL) {
+return StreamExchangeMode.HYBRID_FULL;
+} else if (shuffleMode == 
BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE) {
+return StreamExchangeMode.HYBRID_SELECTIVE;
 }
 
 return StreamExchangeMode.UNDEFINED;
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
index 3fbddc658e1..0a57f8087f8 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
@@ -60,10 +60,12 @@ class BatchPhysicalExchange(
 val exchangeMode =
   getBatchStreamExchangeMode(unwrapTableConfig(this), 
StreamExchangeMode.UNDEFINED)
 
-val damBehavior = if (exchangeMode eq StreamExchangeMode.BATCH) {
-  InputProperty.DamBehavior.BLOCKING
-} else {
-  InputProperty.DamBehavior.PIPELINED
+val damBehavior = exchangeMode match {
+  case StreamExchangeMode.BATCH | StreamExchangeMode.HYBRID_FULL |
+  StreamExchangeMode.HYBRID_SELECTIVE =>
+InputProperty.DamBehavior.BLOCKING
+  case _ =>
+InputProperty.DamBehavior.PIPELINED
 }
 
 InputProperty.builder
diff --git 

(flink-connector-pulsar) branch dependabot/maven/org.apache.commons-commons-compress-1.24.0 deleted (was ea918c8)

2023-12-11 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch 
dependabot/maven/org.apache.commons-commons-compress-1.24.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


 was ea918c8  Bump org.apache.commons:commons-compress from 1.22 to 1.24.0

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



(flink-connector-pulsar) branch main updated: Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 (#65)

2023-12-11 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
 new 74a4696  Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 
(#65)
74a4696 is described below

commit 74a46963fd8eb05de9dff4c6b302f37d1a3faf0e
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 12 09:20:35 2023 +0800

Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 (#65)

Bumps org.apache.commons:commons-compress from 1.22 to 1.24.0.

---
updated-dependencies:
- dependency-name: org.apache.commons:commons-compress
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] 
Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 2292706..768a891 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@ under the License.
 1.7.36
 2.17.2
 2.3.1
-1.22
+1.24.0
 1.12.20
 2.24.0
 3.3



(flink-connector-pulsar) branch main updated (d958d15 -> 29f1e92)

2023-12-11 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


from d958d15  Update version to 4.1-SNAPSHOT
 add 29f1e92  [hotfix][build] Bump version to 4.2-SNAPSHOT (#67)

No new revisions were added by this update.

Summary of changes:
 flink-connector-pulsar-e2e-tests/pom.xml | 2 +-
 flink-connector-pulsar/pom.xml   | 2 +-
 flink-sql-connector-pulsar/pom.xml   | 2 +-
 pom.xml  | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)



(flink-connector-kafka) branch main updated (c38a0406 -> 825052f5)

2023-12-11 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


from c38a0406 [FLINK-33559] Externalize Kafka Python connector code
 add 825052f5 [FLINK-33361][connectors/kafka] Add Java 17 compatibility to 
Flink Kafka connector

No new revisions were added by this update.

Summary of changes:
 .github/workflows/push_pr.yml | 6 +-
 flink-connector-kafka/pom.xml | 8 
 pom.xml   | 7 +++
 3 files changed, 20 insertions(+), 1 deletion(-)



(flink) branch master updated: [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2. This closes #22502

2023-12-11 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new ea4cdc28651 [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2. This closes #22502
ea4cdc28651 is described below

commit ea4cdc28651ad91defd4fc7b371a1f520ea7a262
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Mon Dec 11 16:36:44 2023 +0100

[FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2. This closes #22502
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 18c8b4b3f49..0304712e2ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1714,7 +1714,7 @@ under the License.
 				<plugin>
 					<groupId>org.apache.maven.plugins</groupId>
 					<artifactId>maven-surefire-plugin</artifactId>
-					<version>3.0.0-M5</version>
+					<version>3.2.2</version>
 				</plugin>





(flink) 05/06: [FLINK-33709][checkpointing] Report CheckpointStats as Spans

2023-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5039a8824e9f78066fa56e2050daa18f69cc715
Author: Piotr Nowojski 
AuthorDate: Tue Sep 26 17:44:46 2023 +0200

[FLINK-33709][checkpointing] Report CheckpointStats as Spans
---
 docs/content.zh/docs/ops/traces.md | 57 ++
 docs/content/docs/ops/traces.md| 57 ++
 .../runtime/checkpoint/CheckpointStatsTracker.java | 11 +
 .../checkpoint/CheckpointStatsTrackerTest.java | 40 +++
 4 files changed, 165 insertions(+)

diff --git a/docs/content.zh/docs/ops/traces.md 
b/docs/content.zh/docs/ops/traces.md
index 427ab8d6cc3..eff102bb58a 100644
--- a/docs/content.zh/docs/ops/traces.md
+++ b/docs/content.zh/docs/ops/traces.md
@@ -68,4 +68,61 @@ Currently reporting Spans from Python is not supported.
 
 For information on how to set up Flink's trace reporters please take a look at 
the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" 
>}}).
 
+## System traces
+
+Flink reports the traces listed below.
+
+The tables below generally feature 4 columns:
+
+* The "Scope" column describes what is that trace reported scope.
+
+* The "Name" column describes the name of the reported trace.
+
+* The "Attributes" column lists the names of all attributes that are reported 
with the given trace.
+
+* The "Description" column provides information as to what a given attribute 
is reporting.
+
+### Checkpointing
+
+Flink reports a single span trace for the whole checkpoint once the checkpoint reaches a terminal state: COMPLETED or FAILED.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Scope</th>
+      <th>Name</th>
+      <th>Attributes</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="6">org.apache.flink.runtime.checkpoint.CheckpointStatsTracker</th>
+      <th rowspan="6">Checkpoint</th>
+      <td>startTs</td>
+      <td>Timestamp when the checkpoint has started.</td>
+    </tr>
+    <tr>
+      <td>endTs</td>
+      <td>Timestamp when the checkpoint has finished.</td>
+    </tr>
+    <tr>
+      <td>checkpointId</td>
+      <td>Id of the checkpoint.</td>
+    </tr>
+    <tr>
+      <td>checkpointedSize</td>
+      <td>Size in bytes of checkpointed state during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used.</td>
+    </tr>
+    <tr>
+      <td>fullSize</td>
+      <td>Full size in bytes of the state referenced by this checkpoint. Might be larger than checkpointedSize if incremental checkpoints are used.</td>
+    </tr>
+    <tr>
+      <td>checkpointStatus</td>
+      <td>What was the state of this checkpoint: FAILED or COMPLETED.</td>
+    </tr>
+  </tbody>
+</table>
 {{< top >}}
diff --git a/docs/content/docs/ops/traces.md b/docs/content/docs/ops/traces.md
index 427ab8d6cc3..eff102bb58a 100644
--- a/docs/content/docs/ops/traces.md
+++ b/docs/content/docs/ops/traces.md
@@ -68,4 +68,61 @@ Currently reporting Spans from Python is not supported.
 
 For information on how to set up Flink's trace reporters please take a look at 
the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" 
>}}).
 
+## System traces
+
+Flink reports the traces listed below.
+
+The tables below generally feature 4 columns:
+
+* The "Scope" column describes what is that trace reported scope.
+
+* The "Name" column describes the name of the reported trace.
+
+* The "Attributes" column lists the names of all attributes that are reported 
with the given trace.
+
+* The "Description" column provides information as to what a given attribute 
is reporting.
+
+### Checkpointing
+
+Flink reports a single span trace for the whole checkpoint once the checkpoint reaches a terminal state: COMPLETED or FAILED.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Scope</th>
+      <th>Name</th>
+      <th>Attributes</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="6">org.apache.flink.runtime.checkpoint.CheckpointStatsTracker</th>
+      <th rowspan="6">Checkpoint</th>
+      <td>startTs</td>
+      <td>Timestamp when the checkpoint has started.</td>
+    </tr>
+    <tr>
+      <td>endTs</td>
+      <td>Timestamp when the checkpoint has finished.</td>
+    </tr>
+    <tr>
+      <td>checkpointId</td>
+      <td>Id of the checkpoint.</td>
+    </tr>
+    <tr>
+      <td>checkpointedSize</td>
+      <td>Size in bytes of checkpointed state during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used.</td>
+    </tr>
+    <tr>
+      <td>fullSize</td>
+      <td>Full size in bytes of the state referenced by this checkpoint. Might be larger than checkpointedSize if incremental checkpoints are used.</td>
+    </tr>
+    <tr>
+      <td>checkpointStatus</td>
+      <td>What was the state of this checkpoint: FAILED or COMPLETED.</td>
+    </tr>
+  </tbody>
+</table>
 {{< top >}}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index f868f3fb4ba..47a6c3cf12c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 

(flink) 04/06: [FLINK-33708][docs] Document Span and TraceReporter concepts

2023-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ddfb5a3a0292c29a5d8df3e3ab408893ab363e63
Author: Piotr Nowojski 
AuthorDate: Tue Oct 24 17:36:15 2023 +0200

[FLINK-33708][docs] Document Span and TraceReporter concepts
---
 docs/content.zh/docs/deployment/config.md  |  9 +++
 docs/content.zh/docs/deployment/security/_index.md |  4 +-
 docs/content.zh/docs/deployment/trace_reporters.md | 75 ++
 docs/content.zh/docs/ops/metrics.md|  2 +-
 docs/content.zh/docs/ops/traces.md | 71 
 docs/content/docs/deployment/config.md |  9 +++
 docs/content/docs/deployment/security/_index.md|  4 +-
 docs/content/docs/deployment/trace_reporters.md| 75 ++
 docs/content/docs/ops/metrics.md   |  2 +-
 docs/content/docs/ops/traces.md| 71 
 .../shortcodes/generated/trace_configuration.html  | 36 +++
 .../generated/trace_reporters_section.html | 30 +
 12 files changed, 382 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md 
b/docs/content.zh/docs/deployment/config.md
index b68d87afecf..34d04f733e5 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -287,6 +287,15 @@ Enabling RocksDB's native metrics may cause degraded 
performance and should be s
 
 
 
+# Traces
+
+Please refer to the [tracing system documentation]({{< ref "docs/ops/traces" 
>}}) for background on Flink's tracing infrastructure.
+
+{{< generated/trace_configuration >}}
+
+
+
+
 # History Server
 
 The history server keeps the information of completed jobs (graphs, runtimes, 
statistics). To enable it, you have to enable "job archiving" in the JobManager 
(`jobmanager.archive.fs.dir`).
diff --git a/docs/content.zh/docs/deployment/security/_index.md 
b/docs/content.zh/docs/deployment/security/_index.md
index d098fefc680..8a2a87a398e 100644
--- a/docs/content.zh/docs/deployment/security/_index.md
+++ b/docs/content.zh/docs/deployment/security/_index.md
@@ -1,7 +1,7 @@
 ---
 title: Security
 bookCollapseSection: true
-weight: 8
+weight: 9
 ---
 
\ No newline at end of file
+-->
diff --git a/docs/content.zh/docs/deployment/trace_reporters.md 
b/docs/content.zh/docs/deployment/trace_reporters.md
new file mode 100644
index 000..eaab7d4ce3d
--- /dev/null
+++ b/docs/content.zh/docs/deployment/trace_reporters.md
@@ -0,0 +1,75 @@
+---
+title: "Trace Reporters"
+weight: 7
+type: docs
+aliases:
+  - /deployment/trace_reporters.html
+---
+
+
+# Trace Reporters
+
+Flink allows reporting traces to external systems.
+For more information about Flink's tracing system go to the [tracing system 
documentation]({{< ref "docs/ops/traces" >}}).
+
+Traces can be exposed to an external system by configuring one or several 
reporters in `conf/flink-conf.yaml`. These
+reporters will be instantiated on each job and task manager when they are 
started.
+
+Below is a list of parameters that are generally applicable to all reporters.
+All properties are configured by setting `traces.reporter.<reporter_name>.<property>` in the configuration.
+Reporters may additionally offer implementation-specific parameters, which are 
documented in the respective reporter's section. 
+
+{{< include_reporter_config 
"layouts/shortcodes/generated/trace_reporters_section.html" >}}
+
+All reporter configurations must contain the `factory.class` property.
+
+Example reporter configuration that specifies multiple reporters:
+
+```yaml
+traces.reporters: otel,my_other_otel
+
+traces.reporter.otel.factory.class: 
org.apache.flink.common.metrics.OpenTelemetryTraceReporterFactory
+traces.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
+traces.reporter.otel.scope.variables.additional: 
region:eu-west-1,environment:local-pnowojski-test,flink_runtime:1.17.1
+
+traces.reporter.my_other_otel.factory.class: 
org.apache.flink.common.metrics.OpenTelemetryTraceReporterFactory
+traces.reporter.my_other_otel.exporter.endpoint: http://196.168.0.1:31337
+```
+
+**Important:** The jar containing the reporter must be accessible when Flink 
is started.
+ Reporters are loaded as [plugins]({{< ref 
"docs/deployment/filesystems/plugins" >}}).
+ All reporters documented on this page are available by default.
+
+You can write your own `Reporter` by implementing the 
`org.apache.flink.traces.reporter.TraceReporter` and 
`org.apache.flink.traces.reporter.TraceReporterFactory` interfaces.
+Be careful that none of the methods block for a significant amount of time; any reporter needing more time should instead run the operation asynchronously.
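
A minimal skeleton of such a custom reporter, using only the interface methods visible in the Flink sources (the class names and package are illustrative):

```java
package com.example.traces;

import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.reporter.TraceReporter;
import org.apache.flink.traces.reporter.TraceReporterFactory;

import java.util.Properties;

/** Skeleton reporter; forward spans to your tracing backend here. */
public class MyTraceReporter implements TraceReporter {

    @Override
    public void open(MetricConfig metricConfig) {
        // read reporter-specific settings, open client connections
    }

    @Override
    public void notifyOfAddedSpan(Span span) {
        // export the span; hand it off asynchronously rather than blocking here
    }

    @Override
    public void close() {
        // release resources
    }
}

/** Factory, registered via META-INF/services/org.apache.flink.traces.reporter.TraceReporterFactory. */
class MyTraceReporterFactory implements TraceReporterFactory {

    @Override
    public TraceReporter createTraceReporter(Properties properties) {
        return new MyTraceReporter();
    }
}
```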
+
+## Reporters
+
+The following sections list the supported reporters.
+
+### Slf4j
+ (org.apache.flink.traces.slf4j.Slf4jTraceReporter)
+
+Example configuration:
+

(flink) 06/06: [hotfix][docs] Make lvl 1 headers visible in the docs TOC

2023-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 589ea2e752495fd724bb85541e6566be907db0ba
Author: Piotr Nowojski 
AuthorDate: Thu Dec 7 11:01:33 2023 +0100

[hotfix][docs] Make lvl 1 headers visible in the docs TOC

https://github.com/apache/flink/pull/23845#discussion_r1417918412

There are many pages that have more than one lvl 1 header.
In that case, our current TOC doesn't make sense and is mostly
unreadable, as by default the TOC starts from lvl 2 headers only.

For example:
docs/content/docs/deployment/config.md
---
 docs/config.toml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/config.toml b/docs/config.toml
index 8969a081bd5..3776b6f895d 100644
--- a/docs/config.toml
+++ b/docs/config.toml
@@ -100,6 +100,8 @@ pygmentsUseClasses = true
 [markup]
 [markup.goldmark.renderer]
   unsafe = true
+[markup.tableOfContents]
+  startLevel = 1
 
 [languages]
 [languages.en]



(flink) branch master updated (548e4b5188b -> 589ea2e7524)

2023-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 548e4b5188b [FLINK-25565][Formats][Parquet] timestamp int64 option 
tidy up (#23900)
 new 066f34fde30 [hotfix][metrics] Reduce indentation level in #addMetric
 new f9fa5498514 [FLINK-33708][metrics] Add Span and TraceReporter
 new 7db2ecad165 [FLINK-33696][metrics] Add Slf4jTraceReporter
 new ddfb5a3a029 [FLINK-33708][docs] Document Span and TraceReporter 
concepts
 new f5039a8824e [FLINK-33709][checkpointing] Report CheckpointStats as 
Spans
 new 589ea2e7524 [hotfix][docs] Make lvl 1 headers visible in the docs TOC

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/config.toml   |   2 +
 docs/content.zh/docs/deployment/config.md  |   9 +
 docs/content.zh/docs/deployment/security/_index.md |   4 +-
 docs/content.zh/docs/deployment/trace_reporters.md |  75 +
 docs/content.zh/docs/ops/metrics.md|   2 +-
 docs/content.zh/docs/ops/traces.md | 128 +
 docs/content/docs/deployment/config.md |   9 +
 docs/content/docs/deployment/security/_index.md|   4 +-
 docs/content/docs/deployment/trace_reporters.md|  75 +
 docs/content/docs/ops/metrics.md   |   2 +-
 docs/content/docs/ops/traces.md| 128 +
 .../shortcodes/generated/trace_configuration.html  |  36 +++
 .../generated/trace_reporters_section.html |  30 ++
 .../flink/annotation/docs/Documentation.java   |   2 +
 .../flink/configuration/ConfigConstants.java   |   9 +-
 .../apache/flink/configuration/TraceOptions.java   | 108 +++
 .../java/org/apache/flink/metrics/MetricGroup.java |   9 +
 .../flink/metrics/reporter/MetricReporter.java |   4 +-
 .../metrics/reporter/MetricReporterFactory.java|   8 +-
 .../java/org/apache/flink/traces/SimpleSpan.java   |  92 ++
 .../main/java/org/apache/flink/traces/Span.java|  52 
 .../java/org/apache/flink/traces/SpanBuilder.java  |  98 +++
 .../reporter/TraceReporter.java}   |  37 +--
 .../reporter/TraceReporterFactory.java}|  20 +-
 .../flink/traces/slf4j/Slf4jTraceReporter.java}|  38 +--
 .../traces/slf4j/Slf4jTraceReporterFactory.java|  32 +++
 ...ache.flink.traces.reporter.TraceReporterFactory |  16 ++
 .../runtime/checkpoint/CheckpointStatsTracker.java |  11 +
 .../runtime/entrypoint/ClusterEntrypoint.java  |   4 +-
 .../flink/runtime/metrics/MetricRegistry.java  |   5 +
 .../flink/runtime/metrics/MetricRegistryImpl.java  | 197 +
 .../flink/runtime/metrics/NoOpMetricRegistry.java  |   4 +
 .../flink/runtime/metrics/ReporterSetup.java   |  51 ++--
 .../flink/runtime/metrics/TraceReporterSetup.java  | 314 +
 .../metrics/groups/AbstractMetricGroup.java|  78 +++--
 .../runtime/metrics/groups/JobMetricGroup.java |   6 +
 .../runtime/metrics/groups/ProxyMetricGroup.java   |   6 +
 .../flink/runtime/minicluster/MiniCluster.java |   3 +
 .../runtime/taskexecutor/TaskManagerRunner.java|   4 +-
 .../checkpoint/CheckpointStatsTrackerTest.java |  40 +++
 .../metrics/util/TestingMetricRegistry.java|  25 +-
 .../state/RocksDBNativeMetricMonitorTest.java  |   4 +
 42 files changed, 1606 insertions(+), 175 deletions(-)
 create mode 100644 docs/content.zh/docs/deployment/trace_reporters.md
 create mode 100644 docs/content.zh/docs/ops/traces.md
 create mode 100644 docs/content/docs/deployment/trace_reporters.md
 create mode 100644 docs/content/docs/ops/traces.md
 create mode 100644 docs/layouts/shortcodes/generated/trace_configuration.html
 create mode 100644 
docs/layouts/shortcodes/generated/trace_reporters_section.html
 create mode 100644 
flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
 create mode 100644 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java
 create mode 100644 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
 create mode 100644 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
 copy 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/{metrics/reporter/MetricReporter.java
 => traces/reporter/TraceReporter.java} (59%)
 copy 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/{metrics/reporter/MetricReporterFactory.java
 => traces/reporter/TraceReporterFactory.java} (64%)
 copy 
flink-metrics/{flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporterFactory.java
 => 

(flink) 01/06: [hotfix][metrics] Reduce indentation level in #addMetric

2023-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 066f34fde304b90e4ee3f12627b84ec1dd636ffd
Author: Piotr Nowojski 
AuthorDate: Tue Sep 26 15:27:57 2023 +0200

[hotfix][metrics] Reduce indentation level in #addMetric
---
 .../metrics/groups/AbstractMetricGroup.java| 58 +++---
 1 file changed, 30 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 44eb8c2b251..59537604c77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -390,41 +390,43 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup {
 }
 // add the metric only if the group is still open
 synchronized (this) {
-if (!closed) {
-// immediately put without a 'contains' check to optimize the 
common case (no
-// collision)
-// collisions are resolved later
-Metric prior = metrics.put(name, metric);
-
-// check for collisions with other metric names
-if (prior == null) {
-// no other metric with this name yet
-
-if (groups.containsKey(name)) {
-// we warn here, rather than failing, because metrics 
are tools that should
-// not fail the
-// program when used incorrectly
-LOG.warn(
-"Name collision: Adding a metric with the same 
name as a metric subgroup: '"
-+ name
-+ "'. Metric might not get properly 
reported. "
-+ Arrays.toString(scopeComponents));
-}
+if (closed) {
+return;
+}
 
-registry.register(metric, name, this);
-} else {
-// we had a collision. put back the original value
-metrics.put(name, prior);
+// immediately put without a 'contains' check to optimize the 
common case (no
+// collision)
+// collisions are resolved later
+Metric prior = metrics.put(name, metric);
+
+// check for collisions with other metric names
+if (prior == null) {
+// no other metric with this name yet
 
-// we warn here, rather than failing, because metrics are 
tools that should not
-// fail the
+if (groups.containsKey(name)) {
+// we warn here, rather than failing, because metrics are 
tools that should
+// not fail the
 // program when used incorrectly
 LOG.warn(
-"Name collision: Group already contains a Metric 
with the name '"
+"Name collision: Adding a metric with the same 
name as a metric subgroup: '"
 + name
-+ "'. Metric will not be reported."
++ "'. Metric might not get properly 
reported. "
 + Arrays.toString(scopeComponents));
 }
+
+registry.register(metric, name, this);
+} else {
+// we had a collision. put back the original value
+metrics.put(name, prior);
+
+// we warn here, rather than failing, because metrics are 
tools that should not
+// fail the
+// program when used incorrectly
+LOG.warn(
+"Name collision: Group already contains a Metric with 
the name '"
++ name
++ "'. Metric will not be reported."
++ Arrays.toString(scopeComponents));
 }
 }
 }



(flink) 03/06: [FLINK-33696][metrics] Add Slf4jTraceReporter

2023-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7db2ecad165009312dfbd35c15f42b61d98ad2fe
Author: Piotr Nowojski 
AuthorDate: Wed Dec 6 16:22:10 2023 +0100

[FLINK-33696][metrics] Add Slf4jTraceReporter
---
 .../flink/traces/slf4j/Slf4jTraceReporter.java | 45 ++
 .../traces/slf4j/Slf4jTraceReporterFactory.java| 32 +++
 ...ache.flink.traces.reporter.TraceReporterFactory | 16 
 3 files changed, 93 insertions(+)

diff --git 
a/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporter.java
 
b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporter.java
new file mode 100644
index 000..d867a58cbbd
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.traces.slf4j;
+
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.reporter.TraceReporter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TraceReporter} that exports {@link org.apache.flink.traces.Span 
Spans} via SLF4J {@link
+ * Logger}.
+ */
+public class Slf4jTraceReporter implements TraceReporter {
+private static final Logger LOG = 
LoggerFactory.getLogger(Slf4jTraceReporter.class);
+
+@Override
+public void notifyOfAddedSpan(Span span) {
+LOG.info("Reported span: {}", span);
+}
+
+@Override
+public void open(MetricConfig metricConfig) {}
+
+@Override
+public void close() {}
+}
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporterFactory.java
 
b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporterFactory.java
new file mode 100644
index 000..dd2a7ddb15b
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporterFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.traces.slf4j;
+
+import org.apache.flink.traces.reporter.TraceReporter;
+import org.apache.flink.traces.reporter.TraceReporterFactory;
+
+import java.util.Properties;
+
+/** {@link TraceReporterFactory} for {@link Slf4jTraceReporter}. */
+public class Slf4jTraceReporterFactory implements TraceReporterFactory {
+
+@Override
+public TraceReporter createTraceReporter(Properties properties) {
+return new Slf4jTraceReporter();
+}
+}
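
Wiring the new reporter up follows the generic `traces.reporter.<name>.<property>` pattern from the trace reporter documentation above (a sketch for conf/flink-conf.yaml):

```yaml
traces.reporters: slf4j
traces.reporter.slf4j.factory.class: org.apache.flink.traces.slf4j.Slf4jTraceReporterFactory
```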
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/main/resources/META-INF/services/org.apache.flink.traces.reporter.TraceReporterFactory
 
b/flink-metrics/flink-metrics-slf4j/src/main/resources/META-INF/services/org.apache.flink.traces.reporter.TraceReporterFactory
new file mode 100644
index 000..5716cdd1da3
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-slf4j/src/main/resources/META-INF/services/org.apache.flink.traces.reporter.TraceReporterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding 

(flink) 02/06: [FLINK-33708][metrics] Add Span and TraceReporter

2023-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9fa54985148b313aa8964c2233d7714e8bc7d45
Author: Piotr Nowojski 
AuthorDate: Tue Sep 26 17:40:29 2023 +0200

[FLINK-33708][metrics] Add Span and TraceReporter
---
 .../flink/annotation/docs/Documentation.java   |   2 +
 .../flink/configuration/ConfigConstants.java   |   9 +-
 .../apache/flink/configuration/TraceOptions.java   | 108 +++
 .../java/org/apache/flink/metrics/MetricGroup.java |   9 +
 .../flink/metrics/reporter/MetricReporter.java |   4 +-
 .../metrics/reporter/MetricReporterFactory.java|   8 +-
 .../java/org/apache/flink/traces/SimpleSpan.java   |  92 ++
 .../main/java/org/apache/flink/traces/Span.java|  52 
 .../java/org/apache/flink/traces/SpanBuilder.java  |  98 +++
 .../reporter/TraceReporter.java}   |  37 +--
 .../reporter/TraceReporterFactory.java}|  20 +-
 .../runtime/entrypoint/ClusterEntrypoint.java  |   4 +-
 .../flink/runtime/metrics/MetricRegistry.java  |   5 +
 .../flink/runtime/metrics/MetricRegistryImpl.java  | 197 +
 .../flink/runtime/metrics/NoOpMetricRegistry.java  |   4 +
 .../flink/runtime/metrics/ReporterSetup.java   |  51 ++--
 .../flink/runtime/metrics/TraceReporterSetup.java  | 314 +
 .../metrics/groups/AbstractMetricGroup.java|  20 ++
 .../runtime/metrics/groups/JobMetricGroup.java |   6 +
 .../runtime/metrics/groups/ProxyMetricGroup.java   |   6 +
 .../flink/runtime/minicluster/MiniCluster.java |   3 +
 .../runtime/taskexecutor/TaskManagerRunner.java|   4 +-
 .../metrics/util/TestingMetricRegistry.java|  25 +-
 .../state/RocksDBNativeMetricMonitorTest.java  |   4 +
 24 files changed, 959 insertions(+), 123 deletions(-)

diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
 
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 7bab129bcba..466f09ba4ce 100644
--- 
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ 
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -106,6 +106,8 @@ public final class Documentation {
 
 public static final String METRIC_REPORTERS = "metric_reporters";
 
+public static final String TRACE_REPORTERS = "trace_reporters";
+
 private Sections() {}
 }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a4968db4550..37e081e47cf 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1108,10 +1108,15 @@ public final class ConfigConstants {
 @Deprecated public static final String METRICS_REPORTERS_LIST = 
"metrics.reporters";
 
 /**
- * The prefix for per-reporter configs. Has to be combined with a reporter 
name and the configs
- * mentioned below.
+ * The prefix for per-metric reporter configs. Has to be combined with a 
reporter name and the
+ * configs mentioned below.
  */
 public static final String METRICS_REPORTER_PREFIX = "metrics.reporter.";
+/**
+ * The prefix for per-trace reporter configs. Has to be combined with a 
reporter name and the
+ * configs mentioned below.
+ */
+public static final String TRACES_REPORTER_PREFIX = "traces.reporter.";
 
 /** @deprecated use {@link MetricOptions#REPORTER_CLASS} */
 @Deprecated public static final String METRICS_REPORTER_CLASS_SUFFIX = 
"class";
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
new file mode 100644
index 000..1aee746e210
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 

(flink) branch master updated: [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900)

2023-12-11 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 548e4b5188b [FLINK-25565][Formats][Parquet] timestamp int64 option 
tidy up (#23900)
548e4b5188b is described below

commit 548e4b5188bb3f092206182d779a909756408660
Author: Thomas Weise 
AuthorDate: Mon Dec 11 08:05:53 2023 -0500

[FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900)
---
 docs/content/docs/connectors/table/formats/parquet.md   | 5 +++--
 .../flink/formats/parquet/vector/reader/TimestampColumnReader.java  | 6 +++---
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/connectors/table/formats/parquet.md 
b/docs/content/docs/connectors/table/formats/parquet.md
index 75c524f238f..0e53f4b919e 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -107,9 +107,9 @@ For example, you can configure `parquet.compression=GZIP` to enable gzip compression.
 Data Type Mapping
 
 
-Currently, Parquet format type mapping is compatible with Apache Hive, but 
different with Apache Spark:
+Currently, Parquet format type mapping is compatible with Apache Hive, but by 
default not with Apache Spark:
 
 - Timestamp: mapping timestamp type to int96 whatever the precision is.
+- Spark compatibility requires int64 via config option `timestamp.time.unit` 
(see above).
 - Decimal: mapping decimal type to fixed length byte array according to the 
precision.
 
 The following table lists the type mapping from Flink type to Parquet type.
@@ -185,7 +186,7 @@ The following table lists the type mapping from Flink type 
to Parquet type.
 
 
   TIMESTAMP
-  INT96
+  INT96 (or INT64)
   
 
 
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
index aa544f4e91c..7a36edc573b 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
@@ -159,8 +159,8 @@ public class TimestampColumnReader extends 
AbstractColumnReader

(flink-kubernetes-operator) branch main updated: [FLINK-33763] Support savepoint redeployment through a nonce

2023-12-11 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 9e8dcd97 [FLINK-33763] Support savepoint redeployment through a nonce
9e8dcd97 is described below

commit 9e8dcd97cd07d4b769997ce40e43047301886c41
Author: Gyula Fora 
AuthorDate: Mon Dec 11 12:42:45 2023 +0100

[FLINK-33763] Support savepoint redeployment through a nonce
---
 .../content/docs/custom-resource/job-management.md | 33 +---
 docs/content/docs/custom-resource/reference.md | 11 +--
 .../kubernetes/operator/api/diff/DiffType.java | 18 -
 .../kubernetes/operator/api/diff/SpecDiff.java |  2 +
 .../operator/api/spec/AbstractFlinkSpec.java   |  3 +-
 .../kubernetes/operator/api/spec/JobSpec.java  | 16 +++-
 .../AbstractFlinkResourceReconciler.java   | 15 +++-
 .../deployment/AbstractJobReconciler.java  | 43 +-
 .../reconciler/deployment/SessionReconciler.java   |  2 +
 .../operator/reconciler/diff/DiffResult.java   | 15 +---
 .../reconciler/diff/ReflectiveDiffBuilder.java | 12 +--
 .../operator/validation/DefaultValidator.java  |  8 ++
 .../deployment/ApplicationReconcilerTest.java  | 92 ++
 .../operator/reconciler/diff/SpecDiffTest.java | 59 ++
 .../operator/validation/DefaultValidatorTest.java  | 88 +
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  2 +
 .../crds/flinksessionjobs.flink.apache.org-v1.yml  |  2 +
 17 files changed, 377 insertions(+), 44 deletions(-)

diff --git a/docs/content/docs/custom-resource/job-management.md 
b/docs/content/docs/custom-resource/job-management.md
index 67664069..7a75bb19 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -288,12 +288,34 @@ Rollback is currently only supported for 
`FlinkDeployments`.
 
 ## Manual Recovery
 
-There are cases when manual intervention is required from the user to recover 
a Flink application deployment.
+There are cases when manual intervention is required from the user to recover 
a Flink application deployment or to restore it to a user-specified state.
 
 In most of these situations, the main reason is that the deployment got into a 
state where the operator cannot determine the health of the application or the 
latest checkpoint information to be used for recovery.
 While these cases are not common, we need to be prepared to handle them.
 
-Fortunately almost any issue can be recovered by the user manually by using 
the following steps:
+Users have two options to restore a job from a target savepoint / checkpoint:
+
+### Redeploy using the savepointRedeployNonce
+
+It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource 
from a target savepoint by using the combination of `savepointRedeployNonce` 
and `initialSavepointPath` in the job spec:
+
+```yaml
+ job:
+   initialSavepointPath: file://redeploy-target-savepoint
+   # If not set previously, set to 1, otherwise increment, e.g. 2
+   savepointRedeployNonce: 1
+```
+
+When changing the `savepointRedeployNonce`, the operator will redeploy the job 
to the savepoint defined in `initialSavepointPath`. The savepoint path must not 
be empty.
+
+{{< hint warning >}}
+Rollbacks are not supported after redeployments.
+{{< /hint >}}
+
+### Delete and recreate the custom resource 
+ 
+Alternatively, you can completely delete and recreate the custom resources to 
solve almost any issue. This will fully reset the status information to start 
from a clean slate.
+However, this also means that savepoint history is lost and the operator won't 
clean up past periodic savepoints taken before the deletion.
 
  1. Locate the latest checkpoint/savepoint metafile in your configured 
checkpoint/savepoint directory.
  2. Delete the `FlinkDeployment` resource for your application
@@ -303,10 +325,3 @@ Fortunately almost any issue can be recovered by the user 
manually by using the
 
 These steps ensure that the operator will start completely fresh from the 
user-defined savepoint path and can hopefully fully recover.
 Keep an eye on your job to see what could have caused the problem in the first 
place.
-
-{{< hint info >}}
-The main idea behind the recovery process is that the user needs to manually 
override the target checkpoint/savepoint location because it is not known to 
the operator.
-The only way to do this is to delete the previous deployment resource fully, 
and recreate it with `initialSavepointPath` set.
-
-The `initialSavepointPath` setting only takes effect on the first deployment 
and the operator takes over checkpoint management after that.
-{{< /hint >}}
diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 4fc1ea25..dfe6e0d7 
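
A minimal sketch of the redeploy flow described above, assuming a hypothetical
deployment named `redeploy-example`; the jar and savepoint paths are
placeholders, and required fields such as image, flinkVersion, and resource
specs are omitted for brevity:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: redeploy-example
spec:
  # image, flinkVersion, serviceAccount, jobManager/taskManager specs omitted
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    # The savepoint to restore from; must not be empty.
    initialSavepointPath: s3://my-bucket/savepoints/savepoint-123
    # Bump this value (e.g. 1 -> 2) to trigger the redeploy.
    savepointRedeployNonce: 2
```

Applying the updated spec (for example with `kubectl apply -f
redeploy-example.yaml`) makes the operator restart the job from the given
savepoint; per the warning above, rollbacks are not supported afterwards.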

(flink-kubernetes-operator) branch main updated: [FLINK-33770] Promote deprecated autoscaler keys to fallback keys (#725)

2023-12-11 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new ef250a0c [FLINK-33770] Promote deprecated autoscaler keys to fallback 
keys (#725)
ef250a0c is described below

commit ef250a0ccc4429b2573c424ff5b35cad9134dfb0
Author: Maximilian Michels 
AuthorDate: Mon Dec 11 10:31:04 2023 +0100

[FLINK-33770] Promote deprecated autoscaler keys to fallback keys (#725)

We moved all autoscaler configuration from 
`kubernetes.operator.job.autoscaler.` to `job.autoscaler.`

With the latest release, the logs are full of warnings like this:

```
level:  WARN
logger:  org.apache.flink.configuration.Configuration
message:  Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key 
'job.autoscaler.target.utilization'
```

The reason is that the configuration is loaded for every reconciliation.

This configuration is already widely adopted across hundreds of pipelines. I
propose to remove the deprecation from the config keys and make them "fallback"
keys instead, which removes the deprecation warning.
---
 .../flink/autoscaler/config/AutoScalerOptions.java | 70 ++--
 .../autoscaler/config/AutoScalerOptionsTest.java   | 77 ++
 2 files changed, 111 insertions(+), 36 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 1436c91e..d8db0fed 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -27,11 +27,11 @@ import java.util.List;
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
-public static final String DEPRECATED_K8S_OP_CONF_PREFIX = 
"kubernetes.operator.";
+public static final String OLD_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
 public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
 
-private static String deprecatedOperatorConfigKey(String key) {
-return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+private static String oldOperatorConfigKey(String key) {
+return OLD_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
 }
 
 private static String autoScalerConfigKey(String key) {
@@ -46,14 +46,14 @@ public class AutoScalerOptions {
 autoScalerConfig("enabled")
 .booleanType()
 .defaultValue(false)
-.withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))
+.withFallbackKeys(oldOperatorConfigKey("enabled"))
 .withDescription("Enable job autoscaler module.");
 
 public static final ConfigOption SCALING_ENABLED =
 autoScalerConfig("scaling.enabled")
 .booleanType()
 .defaultValue(true)
-
.withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.enabled"))
+.withFallbackKeys(oldOperatorConfigKey("scaling.enabled"))
 .withDescription(
 "Enable vertex scaling execution by the 
autoscaler. If disabled, the autoscaler will only collect metrics and evaluate 
the suggested parallelism for each vertex but will not upgrade the jobs.");
 
@@ -61,14 +61,14 @@ public class AutoScalerOptions {
 autoScalerConfig("metrics.window")
 .durationType()
 .defaultValue(Duration.ofMinutes(15))
-
.withDeprecatedKeys(deprecatedOperatorConfigKey("metrics.window"))
+.withFallbackKeys(oldOperatorConfigKey("metrics.window"))
 .withDescription("Scaling metrics aggregation window 
size.");
 
 public static final ConfigOption STABILIZATION_INTERVAL =
 autoScalerConfig("stabilization.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(5))
-
.withDeprecatedKeys(deprecatedOperatorConfigKey("stabilization.interval"))
+
.withFallbackKeys(oldOperatorConfigKey("stabilization.interval"))
 .withDescription(
 "Stabilization period in which no new scaling will 
be executed");
 
@@ -76,14 +76,14 @@ public class AutoScalerOptions {
 autoScalerConfig("target.utilization")
 .doubleType()
 .defaultValue(0.7)
-
.withDeprecatedKeys(deprecatedOperatorConfigKey("target.utilization"))
+ 
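
A minimal, self-contained sketch of the fallback-key behavior this change
relies on, using a hypothetical option that mirrors the pattern above (not the
actual AutoScalerOptions code):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class FallbackKeyDemo {

    // Same shape as the options above: the proper key uses the "job.autoscaler."
    // prefix, and the old "kubernetes.operator."-prefixed key stays as a fallback.
    static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
            ConfigOptions.key("job.autoscaler.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withFallbackKeys("kubernetes.operator.job.autoscaler.enabled");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Only the old key is present, as in already-running deployments.
        conf.setString("kubernetes.operator.job.autoscaler.enabled", "true");

        // The value resolves through the fallback key, and unlike a deprecated
        // key, no "uses deprecated configuration key" warning is logged.
        System.out.println(conf.get(AUTOSCALER_ENABLED)); // true
    }
}
```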

(flink-connector-kafka) branch dependabot/maven/org.apache.commons-commons-compress-1.24.0 created (now f701ac25)

2023-12-11 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/org.apache.commons-commons-compress-1.24.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


  at f701ac25 Bump org.apache.commons:commons-compress from 1.22 to 1.24.0

No new revisions were added by this update.



(flink-connector-kafka) branch dependabot/maven/org.apache.zookeeper-zookeeper-3.7.2 created (now d66ab6e7)

2023-12-11 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/org.apache.zookeeper-zookeeper-3.7.2
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


  at d66ab6e7 Bump org.apache.zookeeper:zookeeper from 3.5.9 to 3.7.2

No new revisions were added by this update.



(flink-connector-kafka) branch main updated: [FLINK-33559] Externalize Kafka Python connector code

2023-12-11 Thread gaborgsomogyi
This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
 new c38a0406 [FLINK-33559] Externalize Kafka Python connector code
c38a0406 is described below

commit c38a0406104646a7ea8199bb64244310e344ce2b
Author: pvary 
AuthorDate: Mon Dec 11 09:40:50 2023 +0100

[FLINK-33559] Externalize Kafka Python connector code
---
 .github/workflows/push_pr.yml  |8 +
 .gitignore |   18 +-
 .../push_pr.yml => flink-python/MANIFEST.in|   16 +-
 flink-python/README.txt|   14 +
 flink-python/dev/integration_test.sh   |   54 +
 flink-python/pom.xml   |  222 
 .../pyflink/datastream/connectors/kafka.py | 1163 
 .../datastream/connectors/tests/test_kafka.py  |  669 +++
 flink-python/pyflink/pyflink_gateway_server.py |  288 +
 flink-python/setup.py  |  158 +++
 flink-python/tox.ini   |   51 +
 pom.xml|1 +
 12 files changed, 2648 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index ddc50ab8..8f53a5bd 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,3 +29,11 @@ jobs:
 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
 with:
   flink_version: ${{ matrix.flink }}
+
+  python_test:
+strategy:
+  matrix:
+flink: [ 1.17.1, 1.18.0 ]
+uses: 
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+with:
+  flink_version: ${{ matrix.flink }}
diff --git a/.gitignore b/.gitignore
index 5f0068cd..901fd674 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,20 @@ out/
 tools/flink
 tools/flink-*
 tools/releasing/release
-tools/japicmp-output
\ No newline at end of file
+tools/japicmp-output
+
+# Generated file, do not store in git
+flink-python/pyflink/datastream/connectors/kafka_connector_version.py
+flink-python/apache_flink_connectors_kafka.egg-info/
+flink-python/.tox/
+flink-python/build
+flink-python/dist
+flink-python/dev/download
+flink-python/dev/.conda/
+flink-python/dev/log/
+flink-python/dev/.stage.txt
+flink-python/dev/install_command.sh
+flink-python/dev/lint-python.sh
+flink-python/dev/build-wheels.sh
+flink-python/dev/glibc_version_fix.h
+flink-python/dev/dev-requirements.txt
diff --git a/.github/workflows/push_pr.yml b/flink-python/MANIFEST.in
similarity index 73%
copy from .github/workflows/push_pr.yml
copy to flink-python/MANIFEST.in
index ddc50ab8..3578d2df 100644
--- a/.github/workflows/push_pr.yml
+++ b/flink-python/MANIFEST.in
@@ -16,16 +16,6 @@
 # limitations under the License.
 

 
-name: CI
-on: [push, pull_request]
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
-  cancel-in-progress: true
-jobs:
-  compile_and_test:
-strategy:
-  matrix:
-flink: [ 1.17.1, 1.18.0 ]
-uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
-with:
-  flink_version: ${{ matrix.flink }}
+graft pyflink
+global-exclude *.py[cod] __pycache__ .DS_Store
+
diff --git a/flink-python/README.txt b/flink-python/README.txt
new file mode 100644
index ..a12c13e5
--- /dev/null
+++ b/flink-python/README.txt
@@ -0,0 +1,14 @@
+This is the official Apache Flink Kafka Python connector.
+
+For the latest information about the Flink connector, please visit our website at:
+
+   https://flink.apache.org
+
+and our GitHub account for the Kafka connector:
+
+   https://github.com/apache/flink-connector-kafka
+
+If you have any questions, ask on our mailing lists:
+
+   u...@flink.apache.org
+   d...@flink.apache.org
diff --git a/flink-python/dev/integration_test.sh 
b/flink-python/dev/integration_test.sh
new file mode 100755
index ..19816725
--- /dev/null
+++ b/flink-python/dev/integration_test.sh
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
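
A minimal pyflink job using the externalized connector's `KafkaSource`,
assuming placeholder broker, topic, group id, and jar locations:

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
# Placeholder path: the Kafka connector jar must be reachable from the gateway.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

source = (
    KafkaSource.builder()
    .set_bootstrap_servers("localhost:9092")  # placeholder broker
    .set_topics("input-topic")                # placeholder topic
    .set_group_id("demo-group")               # placeholder group id
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source")
ds.print()
env.execute("kafka-python-demo")
```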

(flink) branch master updated: [FLINK-33713][core] Deprecate RuntimeContext#getExecutionConfig

2023-12-11 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3aa70df4e7d [FLINK-33713][core] Deprecate 
RuntimeContext#getExecutionConfig
3aa70df4e7d is described below

commit 3aa70df4e7da93ed32c26cfabdaeb606233419b1
Author: JunRuiLee 
AuthorDate: Wed Nov 8 11:48:45 2023 +0800

[FLINK-33713][core] Deprecate RuntimeContext#getExecutionConfig

This closes #23848.
---
 .../mapred/HadoopReduceCombineFunction.java|  3 +-
 .../mapred/HadoopReduceFunction.java   |  3 +-
 .../flink/api/common/functions/RuntimeContext.java | 33 +
 .../api/common/functions/SerializerFactory.java| 38 
 .../functions/util/AbstractRuntimeUDFContext.java  | 19 ++
 .../flink/api/common/state/StateDescriptor.java| 15 +++-
 .../flink/cep/operator/CepRuntimeContext.java  | 19 ++
 .../flink/cep/operator/CepRuntimeContextTest.java  | 12 ---
 .../flink/state/api/EvictingWindowReader.java  |  3 +-
 .../state/api/EvictingWindowSavepointReader.java   |  3 +-
 .../apache/flink/state/api/ExistingSavepoint.java  | 39 
 .../apache/flink/state/api/SavepointReader.java| 12 ---
 .../org/apache/flink/state/api/WindowReader.java   |  3 +-
 .../flink/state/api/WindowSavepointReader.java |  3 +-
 .../state/api/input/BroadcastStateInputFormat.java |  8 +++--
 .../state/api/input/KeyedStateInputFormat.java | 41 ++
 .../state/api/input/ListStateInputFormat.java  |  9 +++--
 .../state/api/input/OperatorStateInputFormat.java  | 25 +++--
 .../api/input/StreamOperatorContextBuilder.java|  9 +++--
 .../state/api/input/UnionStateInputFormat.java |  9 +++--
 .../api/input/operator/StateReaderOperator.java| 14 
 .../api/input/operator/WindowReaderOperator.java   |  4 +--
 .../api/output/BootstrapStreamTaskRunner.java  |  3 +-
 .../output/BoundedOneInputStreamTaskRunner.java|  5 ++-
 .../state/api/runtime/SavepointEnvironment.java| 13 +--
 .../state/api/runtime/SavepointRuntimeContext.java | 19 ++
 .../api/input/BroadcastStateInputFormatTest.java   |  4 ++-
 .../state/api/input/KeyedStateInputFormatTest.java | 22 
 .../state/api/input/ListStateInputFormatTest.java  | 13 +--
 .../input/StreamOperatorContextBuilderTest.java|  6 ++--
 .../state/api/input/UnionStateInputFormatTest.java |  4 ++-
 .../flink/state/api/input/WindowReaderTest.java| 16 ++---
 .../org/apache/flink/python/util/ProtoUtils.java   |  6 ++--
 .../AbstractPythonStreamAggregateOperator.java |  3 +-
 ...wPythonOverWindowAggregateFunctionOperator.java |  3 +-
 .../runtime/state/DefaultKeyedStateStore.java  | 18 +-
 .../scala/operators/ScalaAggregateOperator.java|  3 +-
 .../api/functions/async/RichAsyncFunction.java | 19 ++
 .../source/ContinuousFileReaderOperator.java   |  2 +-
 .../source/InputFormatSourceFunction.java  |  2 +-
 .../api/operators/StreamOperatorStateHandler.java  | 13 ++-
 .../streaming/api/operators/StreamSource.java  |  3 +-
 .../api/operators/StreamingRuntimeContext.java | 10 +++---
 .../runtime/operators/sink/SinkWriterOperator.java |  2 +-
 .../operators/windowing/WindowOperator.java| 11 +-
 .../api/functions/async/RichAsyncFunctionTest.java | 14 +---
 .../api/operators/StreamingRuntimeContextTest.java | 32 +++--
 .../flink/table/functions/FunctionContext.java |  7 +---
 .../org/apache/flink/test/operators/MapITCase.java | 22 ++--
 49 files changed, 461 insertions(+), 138 deletions(-)

diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index 4db71d76d7b..fc9981379f8 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -117,8 +117,7 @@ public final class HadoopReduceCombineFunction
 Class inKeyClass =
 (Class) TypeExtractor.getParameterType(Reducer.class, 
reducer.getClass(), 0);
 TypeSerializer keySerializer =
-TypeExtractor.getForClass(inKeyClass)
-
.createSerializer(getRuntimeContext().getExecutionConfig());
+
getRuntimeContext().createSerializer(TypeExtractor.getForClass(inKeyClass));
 this.valueIterator = new 
HadoopTupleUnwrappingIterator<>(keySerializer);
 this.combineCollector =
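
The user-facing migration shown in hunks like the one above is mechanical; a
minimal sketch of the new pattern inside a rich function (the class and field
names are made up):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;

public class CopyingMapper extends RichMapFunction<String, String> {

    private transient TypeSerializer<String> serializer;

    @Override
    public void open(Configuration parameters) {
        // New style: obtain the serializer from the RuntimeContext directly
        // instead of passing the deprecated getExecutionConfig() to
        // TypeInformation#createSerializer.
        serializer = getRuntimeContext().createSerializer(Types.STRING);
    }

    @Override
    public String map(String value) {
        return serializer.copy(value);
    }
}
```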