[flink] branch master updated: [FLINK-31035] add warn info to user when NoNodeException happened

2023-07-12 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f89be70c2b0 [FLINK-31035] add warn info to user when NoNodeException happened
f89be70c2b0 is described below

commit f89be70c2b00522ccde0c47f45c2236acdd4ad6d
Author: xuyu <11161...@vivo.com>
AuthorDate: Tue Jul 11 18:44:56 2023 +0800

[FLINK-31035] add warn info to user when NoNodeException happened
---
 .../org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java  | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index a5bbb8fa1a9..95c81ac7717 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -364,6 +364,9 @@ public class ZooKeeperStateHandleStore
                 return client.getChildren().forPath(path);
             } catch (KeeperException.NoNodeException ignored) {
                 // Concurrent deletion, retry
+                LOG.debug(
+                        "Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})",
+                        ignored.getMessage());
             }
         }
     }
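For context, here is a minimal, self-contained sketch of the retry loop this change instruments. It assumes a Curator CuratorFramework client and SLF4J logging; the class and method names are hypothetical, and a production version would likely bound the number of retries:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.zookeeper.KeeperException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.List;

    final class RetryOnNoNode {
        private static final Logger LOG = LoggerFactory.getLogger(RetryOnNoNode.class);

        // Keep asking ZooKeeper for the children of 'path'. A NoNodeException here
        // signals a concurrent deletion, so log it at debug level and try again.
        static List<String> getAllHandles(CuratorFramework client, String path) throws Exception {
            while (true) {
                try {
                    return client.getChildren().forPath(path);
                } catch (KeeperException.NoNodeException e) {
                    LOG.debug(
                            "Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})",
                            e.getMessage());
                }
            }
        }
    }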



[flink] 05/05: [FLINK-22734][table-runtime] Fix comments for HashJoin and Calc OperatorFusionCodeGen

2023-07-12 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f480b6d313f64ff5d8d1e6f1d74f2dacc3ea133b
Author: fengli 
AuthorDate: Mon Jun 26 18:05:42 2023 +0800

[FLINK-22734][table-runtime] Fix comments for HashJoin and Calc OperatorFusionCodeGen

This closes #22734
---
 .../nodes/exec/batch/BatchExecMultipleInput.java   |   8 +-
 .../planner/codegen/GeneratedExpression.scala  |   2 +-
 .../planner/plan/fusion/FusionCodegenUtil.scala|  84 ++-
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 107 
 .../plan/fusion/spec/CalcFusionCodegenSpec.scala   |  41 ++--
 .../fusion/spec/HashJoinFusionCodegenSpec.scala| 269 +++--
 .../spec/InputAdapterFusionCodegenSpec.scala   |  16 +-
 .../plan/fusion/spec/OutputFusionCodegenSpec.scala |   8 +-
 .../planner/utils/JavaScalaConversionUtil.scala|   7 +-
 .../runtime/hashtable/BaseHybridHashTable.java |   2 +-
 10 files changed, 322 insertions(+), 222 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java
index c7b661609dd..8ffbcbc7fc9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java
@@ -44,8 +44,8 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
 import org.apache.flink.table.runtime.operators.fusion.OperatorFusionCodegenFactory;
 import org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory;
-import org.apache.flink.table.runtime.operators.multipleinput.MultipleInputSpec;
 import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapperGenerator;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSelectionSpec;
 import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -140,7 +140,7 @@ public class BatchExecMultipleInput extends ExecNodeBase<RowData>
         boolean fusionCodegenEnabled = config.get(TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED);
         // multiple operator fusion codegen
         if (fusionCodegenEnabled && allSupportFusionCodegen()) {
-            final List<MultipleInputSpec> multipleInputSpecs = new ArrayList<>();
+            final List<InputSelectionSpec> inputSelectionSpecs = new ArrayList<>();
             int i = 0;
             for (ExecEdge inputEdge : originalEdges) {
                 int multipleInputId = i + 1;
@@ -165,7 +165,7 @@ public class BatchExecMultipleInput extends ExecNodeBase<RowData>
                         ExecEdge.builder().source(inputAdapter).target(target).build());

                 // The input id and read order
-                multipleInputSpecs.add(new MultipleInputSpec(multipleInputId, readOrders[i]));
+                inputSelectionSpecs.add(new InputSelectionSpec(multipleInputId, readOrders[i]));
                 i++;
             }
 
@@ -184,7 +184,7 @@ public class BatchExecMultipleInput extends ExecNodeBase<RowData>

         // generate fusion operator
         Tuple2<OperatorFusionCodegenFactory<RowData>, Object> multipleOperatorTuple =
-                FusionCodegenUtil.generateFusionOperator(outputGenerator, multipleInputSpecs);
+                FusionCodegenUtil.generateFusionOperator(outputGenerator, inputSelectionSpecs);
         operatorFactory = multipleOperatorTuple._1;

         Pair parallelismPair = getInputMaxParallelism(inputTransforms);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
index 13469590491..108ea10d1ad 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
@@ -39,7 +39,7 @@ import org.apache.flink.table.types.logical.LogicalType
 case class GeneratedExpression(
     resultTerm: String,
     nullTerm: String,
-    code: String,
+    var code: String,
     resultType: LogicalType,
     literalValue: Option[Any] = None) {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/FusionCodegenUtil.scala b/flink-table/flink-table-planner/src/ma

[flink] 04/05: [FLINK-32278][table-runtime] Calc and HashJoin operator support operator fusion codegen

2023-07-12 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f03c26a6f0189b78975828dc77482e5ba1e53641
Author: fengli 
AuthorDate: Thu Jun 15 19:41:20 2023 +0800

[FLINK-32278][table-runtime] Calc and HashJoin operator support operator fusion codegen
---
 .../generated/execution_config_configuration.html  |   6 +
 .../table/api/config/ExecutionConfigOptions.java   |   8 +
 .../planner/plan/fusion/OpFusionCodegenSpec.java   |   5 +-
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  |   6 +
 .../plan/nodes/exec/FusionCodegenExecNode.java |   1 +
 .../plan/nodes/exec/batch/BatchExecCalc.java   |  31 
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   |  63 +++
 .../nodes/exec/batch/BatchExecInputAdapter.java|  83 +
 .../nodes/exec/batch/BatchExecMultipleInput.java   | 158 +---
 .../plan/nodes/exec/common/CommonExecCalc.java |   4 +-
 .../processor/ForwardHashExchangeProcessor.java|   4 +
 .../MultipleInputNodeCreationProcessor.java|  16 +-
 .../planner/plan/fusion/FusionCodegenUtil.scala| 171 ++
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  |  35 
 .../fusion/spec/HashJoinFusionCodegenSpec.scala|  11 +-
 .../physical/batch/BatchPhysicalHashJoin.scala |   1 +
 .../runtime/batch/sql/MultipleInputITCase.scala|   6 +-
 .../batch/sql/OperatorFusionCodegenITCase.scala| 200 +
 .../runtime/hashtable/BaseHybridHashTable.java |  10 +-
 .../table/runtime/hashtable/BinaryHashTable.java   |   9 +-
 .../runtime/hashtable/LongHybridHashTable.java |  64 ++-
 .../operators/fusion/FusionStreamOperatorBase.java |   3 +-
 22 files changed, 858 insertions(+), 37 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 32046454a17..6c34ab5a969 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -76,6 +76,12 @@ By default no operator is disabled.
 Long
 The maximum number of input records can be buffered for MiniBatch. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records is reached. NOTE: MiniBatch only works for non-windowed aggregations currently. If table.exec.mini-batch.enabled is set true, its value must be positive.
 
+
+table.exec.operator-fusion-codegen.enabled Batch Streaming
+false
+Boolean
+If true, multiple physical operators will be compiled into a single operator by the planner, which can improve performance.
+
 
 table.exec.rank.topn-cache-size Streaming
 1
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index e37222cabcf..df5438106d2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -389,6 +389,14 @@ public class ExecutionConfigOptions {
                                 + "\"SortMergeJoin\", \"HashAgg\", \"SortAgg\".\n"
                                 + "By default no operator is disabled.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean> TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED =
+            key("table.exec.operator-fusion-codegen.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, multiple physical operators will be compiled into a single operator by the planner, which can improve performance.");
+
 /** @deprecated Use {@link ExecutionOptions#BATCH_SHUFFLE_MODE} instead. */
 @Deprecated
 @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
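As a usage note, a short sketch of how a user could turn the new option on from the Table API (assuming a Flink 1.18-era TableEnvironment; the option key and default are taken from the diff above):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class EnableFusionCodegen {
        public static void main(String[] args) {
            // Operator fusion codegen is off by default (defaultValue(false) above);
            // enabling it lets the planner compile chains of physical operators into one.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.getConfig().set("table.exec.operator-fusion-codegen.enabled", "true");
        }
    }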
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
index 876c44c138d..dc3ed4cce90 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
@@ -82,11 +82,12 @@ publi

[flink] 03/05: [FLINK-32278][table-runtime] Introduce the Calc and HashJoin operator spec to support operator fusion codegen

2023-07-12 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6040f480aae92f5449eb548a9eb3e042442b2bbb
Author: fengli 
AuthorDate: Mon Jun 5 20:43:07 2023 +0800

[FLINK-32278][table-runtime] Introduce the Calc and HashJoin operator spec to support operator fusion codegen
---
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  |   4 +
 .../flink/table/planner/codegen/CodeGenUtils.scala |   2 +
 .../planner/codegen/CodeGeneratorContext.scala |  48 ++
 .../planner/codegen/LongHashJoinGenerator.scala|   2 +-
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  |   2 +-
 .../OneInputOpFusionCodegenSpecGenerator.scala |   2 +-
 .../SourceOpFusionCodegenSpecGenerator.scala}  |  12 +-
 .../TwoInputOpFusionCodegenSpecGenerator.scala |   2 +-
 .../plan/fusion/spec/CalcFusionCodegenSpec.scala   |  96 
 .../fusion/spec/HashJoinFusionCodegenSpec.scala| 539 +
 .../spec/InputAdapterFusionCodegenSpec.scala   |  82 
 .../plan/fusion/spec/OutputFusionCodegenSpec.scala |  49 ++
 .../table/runtime/operators/join/HashJoinType.java |   4 +
 13 files changed, 833 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
index 943aecdead9..2b355f6610b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
@@ -56,6 +56,10 @@ public abstract class OpFusionCodegenSpecGenerator {
         this.opFusionCodegenSpec.setup(opFusionContext);
     }

+    public RowType getOutputType() {
+        return outputType;
+    }
+
     public OpFusionCodegenSpec getOpFusionCodegenSpec() {
         return opFusionCodegenSpec;
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 4bbe41c5591..58214199aa1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -58,6 +58,8 @@ object CodeGenUtils {
 
   val DEFAULT_TIMEZONE_TERM = "timeZone"
 
+  val DEFAULT_INPUT_TERM = "in"
+
   val DEFAULT_INPUT1_TERM = "in1"
 
   val DEFAULT_INPUT2_TERM = "in2"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 6ba1e1483c9..69ede520531 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -90,6 +90,14 @@ class CodeGeneratorContext(val tableConfig: ReadableConfig, val classLoader: Cla
   private val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()

+  // set of statements that will be added only for the operator fusion codegen process method
+  private val reusableFusionCodegenProcessStatements: mutable.TreeMap[Int, String] =
+    mutable.TreeMap[Int, String]()
+
+  // set of statements that will be added only for the operator fusion codegen endInput method
+  private val reusableFusionCodegenEndInputStatements: mutable.TreeMap[Int, String] =
+    mutable.TreeMap[Int, String]()
+
   // map of initial input unboxing expressions that will be added only once
   // (inputTerm, index) -> expr
   val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] =
@@ -305,6 +313,38 @@ class CodeGeneratorContext(val tableConfig: ReadableConfig, val classLoader: Cla
     reusableCleanupStatements.mkString("", "\n", "\n")
   }
 
+  /**
+   * @return
+   *   code block of statements that need to be placed in the getInputs() method of
+   *   [[FusionStreamOperator]]
+   */
+  def reuseFusionProcessCode(): String = {
+    reusableFusionCodegenProcessStatements.values.mkString(",\n")
+  }
+
+  /**
+   * @return
+   *   code block of statements that need to be placed in the endInput() method of
+   *   [[BoundedMultiInput]]
+   */
+  def reuseFusionEndInputCode(inputId: String): String = {
+    val endInputCode = reusableFusionCodegenEndInputStatements
+      .map {
+        case (id, code) => s"""
+                              |case $id:
+ 

[flink] 02/05: [FLINK-32277][table-runtime] Introduce the base FusionStreamOperator for OFCG

2023-07-12 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f062a38f43cdc9fdbaab3c68e1ec1c1ffdb88b5
Author: fengli 
AuthorDate: Mon Jun 5 20:41:25 2023 +0800

[FLINK-32277][table-runtime] Introduce the base FusionStreamOperator for OFCG
---
 .../operators/fusion/FusionStreamOperatorBase.java | 60 ++
 .../fusion/OperatorFusionCodegenFactory.java   | 50 ++
 .../BatchMultipleInputStreamOperator.java  |  2 +-
 .../multipleinput/input/InputSelectionHandler.java | 22 +---
 .../{InputSpec.java => InputSelectionSpec.java}| 49 --
 .../operators/multipleinput/input/InputSpec.java   | 37 ++---
 .../input/InputSelectionHandlerTest.java   |  4 +-
 7 files changed, 153 insertions(+), 71 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/FusionStreamOperatorBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/FusionStreamOperatorBase.java
new file mode 100644
index 000..3ebe474fac0
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/FusionStreamOperatorBase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.fusion;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base {@link MultipleInputStreamOperator} to handle multiple operator fusion codegen in table
+ * module.
+ */
+public abstract class FusionStreamOperatorBase extends AbstractStreamOperatorV2<RowData>
+        implements MultipleInputStreamOperator<RowData>, InputSelectable, BoundedMultiInput {
+
+    protected final StreamOperatorParameters<RowData> parameters;
+
+    public FusionStreamOperatorBase(
+            StreamOperatorParameters<RowData> parameters, int numberOfInputs) {
+        super(parameters, numberOfInputs);
+        this.parameters = parameters;
+    }
+
+    public org.apache.flink.streaming.runtime.tasks.StreamTask<?, ?> getContainingTask() {
+        return parameters.getContainingTask();
+    }
+
+    public long computeMemorySize(double operatorFraction) {
+        final double memFraction =
+                parameters
+                        .getStreamConfig()
+                        .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                                org.apache.flink.core.memory.ManagedMemoryUseCase.OPERATOR,
+                                getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration(),
+                                getRuntimeContext().getUserCodeClassLoader());
+        return getContainingTask()
+                .getEnvironment()
+                .getMemoryManager()
+                .computeMemorySize(memFraction * operatorFraction);
+    }
+}
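The computeMemorySize helper above scales the slot's managed-memory fraction by the share a single fused operator claims of it. A toy illustration of that arithmetic with made-up numbers (the real fraction comes from getManagedMemoryFractionOperatorUseCaseOfSlot):

    public class MemoryFractionSketch {
        public static void main(String[] args) {
            double slotManagedFraction = 0.6; // assumed value of the slot's managed-memory fraction
            double operatorFraction = 0.5;    // this operator's assumed share of the fused chain's budget
            long slotManagedBytes = 128L * 1024 * 1024;
            // Mirrors memFraction * operatorFraction in computeMemorySize above.
            long operatorBytes = (long) (slotManagedBytes * slotManagedFraction * operatorFraction);
            System.out.println(operatorBytes); // 40265318 bytes, roughly 38.4 MiB
        }
    }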
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/OperatorFusionCodegenFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/OperatorFusionCodegenFactory.java
new file mode 100644
index 000..052b49c8220
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/OperatorFusionCodegenFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the Lic

[flink] 01/05: [FLINK-32277][table-runtime] Introduce the basic operator fusion codegen framework

2023-07-12 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02be55ab780186a4b738a331e17c436ad2543de3
Author: fengli 
AuthorDate: Mon Jun 5 20:33:54 2023 +0800

[FLINK-32277][table-runtime] Introduce the basic operator fusion codegen framework
---
 .../planner/plan/fusion/OpFusionCodegenSpec.java   | 117 
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  | 155 +++
 .../table/planner/plan/fusion/OpFusionContext.java |  74 
 .../table/planner/plan/nodes/exec/ExecEdge.java|  10 +
 .../table/planner/plan/nodes/exec/ExecNode.java|   2 +-
 .../planner/plan/nodes/exec/ExecNodeBase.java  |  40 
 .../plan/nodes/exec/FusionCodegenExecNode.java |  37 
 .../flink/table/planner/codegen/CodeGenUtils.scala |  10 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  |   9 -
 .../plan/fusion/OpFusionCodegenSpecBase.scala  |  48 +
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 211 +
 .../OneInputOpFusionCodegenSpecGenerator.scala |  35 
 .../TwoInputOpFusionCodegenSpecGenerator.scala |  37 
 .../plan/nodes/exec/TestingBatchExecNode.java  |  11 ++
 14 files changed, 785 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
new file mode 100644
index 000..876c44c138d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.fusion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+
+import java.util.List;
+import java.util.Set;
+
+/** An interface for those physical operators that support operator fusion codegen. */
+@Internal
+public interface OpFusionCodegenSpec {
+
+    /**
+     * Initializes the operator spec. Sets access to the context. This method must be called
+     * before doProduce and doConsume related methods.
+     */
+    void setup(OpFusionContext opFusionContext);
+
+    /** Prefix used in the current operator's variable names. */
+    String variablePrefix();
+
+    /**
+     * The subset of input column indices that should be evaluated before this operator.
+     *
+     * <p>We will use this to insert code that accesses the columns actually used by the
+     * current operator before calling doProcessConsume().
+     */
+    Set<Integer> usedInputColumns(int inputId);
+
+    /**
+     * The {@link RowData} class required by this operator for the given inputId. This is used
+     * to notify the upstream operator to wrap the proper {@link RowData} before calling the
+     * doProcessConsume method. For example, the HashJoin build side needs {@link BinaryRowData}.
+     */
+    Class<? extends RowData> getInputRowDataClass(int inputId);
+
+    /**
+     * Every operator needs a {@link CodeGeneratorContext} to store the context needed during
+     * operator fusion codegen.
+     */
+    CodeGeneratorContext getCodeGeneratorContext();
+
+    /** Get the {@link ExprCodeGenerator} used by this operator during operator fusion codegen. */
+    ExprCodeGenerator getExprCodeGenerator();
+
+    /**
+     * Generate the Java source code to process rows. Only the leaf operator in the operator DAG
+     * needs to generate the code that produces the row; other middle operators just call their
+     * input's {@link OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)} normally,
+     * unless the operator has some specific logic. The leaf operator produces the row first, and
+     * then calls the {@link OpFusionContext#processConsume(List)} metho
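To make the produce/consume protocol described above concrete: a deliberately simplified, hypothetical illustration of operator fusion codegen as plain string splicing (this is not Flink's actual codegen stack). The leaf operator produces the row-generating loop and splices the downstream operator's consume code into the loop body, so the fused result is a single loop with no per-row virtual calls:

    public class ToyFusionCodegen {
        public static void main(String[] args) {
            String rowTerm = "row";
            long n = 10;
            // "Consume" side (think of a Calc): code that uses the produced row term.
            String calcConsume = "if (" + rowTerm + " % 2 == 0) { emit(" + rowTerm + "); }";
            // "Produce" side (the leaf operator): generates the loop and splices the consumer in.
            String fused =
                    "for (long " + rowTerm + " = 0; " + rowTerm + " < " + n + "; " + rowTerm + "++) { "
                            + calcConsume
                            + " }";
            System.out.println(fused);
        }
    }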

[flink] branch master updated (354a8852766 -> f480b6d313f)

2023-07-12 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 354a8852766 [FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high
 new 02be55ab780 [FLINK-32277][table-runtime] Introduce the basic operator fusion codegen framework
 new 1f062a38f43 [FLINK-32277][table-runtime] Introduce the base FusionStreamOperator for OFCG
 new 6040f480aae [FLINK-32278][table-runtime] Introduce the Calc and HashJoin operator spec to support operator fusion codegen
 new f03c26a6f01 [FLINK-32278][table-runtime] Calc and HashJoin operator support operator fusion codegen
 new f480b6d313f [FLINK-22734][table-runtime] Fix comments for HashJoin and Calc OperatorFusionCodeGen

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/execution_config_configuration.html  |   6 +
 .../table/api/config/ExecutionConfigOptions.java   |   8 +
 .../planner/plan/fusion/OpFusionCodegenSpec.java   | 118 +
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  | 165 ++
 .../table/planner/plan/fusion/OpFusionContext.java |  74 +++
 .../table/planner/plan/nodes/exec/ExecEdge.java|  10 +
 .../table/planner/plan/nodes/exec/ExecNode.java|   2 +-
 .../planner/plan/nodes/exec/ExecNodeBase.java  |  40 ++
 .../plan/nodes/exec/FusionCodegenExecNode.java |  38 ++
 .../plan/nodes/exec/batch/BatchExecCalc.java   |  31 ++
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   |  63 +++
 .../nodes/exec/batch/BatchExecInputAdapter.java|  83 +++
 .../nodes/exec/batch/BatchExecMultipleInput.java   | 158 +-
 .../plan/nodes/exec/common/CommonExecCalc.java |   4 +-
 .../processor/ForwardHashExchangeProcessor.java|   4 +
 .../MultipleInputNodeCreationProcessor.java|  16 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala |  12 +-
 .../planner/codegen/CodeGeneratorContext.scala |  48 ++
 .../table/planner/codegen/ExprCodeGenerator.scala  |   9 -
 .../planner/codegen/GeneratedExpression.scala  |   2 +-
 .../planner/codegen/LongHashJoinGenerator.scala|   2 +-
 .../planner/plan/fusion/FusionCodegenUtil.scala| 243 +
 .../plan/fusion/OpFusionCodegenSpecBase.scala  |  48 ++
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 235 +
 .../OneInputOpFusionCodegenSpecGenerator.scala |  35 ++
 .../SourceOpFusionCodegenSpecGenerator.scala   |  33 ++
 .../TwoInputOpFusionCodegenSpecGenerator.scala |  37 ++
 .../plan/fusion/spec/CalcFusionCodegenSpec.scala   | 109 
 .../fusion/spec/HashJoinFusionCodegenSpec.scala| 563 +
 .../spec/InputAdapterFusionCodegenSpec.scala   |  82 +++
 .../plan/fusion/spec/OutputFusionCodegenSpec.scala |  49 ++
 .../physical/batch/BatchPhysicalHashJoin.scala |   1 +
 .../planner/utils/JavaScalaConversionUtil.scala|   7 +-
 .../plan/nodes/exec/TestingBatchExecNode.java  |  11 +
 .../runtime/batch/sql/MultipleInputITCase.scala|   6 +-
 .../batch/sql/OperatorFusionCodegenITCase.scala| 200 
 .../runtime/hashtable/BaseHybridHashTable.java |  10 +-
 .../table/runtime/hashtable/BinaryHashTable.java   |   9 +-
 .../runtime/hashtable/LongHybridHashTable.java |  64 ++-
 .../operators/fusion/FusionStreamOperatorBase.java |  61 +++
 .../fusion/OperatorFusionCodegenFactory.java   |  50 ++
 .../table/runtime/operators/join/HashJoinType.java |   4 +
 .../BatchMultipleInputStreamOperator.java  |   2 +-
 .../multipleinput/input/InputSelectionHandler.java |  22 +-
 .../{InputSpec.java => InputSelectionSpec.java}|  49 +-
 .../operators/multipleinput/input/InputSpec.java   |  37 +-
 .../input/InputSelectionHandlerTest.java   |   4 +-
 47 files changed, 2749 insertions(+), 115 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionContext.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/FusionCodegenExecNode.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecInputAdapter.java
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/FusionCodegenUtil.scala
 create mode 100644 flink-table/flink-table-planner/src/m

[flink-kubernetes-operator] branch main updated: [hotfix][docs] Add a 'Build from Source' page

2023-07-12 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 98e7a670 [hotfix][docs] Add a 'Build from Source' page
98e7a670 is described below

commit 98e7a6706550c2568869b9733227d9834b3c58c1
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Wed Jul 12 15:51:04 2023 +0200

[hotfix][docs] Add a 'Build from Source' page
---
 docs/content/docs/development/guide.md | 24 
 1 file changed, 24 insertions(+)

diff --git a/docs/content/docs/development/guide.md b/docs/content/docs/development/guide.md
index 402a5f71..c8cfd163 100644
--- a/docs/content/docs/development/guide.md
+++ b/docs/content/docs/development/guide.md
@@ -28,6 +28,30 @@ under the License.
 
 We gathered a set of best practices here to aid development.
 
+## Build from sources
+
+In order to build the operator you need to [clone the git repository]({{< github_repo >}}).
+
+```bash
+git clone {{< github_repo >}}
+```
+
+To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that the Flink Kubernetes Operator requires **Java 11**.
+
+To build the project, you can use the following command:
+
+```bash
+mvn clean install
+```
+
+To speed up the build you can:
+- skip the tests by using `-DskipTests`
+- use Maven's parallel build feature, e.g., `mvn package -T 1C` will attempt to build with one thread per CPU core in parallel
+
+```bash
+mvn clean install -DskipTests -T 1C
+```
+
 ## Local environment setup
 
 We recommend you install [Docker Desktop](https://www.docker.com/products/docker-desktop), [minikube](https://minikube.sigs.k8s.io/docs/start/)



[flink] 01/02: [FLINK-32420][refactor] Add the AbstractHeapPriorityQueueElement

2023-07-12 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 224aa2e9ca20ed7ce009a2c2f485c1a232e4c807
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Jun 29 00:36:58 2023 +0800

[FLINK-32420][refactor] Add the AbstractHeapPriorityQueueElement
---
 .../heap/AbstractHeapPriorityQueueElement.java | 39 ++
 .../state/InternalPriorityQueueTestBase.java   | 18 ++
 .../runtime/testutils/statemigration/TestType.java | 18 ++
 .../state/RocksDBCachingPriorityQueueSet.java  | 17 ++
 4 files changed, 47 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java
new file mode 100644
index 000..bee5c0eb3ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/** Abstract base class for {@link HeapPriorityQueueElement}. */
+public abstract class AbstractHeapPriorityQueueElement implements HeapPriorityQueueElement {
+
+    private int internalIndex;
+
+    public AbstractHeapPriorityQueueElement() {
+        this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+    }
+
+    @Override
+    public int getInternalIndex() {
+        return internalIndex;
+    }
+
+    @Override
+    public void setInternalIndex(int newIndex) {
+        this.internalIndex = newIndex;
+    }
+}
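A brief sketch of what the refactor buys subclasses (TimerElement is a made-up example; the real beneficiaries appear in the follow-up commits below, e.g. WatermarkElement): the base class now carries the heap-index bookkeeping, so element types only hold their payload.

    import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;

    // Hypothetical element type: getInternalIndex/setInternalIndex are inherited.
    public class TimerElement extends AbstractHeapPriorityQueueElement {
        private final long timestamp;

        public TimerElement(long timestamp) {
            this.timestamp = timestamp;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }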
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
index 4a3b243fca4..0d1387dc4d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.MathUtils;
 
@@ -343,17 +343,15 @@ public abstract class InternalPriorityQueueTestBase {
     protected abstract boolean testSetSemanticsAgainstDuplicateElements();

     /** Payload for usage in the test. */
-    protected static class TestElement
-            implements HeapPriorityQueueElement, Keyed<Long>, PriorityComparable<TestElement> {
+    protected static class TestElement extends AbstractHeapPriorityQueueElement
+            implements Keyed<Long>, PriorityComparable<TestElement> {

         private final long key;
         private final long priority;
-        private int internalIndex;

         public TestElement(long key, long priority) {
             this.key = key;
             this.priority = priority;
-            this.internalIndex = NOT_CONTAINED;
         }
 
         @Override
@@ -369,16 +369,6 @@ public abstract class InternalPriorityQueueTestBase {
             return priority;
         }

-        @Override
-        public int getInternalIndex() {
-            return internalIndex;
-        }
-
-        @Override
-        public void setInternalIndex(int newIndex) {
-            internalIndex = newIndex;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index b024f76cd41..8db517f68b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigrat

[flink] 02/02: [FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high

2023-07-12 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 354a8852766b16873b5fad972e4440c1eaa4c40a
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Jun 29 01:06:08 2023 +0800

[FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high
---
 .../source/coordinator/SourceCoordinator.java  | 76 ++
 .../SourceCoordinatorAlignmentTest.java| 92 ++
 2 files changed, 154 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index f3b4f9dc775..faeac9a8dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -38,6 +38,10 @@ import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
 import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -46,6 +50,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
@@ -53,7 +58,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -642,9 +646,47 @@ public class SourceCoordinator
         }
     }

-    private static class WatermarkAggregator<T> {
-        private final Map<T, Watermark> watermarks = new HashMap<>();
-        private Watermark aggregatedWatermark = new Watermark(Long.MIN_VALUE);
+    /** The watermark element for {@link HeapPriorityQueue}. */
+    public static class WatermarkElement extends AbstractHeapPriorityQueueElement
+            implements PriorityComparable<WatermarkElement> {
+
+        private final Watermark watermark;
+
+        public WatermarkElement(Watermark watermark) {
+            this.watermark = watermark;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o instanceof WatermarkElement) {
+                return watermark.equals(((WatermarkElement) o).watermark);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return watermark.hashCode();
+        }
+
+        @Override
+        public int comparePriorityTo(@Nonnull WatermarkElement other) {
+            return Long.compare(watermark.getTimestamp(), other.watermark.getTimestamp());
+        }
+    }
+
+    /** The aggregated watermark is the smallest watermark of all keys. */
+    static class WatermarkAggregator<T> {
+
+        private final Map<T, WatermarkElement> watermarks = new HashMap<>();
+
+        private final HeapPriorityQueue<WatermarkElement> orderedWatermarks =
+                new HeapPriorityQueue<>(PriorityComparator.forPriorityComparableObjects(), 10);
+
+        private static final Watermark DEFAULT_WATERMARK = new Watermark(Long.MIN_VALUE);
 
         /**
          * Update the {@link Watermark} for the given {@code key}.
@@ -653,17 +695,20 @@ public class SourceCoordinator
          * Optional.empty()} otherwise.
          */
         public Optional<Watermark> aggregate(T key, Watermark watermark) {
-            watermarks.put(key, watermark);
-            Watermark newMinimum =
-                    watermarks.values().stream()
-                            .min(Comparator.comparingLong(Watermark::getTimestamp))
-                            .orElseThrow(IllegalStateException::new);
-            if (newMinimum.equals(aggregatedWatermark)) {
+            Watermark oldAggregatedWatermark = getAggregatedWatermark();
+
+            WatermarkElement watermarkElement = new WatermarkElement(watermark);
+            WatermarkElement oldWatermarkElement = watermarks.put(key, watermarkElement);
+            if (oldWatermarkElement != null) {
+                orderedWatermarks.remove(oldWatermarkElement);
+            }
+            orderedWatermarks.add(w
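Even in this truncated hunk the intent is clear: the old code re-scanned every key's watermark on each update (O(n) per report), while the new code keeps the elements in a heap so the minimum is maintained in O(log n). A self-contained sketch of the same idea using java.util.PriorityQueue (names are hypothetical; note that unlike Flink's HeapPriorityQueue, PriorityQueue.remove(Object) is itself O(n), which is exactly why Flink's heap elements track their own index):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;
    import java.util.PriorityQueue;

    final class MinWatermarkAggregator<K> {
        private final Map<K, Long> perKey = new HashMap<>();
        private final PriorityQueue<Long> heap = new PriorityQueue<>();
        private long aggregated = Long.MIN_VALUE;

        /** Returns the new aggregated (minimum) watermark if it changed, empty otherwise. */
        Optional<Long> aggregate(K key, long watermark) {
            Long old = perKey.put(key, watermark);
            if (old != null) {
                heap.remove(old); // O(n) here; Flink's HeapPriorityQueue does this in O(log n)
            }
            heap.add(watermark);
            long newMin = heap.peek();
            if (newMin == aggregated) {
                return Optional.empty();
            }
            aggregated = newMin;
            return Optional.of(newMin);
        }
    }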

[flink] branch master updated (2dfff436c09 -> 354a8852766)

2023-07-12 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 2dfff436c09 [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers
 new 224aa2e9ca2 [FLINK-32420][refactor] Add the AbstractHeapPriorityQueueElement
 new 354a8852766 [FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../source/coordinator/SourceCoordinator.java  | 76 ++
 .../heap/AbstractHeapPriorityQueueElement.java}| 24 +++---
 .../SourceCoordinatorAlignmentTest.java| 92 ++
 .../state/InternalPriorityQueueTestBase.java   | 18 +
 .../runtime/testutils/statemigration/TestType.java | 18 +
 .../state/RocksDBCachingPriorityQueueSet.java  | 17 +---
 6 files changed, 175 insertions(+), 70 deletions(-)
 copy flink-runtime/src/{test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java => main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java} (62%)



[flink-connector-mongodb] branch main updated: [FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out

2023-07-12 Thread leonard
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
 new e2babc9 [FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out
e2babc9 is described below

commit e2babc9bcfa501a3f6727f28c677c199f7bfcad5
Author: Jiabao Sun 
AuthorDate: Wed Jul 12 17:12:06 2023 +0800

[FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out

This closes #13.
---
 .../mongodb/source/enumerator/MongoSourceEnumerator.java| 13 +
 .../source/reader/split/MongoScanSourceSplitReader.java |  1 -
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
index d13a843..951c527 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
@@ -97,6 +97,15 @@ public class MongoSourceEnumerator
                 continue;
             }

+            // close idle readers
+            if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
+                context.signalNoMoreSplits(nextAwaiting);
+                awaitingReader.remove();
+                LOG.info(
+                        "All scan splits have been assigned, closing idle reader {}", nextAwaiting);
+                continue;
+            }
+
             Optional<MongoSourceSplit> split = splitAssigner.getNext();
             if (split.isPresent()) {
                 final MongoSourceSplit mongoSplit = split.get();
@@ -104,10 +113,6 @@ public class MongoSourceEnumerator
                 awaitingReader.remove();
                 LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
                 break;
-            } else if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
-                LOG.info("All splits have been assigned");
-                context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
-                break;
             } else {
                 // there is no available splits by now, skip assigning
                 break;
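A self-contained toy of the reordered decision (all types here are stand-ins, not the connector's API): once the assigner is exhausted in bounded mode, waiting readers are signaled and removed before any assignment attempt, instead of the condition only being discovered on the assignment path:

    import java.util.ArrayDeque;
    import java.util.Optional;
    import java.util.Queue;

    final class ToyEnumerator {
        static void handleAwaitingReaders(
                Queue<Integer> awaitingReaders, Queue<String> splits, boolean noMoreSplits, boolean bounded) {
            while (!awaitingReaders.isEmpty()) {
                int reader = awaitingReaders.peek();
                // The fix: close idle readers first, before trying to assign.
                if (noMoreSplits && bounded) {
                    System.out.println("signalNoMoreSplits -> reader " + reader);
                    awaitingReaders.poll();
                    continue;
                }
                Optional<String> split = Optional.ofNullable(splits.poll());
                if (split.isPresent()) {
                    System.out.println("assign " + split.get() + " -> reader " + reader);
                    awaitingReaders.poll();
                } else {
                    break; // nothing available right now
                }
            }
        }

        public static void main(String[] args) {
            Queue<Integer> readers = new ArrayDeque<>();
            readers.add(0);
            readers.add(1);
            handleAwaitingReaders(readers, new ArrayDeque<>(), true, true);
        }
    }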
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
index 4702f94..134fe73 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
@@ -119,7 +119,6 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader

[flink-kubernetes-operator] branch main updated: [FLINK-32572] Do not collect metrics for finished vertices

2023-07-12 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new daeb4b65 [FLINK-32572] Do not collect metrics for finished vertices
daeb4b65 is described below

commit daeb4b6559f0d26b1b0f23be5e8230f895b0a03e
Author: Gyula Fora 
AuthorDate: Tue Jul 4 15:40:37 2023 +0200

[FLINK-32572] Do not collect metrics for finished vertices
---
 .../autoscaler/ScalingMetricCollector.java | 130 ++---
 .../operator/autoscaler/metrics/FlinkMetric.java   |  18 ++-
 .../autoscaler/metrics/ScalingMetrics.java |  21 ++--
 .../operator/autoscaler/topology/JobTopology.java  |  11 +-
 .../operator/autoscaler/topology/VertexInfo.java   |   9 ++
 .../operator/autoscaler/utils/AutoScalerUtils.java |  23 
 .../operator/autoscaler/JobTopologyTest.java   |   4 +-
 .../MetricsCollectionAndEvaluationTest.java|  45 ++-
 .../autoscaler/ScalingMetricCollectorTest.java | 108 +
 .../autoscaler/utils/AutoScalerUtilsTest.java  |  59 ++
 10 files changed, 340 insertions(+), 88 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 4d75b528..3ad1db39 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
@@ -30,7 +29,9 @@ import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -66,14 +67,12 @@ import java.util.stream.Collectors;
 public abstract class ScalingMetricCollector {
 private static final Logger LOG = 
LoggerFactory.getLogger(ScalingMetricCollector.class);
 
-private final Map>>>
+private final Map>>
 availableVertexMetricNames = new ConcurrentHashMap<>();
 
 private final Map> 
histories =
 new ConcurrentHashMap<>();
 
-private final Map topologies = new 
ConcurrentHashMap<>();
-
 private Clock clock = Clock.systemDefaultZone();
 
 public CollectedMetricHistory updateMetrics(
@@ -104,8 +103,7 @@ public abstract class ScalingMetricCollector {
 metricHistory.clear();
 metricCollectionStartTs = now;
 }
-var topology =
-getJobTopology(flinkService, resourceID, conf, autoscalerInfo, 
jobDetailsInfo);
+var topology = getJobTopology(flinkService, conf, autoscalerInfo, 
jobDetailsInfo);
 
 // Trim metrics outside the metric window from metrics history
 var metricWindowSize = getMetricWindowSize(conf);
@@ -157,23 +155,16 @@ public abstract class ScalingMetricCollector {
 
 protected JobTopology getJobTopology(
 FlinkService flinkService,
-ResourceID resourceID,
 Configuration conf,
 AutoScalerInfo scalerInfo,
 JobDetailsInfo jobDetailsInfo)
 throws Exception {
 
-var topology =
-topologies.computeIfAbsent(
-resourceID,
-r -> {
-var t = getJobTopology(jobDetailsInfo);
-scalerInfo.updateVertexList(
-t.getVerticesInTopologicalOrder(), 
clock.instant(), conf);
-return t;
-});
-updateKafkaSourceMaxParallelisms(flinkService, conf, 
jobDetailsInfo.getJobId(), topology);
-return topology;
+var t = getJobTopology(jobDetailsInfo);
+