(flink) branch master updated: [FLINK-34496] Break circular dependency in static initialization

2024-02-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2d78c102112 [FLINK-34496] Break circular dependency in static 
initialization
2d78c102112 is described below

commit 2d78c10211272a264712e86192c4dfc59c6a5521
Author: Chesnay Schepler 
AuthorDate: Fri Feb 23 10:26:45 2024 +0100

[FLINK-34496] Break circular dependency in static initialization
---
 .../planner/plan/utils/ExecNodeMetadataUtil.java  | 19 +--
 .../planner/plan/utils/ExecNodeMetadataUtilTest.java  |  3 +--
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index efded08a82e..a7e1d2840ad 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -79,8 +78,12 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoi
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
 import javax.annotation.Nullable;
 
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -232,7 +235,7 @@ public final class ExecNodeMetadataUtil {
 }
 
 private static void addToLookupMap(Class> 
execNodeClass) {
-if (!JsonSerdeUtil.hasJsonCreatorAnnotation(execNodeClass)) {
+if (!hasJsonCreatorAnnotation(execNodeClass)) {
 throw new IllegalStateException(
 String.format(
 "ExecNode: %s does not implement @JsonCreator 
annotation on "
@@ -366,4 +369,16 @@ public final class ExecNodeMetadataUtil {
 return Objects.hash(name, version);
 }
 }
+
+/** Return true if the given class's constructors have @JsonCreator 
annotation, else false. */
+static boolean hasJsonCreatorAnnotation(Class clazz) {
+for (Constructor constructor : clazz.getDeclaredConstructors()) {
+for (Annotation annotation : constructor.getAnnotations()) {
+if (annotation instanceof JsonCreator) {
+return true;
+}
+}
+}
+return false;
+}
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java
index 323a037f310..3e2a8c3275b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
 import org.apache.flink.table.types.logical.LogicalType;
 
@@ -239,7 +238,7 @@ class ExecNodeMetadataUtilTest {
 List>> 
classesWithJsonCreatorInUnsupportedList =
 new ArrayList<>();
 for (Class> clazz : subClasses) {
-boolean hasJsonCreator = 
JsonSerdeUtil.hasJsonCreatorAnnotation(clazz);
+boolean hasJsonCreator = 
ExecNodeMetadataUtil.hasJsonCreatorAnnotation(clazz);
 if (hasJsonCreator 

(flink) branch release-1.18 updated: [FLINK-34496] Break circular dependency in static initialization

2024-02-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 39ed3cf279d [FLINK-34496] Break circular dependency in static 
initialization
39ed3cf279d is described below

commit 39ed3cf279d61e4472e1c30a17927992236df467
Author: Chesnay Schepler 
AuthorDate: Fri Feb 23 10:26:45 2024 +0100

[FLINK-34496] Break circular dependency in static initialization
---
 .../planner/plan/nodes/exec/serde/JsonSerdeUtil.java  | 15 ---
 .../planner/plan/utils/ExecNodeMetadataUtil.java  | 19 +--
 .../planner/plan/utils/ExecNodeMetadataUtilTest.java  |  3 +--
 3 files changed, 18 insertions(+), 19 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index dffd134e97a..2d34c710a76 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -42,7 +42,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
@@ -68,26 +67,12 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexWindowBound;
 
 import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
 import java.util.Optional;
 
 /** A utility class that provide abilities for JSON serialization and 
deserialization. */
 @Internal
 public class JsonSerdeUtil {
 
-/** Return true if the given class's constructors have @JsonCreator 
annotation, else false. */
-public static boolean hasJsonCreatorAnnotation(Class clazz) {
-for (Constructor constructor : clazz.getDeclaredConstructors()) {
-for (Annotation annotation : constructor.getAnnotations()) {
-if (annotation instanceof JsonCreator) {
-return true;
-}
-}
-}
-return false;
-}
-
 /**
  * Object mapper shared instance to serialize and deserialize the plan. 
Note that creating and
  * copying of object mappers is expensive and should be avoided.
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 5c467f29583..84ce3fbf2a9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
@@ -78,8 +77,12 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoi
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
 import javax.annotation.Nullable;
 
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -230,7 +233,7 @@ public final class ExecNodeMetadataUtil {
 }
 
 private static void addToLookupMap(Class> 
execNodeClass) {
-if (!JsonSerdeUtil.hasJsonCreatorAnnotation(execNodeClass)) {
+if (!hasJsonCreatorAnnotation(execNodeClass)) {
 throw new IllegalStateException(
 

(flink) branch release-1.19 updated: [FLINK-34496] Break circular dependency in static initialization

2024-02-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new dd77ee5a250 [FLINK-34496] Break circular dependency in static 
initialization
dd77ee5a250 is described below

commit dd77ee5a2501a6750387126c347cf540f3fb172b
Author: Chesnay Schepler 
AuthorDate: Fri Feb 23 10:26:45 2024 +0100

[FLINK-34496] Break circular dependency in static initialization
---
 .../planner/plan/nodes/exec/serde/JsonSerdeUtil.java  | 15 ---
 .../planner/plan/utils/ExecNodeMetadataUtil.java  | 19 +--
 .../planner/plan/utils/ExecNodeMetadataUtilTest.java  |  3 +--
 3 files changed, 18 insertions(+), 19 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index b55fccbff28..fb3a723d9ca 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -42,7 +42,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
@@ -69,26 +68,12 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexWindowBound;
 
 import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
 import java.util.Optional;
 
 /** A utility class that provide abilities for JSON serialization and 
deserialization. */
 @Internal
 public class JsonSerdeUtil {
 
-/** Return true if the given class's constructors have @JsonCreator 
annotation, else false. */
-public static boolean hasJsonCreatorAnnotation(Class clazz) {
-for (Constructor constructor : clazz.getDeclaredConstructors()) {
-for (Annotation annotation : constructor.getAnnotations()) {
-if (annotation instanceof JsonCreator) {
-return true;
-}
-}
-}
-return false;
-}
-
 /**
  * Object mapper shared instance to serialize and deserialize the plan. 
Note that creating and
  * copying of object mappers is expensive and should be avoided.
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index efded08a82e..a7e1d2840ad 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -79,8 +78,12 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoi
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
 import javax.annotation.Nullable;
 
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -232,7 +235,7 @@ public final class ExecNodeMetadataUtil {
 }
 
 private static void addToLookupMap(Class> 
execNodeClass) {
-if (!JsonSerdeUtil.hasJsonCreatorAnnotation(execNodeClass)) {
+if (!hasJsonCreatorAnnotation(execNodeClass)) {
 throw new IllegalStateException(
 

(flink) branch master updated (2d78c102112 -> 6c8f3a0799c)

2024-02-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 2d78c102112 [FLINK-34496] Break circular dependency in static 
initialization
 add 6c8f3a0799c [FLINK-34496] Remove unused method

No new revisions were added by this update.

Summary of changes:
 .../planner/plan/nodes/exec/serde/JsonSerdeUtil.java  | 15 ---
 1 file changed, 15 deletions(-)



(flink-kubernetes-operator) branch main updated: [FLINK-34471] Tune network memory with memory tuning (#781)

2024-02-23 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 6a860651 [FLINK-34471] Tune network memory with memory tuning (#781)
6a860651 is described below

commit 6a8606517fb9f38f3a486c022f925368c9b4d9ae
Author: Maximilian Michels 
AuthorDate: Fri Feb 23 11:35:34 2024 +0100

[FLINK-34471] Tune network memory with memory tuning (#781)
---
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |   2 +-
 .../apache/flink/autoscaler/ScalingExecutor.java   |  10 +-
 .../flink/autoscaler/tuning/MemoryTuning.java  |  79 +++---
 .../flink/autoscaler/utils/ResourceCheckUtils.java |   2 +-
 .../MetricsCollectionAndEvaluationTest.java|  21 +++-
 .../flink/autoscaler/ScalingExecutorTest.java  | 114 +
 .../flink/autoscaler/utils/MemoryTuningTest.java   |  64 +---
 7 files changed, 236 insertions(+), 56 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 5bb7b791..cc56eb3e 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -216,7 +216,7 @@ public class JobAutoScalerImpl>
 
 var parallelismChanged =
 scalingExecutor.scaleResource(
-ctx, evaluatedMetrics, scalingHistory, 
scalingTracking, now);
+ctx, evaluatedMetrics, scalingHistory, 
scalingTracking, now, jobTopology);
 
 if (parallelismChanged) {
 autoscalerMetrics.incrementScaling();
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 4ff9ae36..ef401ffe 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.resources.NoopResourceCheck;
 import org.apache.flink.autoscaler.resources.ResourceCheck;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.autoscaler.utils.CalendarUtils;
 import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
@@ -94,7 +95,8 @@ public class ScalingExecutor> {
 EvaluatedMetrics evaluatedMetrics,
 Map> 
scalingHistory,
 ScalingTracking scalingTracking,
-Instant now)
+Instant now,
+JobTopology jobTopology)
 throws Exception {
 var conf = context.getConfiguration();
 var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
@@ -120,7 +122,11 @@ public class ScalingExecutor> {
 
 var configOverrides =
 MemoryTuning.tuneTaskManagerHeapMemory(
-context, evaluatedMetrics, autoScalerEventHandler);
+context,
+evaluatedMetrics,
+jobTopology,
+scalingSummaries,
+autoScalerEventHandler);
 
 if (scalingWouldExceedClusterResources(
 configOverrides.applyOverrides(conf),
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
index 8ca0dcc3..af5fd8a1 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -18,16 +18,22 @@
 package org.apache.flink.autoscaler.tuning;
 
 import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.config

(flink) 01/02: [FLINK-34118] Implement restore tests for Sort node

2024-02-23 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe3d9a42995cfee0dfd90e8031768cb130543189
Author: bvarghese1 
AuthorDate: Tue Jan 16 17:54:46 2024 -0800

[FLINK-34118] Implement restore tests for Sort node
---
 .../plan/nodes/exec/stream/SortRestoreTest.java|  38 +
 .../plan/nodes/exec/stream/SortTestPrograms.java   |  48 +-
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   9 +-
 .../stream-exec-sort_1/sort-asc/plan/sort-asc.json | 164 +
 .../sort-desc/plan/sort-desc.json  | 164 +
 5 files changed, 421 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
new file mode 100644
index 000..18e9792f9ed
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecSort}. */
+public class SortRestoreTest extends RestoreTestBase {
+
+public SortRestoreTest() {
+super(StreamExecSort.class, AfterRestoreSource.NO_RESTORE);
+}
+
+@Override
+public List programs() {
+return Arrays.asList(SortTestPrograms.SORT_ASC, 
SortTestPrograms.SORT_DESC);
+}
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
index 0a6f68d4e76..2959e2e6a0e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.utils.InternalConfigOptions;
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
@@ -25,7 +26,8 @@ import org.apache.flink.types.Row;
 
 /**
  * {@link TableTestProgram} definitions for testing {@link
- * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit}.
+ * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit} 
and {@link
+ * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort}.
  */
 public class SortTestPrograms {
 
@@ -123,4 +125,48 @@ public class SortTestPrograms {
 .build())
 .runSql("INSERT INTO sink_t SELECT * from source_t ORDER 
BY a DESC LIMIT 3")
 .build();
+
+static final TableTestProgram SORT_ASC =
+TableTestProgram.of("sort-asc", "validates sort node by sorting 
integers in asc mode")
+
.setupConfig(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema("a INT", "b VARCHAR", "c INT")
+.producedValues(DATA)
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT", "b VARCHAR", "c 
BIGINT")
+.consumedValues(
+   

(flink) branch master updated (6c8f3a0799c -> faacf7e28bd)

2024-02-23 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 6c8f3a0799c [FLINK-34496] Remove unused method
 new fe3d9a42995 [FLINK-34118] Implement restore tests for Sort node
 new faacf7e28bd [FLINK-34118] Remove Sort Json Plan tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../plan/nodes/exec/stream/SortJsonPlanTest.java   |  66 
 ...{LimitRestoreTest.java => SortRestoreTest.java} |  10 +-
 .../plan/nodes/exec/stream/SortTestPrograms.java   |  48 +-
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   9 +-
 .../stream/SortJsonPlanTest_jsonplan/testSort.out  | 172 -
 .../sort-asc/plan/sort-asc.json}   |  28 +---
 .../sort-desc/plan/sort-desc.json} |  28 +---
 7 files changed, 66 insertions(+), 295 deletions(-)
 delete mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
 copy 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/{LimitRestoreTest.java
 => SortRestoreTest.java} (79%)
 delete mode 100644 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
 copy 
flink-table/flink-table-planner/src/test/resources/restore-tests/{stream-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
 => stream-exec-sort_1/sort-asc/plan/sort-asc.json} (85%)
 copy 
flink-table/flink-table-planner/src/test/resources/restore-tests/{stream-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
 => stream-exec-sort_1/sort-desc/plan/sort-desc.json} (85%)



(flink) 02/02: [FLINK-34118] Remove Sort Json Plan tests

2024-02-23 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faacf7e28bd9a43723303d0bd4a6ee9adebcb5bb
Author: bvarghese1 
AuthorDate: Tue Jan 16 17:58:33 2024 -0800

[FLINK-34118] Remove Sort Json Plan tests

- Covered with restore tests
---
 .../plan/nodes/exec/stream/SortJsonPlanTest.java   |  66 
 .../stream/SortJsonPlanTest_jsonplan/testSort.out  | 172 -
 2 files changed, 238 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
deleted file mode 100644
index 91c62f9fe33..000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization for sort limit. */
-class SortJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@BeforeEach
-void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-tEnv = util.getTableEnv();
-
-String srcTableDdl =
-"CREATE TABLE MyTable (\n"
-+ "  a bigint,\n"
-+ "  b int not null,\n"
-+ "  c varchar,\n"
-+ "  d timestamp(3)\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'bounded' = 'false')";
-tEnv.executeSql(srcTableDdl);
-}
-
-@Test
-void testSort() {
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ "  a bigint,\n"
-+ "  b bigint\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'sink-insert-only' = 'false',\n"
-+ "  'table-sink-class' = 'DEFAULT')";
-tEnv.executeSql(sinkTableDdl);
-String sql = "insert into MySink SELECT a, a from MyTable order by b";
-util.verifyJsonPlan(sql);
-}
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
deleted file mode 100644
index c73bfeead3f..000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
+++ /dev/null
@@ -1,172 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-"id" : 1,
-"type" : "stream-exec-table-source-scan_1",
-"scanTableSource" : {
-  "table" : {
-"identifier" : "`default_catalog`.`default_database`.`MyTable`",
-"resolvedTable" : {
-  "schema" : {
-"columns" : [ {
-  "name" : "a",
-  "dataType" : "BIGINT"
-}, {
-  "name" : "b",
-  "dataType" : "INT NOT NULL"
-}, {
-  "name" : "c",
-  "dataType" : "VARCHAR(2147483647)"
-}, {
-  "name" : "d",
-  "dataType" : "TIMESTAMP(3)"
-} ],
-"watermarkSpecs" : [ ]
-  },
-  "partitionKeys" : [ ],
-  "options" : {
-"bounded" : "false",
-"connector" : "values"
-  }
-

(flink-kubernetes-operator) branch main updated: [FLINK-34502][autoscaler] Support calculating network memory for forward and rescale edge (#782)

2024-02-23 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 304fca82 [FLINK-34502][autoscaler] Support calculating network memory 
for forward and rescale edge (#782)
304fca82 is described below

commit 304fca82ccc153c0745c468893a8930e9b9be806
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Fri Feb 23 23:45:23 2024 +0800

[FLINK-34502][autoscaler] Support calculating network memory for forward 
and rescale edge (#782)
---
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  4 +-
 .../flink/autoscaler/topology/JobTopology.java | 22 +
 .../flink/autoscaler/topology/VertexInfo.java  | 14 +++---
 .../flink/autoscaler/tuning/MemoryTuning.java  | 56 ++
 .../flink/autoscaler/BacklogBasedScalingTest.java  | 22 +
 .../flink/autoscaler/JobAutoScalerImplTest.java|  3 +-
 .../MetricsCollectionAndEvaluationTest.java| 26 +-
 .../autoscaler/RecommendedParallelismTest.java | 10 ++--
 .../flink/autoscaler/ScalingExecutorTest.java  |  5 +-
 .../autoscaler/ScalingMetricCollectorTest.java | 29 ++-
 .../autoscaler/ScalingMetricEvaluatorTest.java | 28 ++-
 .../flink/autoscaler/ScalingTrackingTest.java  |  3 +-
 .../autoscaler/metrics/ScalingMetricsTest.java | 11 +++--
 .../flink/autoscaler/topology/JobTopologyTest.java | 25 ++
 .../{utils => tuning}/MemoryTuningTest.java| 38 ---
 15 files changed, 194 insertions(+), 102 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 6512758b..085d80ae 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -338,7 +338,7 @@ public class ScalingMetricEvaluator {
 }
 out.put(CATCH_UP_DATA_RATE, 
EvaluatedScalingMetric.of(catchUpInputRate));
 } else {
-var inputs = topology.get(vertex).getInputs();
+var inputs = topology.get(vertex).getInputs().keySet();
 double sumAvgTargetRate = 0;
 double sumCatchUpDataRate = 0;
 for (var inputVertex : inputs) {
@@ -531,7 +531,7 @@ public class ScalingMetricEvaluator {
 JobVertexID from,
 JobVertexID to) {
 
-var toVertexInputs = topology.get(to).getInputs();
+var toVertexInputs = topology.get(to).getInputs().keySet();
 // Case 1: Downstream vertex has single input (from) so we can use the 
most reliable num
 // records in
 if (toVertexInputs.size() == 1) {
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
index c0a98d2e..c140c423 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
@@ -61,7 +61,7 @@ public class JobTopology {
 
 public JobTopology(Set vertexInfo) {
 
-Map> vertexOutputs = new HashMap<>();
+Map> vertexOutputs = new 
HashMap<>();
 vertexInfos =
 ImmutableMap.copyOf(
 
vertexInfo.stream().collect(Collectors.toMap(VertexInfo::getId, v -> v)));
@@ -72,13 +72,13 @@ public class JobTopology {
 info -> {
 var vertexId = info.getId();
 
-vertexOutputs.computeIfAbsent(vertexId, id -> new 
HashSet<>());
+vertexOutputs.computeIfAbsent(vertexId, id -> new 
HashMap<>());
 info.getInputs()
 .forEach(
-inputId ->
+(inputId, shipStrategy) ->
 vertexOutputs
-.computeIfAbsent(inputId, 
id -> new HashSet<>())
-.add(vertexId));
+.computeIfAbsent(inputId, 
id -> new HashMap<>())
+.put(vertexId, 
shipStrategy));
 if (info.isFinished()) {
 finishedVertices.add(vertexId);
 }
@@ -105,7 +105,8 @@ public class JobTopology {
 List sorted = new ArrayList<>(vertexInfos.size());
 
 Map> remainingInputs = new 
HashMap<>(vertexInfos.size());
-vertexInfos.forEach((id, v) -> remainingInputs.put(id, new 
ArrayList<>(v.g

(flink-kubernetes-operator) branch dependabot/maven/flink-autoscaler-plugin-jdbc/org.postgresql-postgresql-42.5.5 deleted (was 96f53bb8)

2024-02-23 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/flink-autoscaler-plugin-jdbc/org.postgresql-postgresql-42.5.5
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


 was 96f53bb8 Bump org.postgresql:postgresql in 
/flink-autoscaler-plugin-jdbc

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



(flink-kubernetes-operator) branch main updated: Add helm chart repo to helm docs (#784)

2024-02-23 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new d738c571 Add helm chart repo to helm docs (#784)
d738c571 is described below

commit d738c5718177807a88a1cdb9d799e25e81273ddf
Author: Domenic Bove 
AuthorDate: Fri Feb 23 13:26:54 2024 -0800

Add helm chart repo to helm docs (#784)

Helm docs do not mention the chart repo, which will be useful for anyone 
who doesn't want to download the source code or is using argocd/terraform
---
 docs/content/docs/operations/helm.md | 11 +--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/operations/helm.md 
b/docs/content/docs/operations/helm.md
index 28110896..12425a12 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -26,13 +26,20 @@ under the License.
 
 # Helm installation
 
-The operator installation is managed by a helm chart. To install run:
+The operator installation is managed by a helm chart. To install with the 
chart bundled in the source code run:
 
 ```
 helm install flink-kubernetes-operator helm/flink-kubernetes-operator
 ```
 
-Alternatively to install the operator (and also the helm chart) to a specific 
namespace:
+To install from our Helm Chart Repository run:
+
+```
+helm repo add flink-operator-repo 
https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
+helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
+```
+
+Alternatively, to install the operator (and also the helm chart) to a specific 
namespace, add the arguments `--namespace` and `--create-namespace`, for example:
 
 ```
 helm install flink-kubernetes-operator helm/flink-kubernetes-operator 
--namespace flink --create-namespace