[flink] branch master updated: [FLINK-31035] add warn info to user when NoNodeException happened
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f89be70c2b0  [FLINK-31035] add warn info to user when NoNodeException happened
f89be70c2b0 is described below

commit f89be70c2b00522ccde0c47f45c2236acdd4ad6d
Author: xuyu <11161...@vivo.com>
AuthorDate: Tue Jul 11 18:44:56 2023 +0800

    [FLINK-31035] add warn info to user when NoNodeException happened
---
 .../org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index a5bbb8fa1a9..95c81ac7717 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -364,6 +364,9 @@ public class ZooKeeperStateHandleStore
                 return client.getChildren().forPath(path);
             } catch (KeeperException.NoNodeException ignored) {
                 // Concurrent deletion, retry
+                LOG.debug(
+                        "Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})",
+                        ignored.getMessage());
             }
         }
     }
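For context, the new log statement sits inside a retry loop. A minimal sketch of the surrounding method, reconstructed from the visible hunk (the method name and the pathInZooKeeper parameter are assumptions; only the loop shape, the Curator call, and the added log line come from the diff):

    private List<String> getAllHandlePaths(CuratorFramework client, String pathInZooKeeper)
            throws Exception {
        while (true) {
            try {
                return client.getChildren().forPath(pathInZooKeeper);
            } catch (KeeperException.NoNodeException ignored) {
                // Concurrent deletion, retry. The commit adds the debug log below so
                // users can see why the call loops instead of failing silently.
                LOG.debug(
                        "Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})",
                        ignored.getMessage());
            }
        }
    }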
[flink] 05/05: [FLINK-22734][table-runtime] Fix comments for HashJoin and Calc OperatorFusionCodeGen
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f480b6d313f64ff5d8d1e6f1d74f2dacc3ea133b
Author: fengli
AuthorDate: Mon Jun 26 18:05:42 2023 +0800

    [FLINK-22734][table-runtime] Fix comments for HashJoin and Calc OperatorFusionCodeGen

    This closes #22734
---
 .../nodes/exec/batch/BatchExecMultipleInput.java   | 8 +-
 .../planner/codegen/GeneratedExpression.scala      | 2 +-
 .../planner/plan/fusion/FusionCodegenUtil.scala    | 84 ++-
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 107
 .../plan/fusion/spec/CalcFusionCodegenSpec.scala   | 41 ++--
 .../fusion/spec/HashJoinFusionCodegenSpec.scala    | 269 +++--
 .../spec/InputAdapterFusionCodegenSpec.scala       | 16 +-
 .../plan/fusion/spec/OutputFusionCodegenSpec.scala | 8 +-
 .../planner/utils/JavaScalaConversionUtil.scala    | 7 +-
 .../runtime/hashtable/BaseHybridHashTable.java     | 2 +-
 10 files changed, 322 insertions(+), 222 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java
index c7b661609dd..8ffbcbc7fc9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java
@@ -44,8 +44,8 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
 import org.apache.flink.table.runtime.operators.fusion.OperatorFusionCodegenFactory;
 import org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory;
-import org.apache.flink.table.runtime.operators.multipleinput.MultipleInputSpec;
 import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapperGenerator;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSelectionSpec;
 import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -140,7 +140,7 @@ public class BatchExecMultipleInput extends ExecNodeBase
         boolean fusionCodegenEnabled = config.get(TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED);
         // multiple operator fusion codegen
         if (fusionCodegenEnabled && allSupportFusionCodegen()) {
-            final List multipleInputSpecs = new ArrayList<>();
+            final List inputSelectionSpecs = new ArrayList<>();
             int i = 0;
             for (ExecEdge inputEdge : originalEdges) {
                 int multipleInputId = i + 1;
@@ -165,7 +165,7 @@ public class BatchExecMultipleInput extends ExecNodeBase
                         ExecEdge.builder().source(inputAdapter).target(target).build());
                 // The input id and read order
-                multipleInputSpecs.add(new MultipleInputSpec(multipleInputId, readOrders[i]));
+                inputSelectionSpecs.add(new InputSelectionSpec(multipleInputId, readOrders[i]));
                 i++;
             }
@@ -184,7 +184,7 @@ public class BatchExecMultipleInput extends ExecNodeBase
             // generate fusion operator
             Tuple2, Object> multipleOperatorTuple =
-                    FusionCodegenUtil.generateFusionOperator(outputGenerator, multipleInputSpecs);
+                    FusionCodegenUtil.generateFusionOperator(outputGenerator, inputSelectionSpecs);
             operatorFactory = multipleOperatorTuple._1;
             Pair parallelismPair = getInputMaxParallelism(inputTransforms);

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
index 13469590491..108ea10d1ad 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
@@ -39,7 +39,7 @@ import org.apache.flink.table.types.logical.LogicalType
 case class GeneratedExpression(
     resultTerm: String,
     nullTerm: String,
-    code: String,
+    var code: String,
     resultType: LogicalType,
     literalValue: Option[Any] = None) {

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/FusionCodegenUtil.scala b/flink-table/flink-table-planner/src/ma
[flink] 04/05: [FLINK-32278][table-runtime] Calc and HashJoin operator support operator fusion codegen
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f03c26a6f0189b78975828dc77482e5ba1e53641
Author: fengli
AuthorDate: Thu Jun 15 19:41:20 2023 +0800

    [FLINK-32278][table-runtime] Calc and HashJoin operator support operator fusion codegen
---
 .../generated/execution_config_configuration.html  | 6 +
 .../table/api/config/ExecutionConfigOptions.java   | 8 +
 .../planner/plan/fusion/OpFusionCodegenSpec.java   | 5 +-
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  | 6 +
 .../plan/nodes/exec/FusionCodegenExecNode.java     | 1 +
 .../plan/nodes/exec/batch/BatchExecCalc.java       | 31
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   | 63 +++
 .../nodes/exec/batch/BatchExecInputAdapter.java    | 83 +
 .../nodes/exec/batch/BatchExecMultipleInput.java   | 158 +---
 .../plan/nodes/exec/common/CommonExecCalc.java     | 4 +-
 .../processor/ForwardHashExchangeProcessor.java    | 4 +
 .../MultipleInputNodeCreationProcessor.java        | 16 +-
 .../planner/plan/fusion/FusionCodegenUtil.scala    | 171 ++
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 35
 .../fusion/spec/HashJoinFusionCodegenSpec.scala    | 11 +-
 .../physical/batch/BatchPhysicalHashJoin.scala     | 1 +
 .../runtime/batch/sql/MultipleInputITCase.scala    | 6 +-
 .../batch/sql/OperatorFusionCodegenITCase.scala    | 200 +
 .../runtime/hashtable/BaseHybridHashTable.java     | 10 +-
 .../table/runtime/hashtable/BinaryHashTable.java   | 9 +-
 .../runtime/hashtable/LongHybridHashTable.java     | 64 ++-
 .../operators/fusion/FusionStreamOperatorBase.java | 3 +-
 22 files changed, 858 insertions(+), 37 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 32046454a17..6c34ab5a969 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -76,6 +76,12 @@
 By default no operator is disabled.
 Long
 The maximum number of input records can be buffered for MiniBatch. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records reached. NOTE: MiniBatch only works for non-windowed aggregations currently. If table.exec.mini-batch.enabled is set true, its value must be positive.
+
+table.exec.operator-fusion-codegen.enabled Batch Streaming
+false
+Boolean
+If true, multiple physical operators will be compiled into a single operator by planner which can improve the performance.
+
 table.exec.rank.topn-cache-size Streaming
 1

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index e37222cabcf..df5438106d2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -389,6 +389,14 @@ public class ExecutionConfigOptions {
                             + "\"SortMergeJoin\", \"HashAgg\", \"SortAgg\".\n"
                             + "By default no operator is disabled.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED =
+            key("table.exec.operator-fusion-codegen.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, multiple physical operators will be compiled into a single operator by planner which can improve the performance.");
+
     /** @deprecated Use {@link ExecutionOptions#BATCH_SHUFFLE_MODE} instead. */
     @Deprecated
     @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
index 876c44c138d..dc3ed4cce90 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
@@ -82,11 +82,12 @@ publi
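To try the feature this commit completes, the new option can be set on a batch TableEnvironment. A minimal usage sketch (standard Table API; the option key and default are taken from the diff above):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    public class EnableFusionCodegen {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Operator fusion codegen is disabled by default; opt in explicitly.
            tEnv.getConfig()
                    .set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, true);
        }
    }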
[flink] 03/05: [FLINK-32278][table-runtime] Introduce the Calc and HashJoin operator spec to support operator fusion codegen
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6040f480aae92f5449eb548a9eb3e042442b2bbb
Author: fengli
AuthorDate: Mon Jun 5 20:43:07 2023 +0800

    [FLINK-32278][table-runtime] Introduce the Calc and HashJoin operator spec to support operator fusion codegen
---
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  | 4 +
 .../flink/table/planner/codegen/CodeGenUtils.scala | 2 +
 .../planner/codegen/CodeGeneratorContext.scala     | 48 ++
 .../planner/codegen/LongHashJoinGenerator.scala    | 2 +-
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 2 +-
 .../OneInputOpFusionCodegenSpecGenerator.scala     | 2 +-
 .../SourceOpFusionCodegenSpecGenerator.scala}      | 12 +-
 .../TwoInputOpFusionCodegenSpecGenerator.scala     | 2 +-
 .../plan/fusion/spec/CalcFusionCodegenSpec.scala   | 96
 .../fusion/spec/HashJoinFusionCodegenSpec.scala    | 539 +
 .../spec/InputAdapterFusionCodegenSpec.scala       | 82
 .../plan/fusion/spec/OutputFusionCodegenSpec.scala | 49 ++
 .../table/runtime/operators/join/HashJoinType.java | 4 +
 13 files changed, 833 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
index 943aecdead9..2b355f6610b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
@@ -56,6 +56,10 @@ public abstract class OpFusionCodegenSpecGenerator {
         this.opFusionCodegenSpec.setup(opFusionContext);
     }
 
+    public RowType getOutputType() {
+        return outputType;
+    }
+
     public OpFusionCodegenSpec getOpFusionCodegenSpec() {
         return opFusionCodegenSpec;
     }

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 4bbe41c5591..58214199aa1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -58,6 +58,8 @@ object CodeGenUtils {
 
   val DEFAULT_TIMEZONE_TERM = "timeZone"
 
+  val DEFAULT_INPUT_TERM = "in"
+
   val DEFAULT_INPUT1_TERM = "in1"
   val DEFAULT_INPUT2_TERM = "in2"

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 6ba1e1483c9..69ede520531 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -90,6 +90,14 @@ class CodeGeneratorContext(val tableConfig: ReadableConfig, val classLoader: Cla
   private val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()
 
+  // set of statements that will be added only for operator fusion codegen process method
+  private val reusableFusionCodegenProcessStatements: mutable.TreeMap[Int, String] =
+    mutable.TreeMap[Int, String]()
+
+  // set of statements that will be added only for operator fusion codegen endInput method
+  private val reusableFusionCodegenEndInputStatements: mutable.TreeMap[Int, String] =
+    mutable.TreeMap[Int, String]()
+
   // map of initial input unboxing expressions that will be added only once
   // (inputTerm, index) -> expr
   val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] =
@@ -305,6 +313,38 @@ class CodeGeneratorContext(val tableConfig: ReadableConfig, val classLoader: Cla
     reusableCleanupStatements.mkString("", "\n", "\n")
   }
 
+  /**
+   * @return
+   *   code block of statements that need to be placed in the getInputs() method of
+   *   [FusionStreamOperator]
+   */
+  def reuseFusionProcessCode(): String = {
+    reusableFusionCodegenProcessStatements.values.mkString(",\n")
+  }
+
+  /**
+   * @return
+   *   code block of statements that need to be placed in the endInput() method of
+   *   [BoundedMultiInput]
+   */
+  def reuseFusionEndInputCode(inputId: String): String = {
+    val endInputCode = reusableFusionCodegenEndInputStatements
+      .map {
+        case (id, code) => s"""
+                              |case $id:
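The email is cut off mid-snippet, but the surviving fragment suggests that reuseFusionEndInputCode assembles the registered per-input statements into a case-per-input dispatch. Purely as an illustration (not actual generator output; the input ids and comment bodies are invented), the generated endInput body would plausibly look like:

    public void endInput(int inputId) throws Exception {
        switch (inputId) {
            case 1:
                // statements registered for input 1, e.g. finish a hash-join build side
                break;
            case 2:
                // statements registered for input 2
                break;
        }
    }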
[flink] 02/05: [FLINK-32277][table-runtime] Introduce the base FusionStreamOperator for OFCG
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f062a38f43cdc9fdbaab3c68e1ec1c1ffdb88b5
Author: fengli
AuthorDate: Mon Jun 5 20:41:25 2023 +0800

    [FLINK-32277][table-runtime] Introduce the base FusionStreamOperator for OFCG
---
 .../operators/fusion/FusionStreamOperatorBase.java | 60 ++
 .../fusion/OperatorFusionCodegenFactory.java       | 50 ++
 .../BatchMultipleInputStreamOperator.java          | 2 +-
 .../multipleinput/input/InputSelectionHandler.java | 22 +---
 .../{InputSpec.java => InputSelectionSpec.java}    | 49 --
 .../operators/multipleinput/input/InputSpec.java   | 37 ++---
 .../input/InputSelectionHandlerTest.java           | 4 +-
 7 files changed, 153 insertions(+), 71 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/FusionStreamOperatorBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/FusionStreamOperatorBase.java
new file mode 100644
index 000..3ebe474fac0
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/FusionStreamOperatorBase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.fusion;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base {@link MultipleInputStreamOperator} to handle multiple operator fusion codegen in table
+ * module.
+ */
+public abstract class FusionStreamOperatorBase extends AbstractStreamOperatorV2
+        implements MultipleInputStreamOperator, InputSelectable, BoundedMultiInput {
+
+    protected final StreamOperatorParameters parameters;
+
+    public FusionStreamOperatorBase(
+            StreamOperatorParameters parameters, int numberOfInputs) {
+        super(parameters, numberOfInputs);
+        this.parameters = parameters;
+    }
+
+    public org.apache.flink.streaming.runtime.tasks.StreamTask getContainingTask() {
+        return parameters.getContainingTask();
+    }
+
+    public long computeMemorySize(double operatorFraction) {
+        final double memFraction =
+                parameters
+                        .getStreamConfig()
+                        .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                                org.apache.flink.core.memory.ManagedMemoryUseCase.OPERATOR,
+                                getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration(),
+                                getRuntimeContext().getUserCodeClassLoader());
+        return getContainingTask()
+                .getEnvironment()
+                .getMemoryManager()
+                .computeMemorySize(memFraction * operatorFraction);
+    }
+}

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/OperatorFusionCodegenFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/OperatorFusionCodegenFactory.java
new file mode 100644
index 000..052b49c8220
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/fusion/OperatorFusionCodegenFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the Lic
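Concrete subclasses of FusionStreamOperatorBase are emitted by the code generator, but an entirely hypothetical hand-written subclass illustrates the contract a generated operator has to fulfil (all names and method bodies below are invented for illustration):

    import java.util.List;

    import org.apache.flink.streaming.api.operators.Input;
    import org.apache.flink.streaming.api.operators.InputSelection;
    import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
    import org.apache.flink.table.data.RowData;

    public class ExampleFusedOperator extends FusionStreamOperatorBase {

        public ExampleFusedOperator(StreamOperatorParameters<RowData> parameters) {
            super(parameters, 2); // a fused pipeline with two external inputs
        }

        @Override
        public List<Input> getInputs() {
            // Generated code returns one Input per external input of the fused DAG.
            throw new UnsupportedOperationException("produced by codegen at runtime");
        }

        @Override
        public InputSelection nextSelection() {
            return InputSelection.ALL; // the real read order comes from the input specs
        }

        @Override
        public void endInput(int inputId) {
            // e.g. finish a hash-join build side when its input ends
        }
    }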
[flink] 01/05: [FLINK-32277][table-runtime] Introduce the basic operator fusion codegen framework
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02be55ab780186a4b738a331e17c436ad2543de3
Author: fengli
AuthorDate: Mon Jun 5 20:33:54 2023 +0800

    [FLINK-32277][table-runtime] Introduce the basic operator fusion codegen framework
---
 .../planner/plan/fusion/OpFusionCodegenSpec.java   | 117
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  | 155 +++
 .../table/planner/plan/fusion/OpFusionContext.java | 74
 .../table/planner/plan/nodes/exec/ExecEdge.java    | 10 +
 .../table/planner/plan/nodes/exec/ExecNode.java    | 2 +-
 .../planner/plan/nodes/exec/ExecNodeBase.java      | 40
 .../plan/nodes/exec/FusionCodegenExecNode.java     | 37
 .../flink/table/planner/codegen/CodeGenUtils.scala | 10 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  | 9 -
 .../plan/fusion/OpFusionCodegenSpecBase.scala      | 48 +
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 211 +
 .../OneInputOpFusionCodegenSpecGenerator.scala     | 35
 .../TwoInputOpFusionCodegenSpecGenerator.scala     | 37
 .../plan/nodes/exec/TestingBatchExecNode.java      | 11 ++
 14 files changed, 785 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
new file mode 100644
index 000..876c44c138d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.fusion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+
+import java.util.List;
+import java.util.Set;
+
+/** An interface for those physical operators that support operator fusion codegen. */
+@Internal
+public interface OpFusionCodegenSpec {
+
+    /**
+     * Initializes the operator spec. Sets access to the context. This method must be called before
+     * doProduce and doConsume related methods.
+     */
+    void setup(OpFusionContext opFusionContext);
+
+    /** Prefix used in the current operator's variable names. */
+    String variablePrefix();
+
+    /**
+     * The subset of column index those should be evaluated before this operator.
+     *
+     * We will use this to insert some code to access those columns that are actually used by
+     * current operator before calling doProcessConsume().
+     */
+    Set usedInputColumns(int inputId);
+
+    /**
+     * Specific inputId of current operator needed {@link RowData} type, this is used to notify the
+     * upstream operator wrap the proper {@link RowData} we needed before call doProcessConsume
+     * method. For example, HashJoin build side need {@link BinaryRowData}.
+     */
+    Class getInputRowDataClass(int inputId);
+
+    /**
+     * Every operator need one {@link CodeGeneratorContext} to store the context needed during
+     * operator fusion codegen.
+     */
+    CodeGeneratorContext getCodeGeneratorContext();
+
+    /** Get the {@link ExprCodeGenerator} used by this operator during operator fusion codegen, . */
+    ExprCodeGenerator getExprCodeGenerator();
+
+    /**
+     * Generate the Java source code to process rows, only the leaf operator in operator DAG need to
+     * generate the code which produce the row, other middle operators just call its input {@link
+     * OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)} normally, otherwise, the
+     * operator has some specific logic. The leaf operator produce row first, and then call {@link
+     * OpFusionContext#processConsume(List)} metho
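The interface above describes a produce/consume protocol: the leaf operator of the fused DAG generates the row-producing loop, and each downstream operator inlines its logic at the consume call site. Purely for intuition, fused code for a source feeding a Calc (projection plus filter) might take roughly this shape (an illustrative sketch, not actual generator output; all names are invented):

    // input: java.util.Iterator<RowData>, out: org.apache.flink.util.Collector<RowData>
    void processFused(Iterator<RowData> input, Collector<RowData> out) {
        while (input.hasNext()) {          // produce loop generated by the leaf operator
            RowData in = input.next();
            long price = in.getLong(0);    // column access limited to usedInputColumns
            if (price > 100L) {            // Calc predicate, inlined at the consume site
                out.collect(in);           // downstream operator's consume logic
            }
        }
    }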
[flink] branch master updated (354a8852766 -> f480b6d313f)
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 354a8852766 [FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high
     new 02be55ab780 [FLINK-32277][table-runtime] Introduce the basic operator fusion codegen framework
     new 1f062a38f43 [FLINK-32277][table-runtime] Introduce the base FusionStreamOperator for OFCG
     new 6040f480aae [FLINK-32278][table-runtime] Introduce the Calc and HashJoin operator spec to support operator fusion codegen
     new f03c26a6f01 [FLINK-32278][table-runtime] Calc and HashJoin operator support operator fusion codegen
     new f480b6d313f [FLINK-22734][table-runtime] Fix comments for HashJoin and Calc OperatorFusionCodeGen

The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../generated/execution_config_configuration.html  | 6 +
 .../table/api/config/ExecutionConfigOptions.java   | 8 +
 .../planner/plan/fusion/OpFusionCodegenSpec.java   | 118 +
 .../plan/fusion/OpFusionCodegenSpecGenerator.java  | 165 ++
 .../table/planner/plan/fusion/OpFusionContext.java | 74 +++
 .../table/planner/plan/nodes/exec/ExecEdge.java    | 10 +
 .../table/planner/plan/nodes/exec/ExecNode.java    | 2 +-
 .../planner/plan/nodes/exec/ExecNodeBase.java      | 40 ++
 .../plan/nodes/exec/FusionCodegenExecNode.java     | 38 ++
 .../plan/nodes/exec/batch/BatchExecCalc.java       | 31 ++
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   | 63 +++
 .../nodes/exec/batch/BatchExecInputAdapter.java    | 83 +++
 .../nodes/exec/batch/BatchExecMultipleInput.java   | 158 +-
 .../plan/nodes/exec/common/CommonExecCalc.java     | 4 +-
 .../processor/ForwardHashExchangeProcessor.java    | 4 +
 .../MultipleInputNodeCreationProcessor.java        | 16 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala | 12 +-
 .../planner/codegen/CodeGeneratorContext.scala     | 48 ++
 .../table/planner/codegen/ExprCodeGenerator.scala  | 9 -
 .../planner/codegen/GeneratedExpression.scala      | 2 +-
 .../planner/codegen/LongHashJoinGenerator.scala    | 2 +-
 .../planner/plan/fusion/FusionCodegenUtil.scala    | 243 +
 .../plan/fusion/OpFusionCodegenSpecBase.scala      | 48 ++
 .../fusion/OpFusionCodegenSpecGeneratorBase.scala  | 235 +
 .../OneInputOpFusionCodegenSpecGenerator.scala     | 35 ++
 .../SourceOpFusionCodegenSpecGenerator.scala       | 33 ++
 .../TwoInputOpFusionCodegenSpecGenerator.scala     | 37 ++
 .../plan/fusion/spec/CalcFusionCodegenSpec.scala   | 109 ++++
 .../fusion/spec/HashJoinFusionCodegenSpec.scala    | 563 +
 .../spec/InputAdapterFusionCodegenSpec.scala       | 82 +++
 .../plan/fusion/spec/OutputFusionCodegenSpec.scala | 49 ++
 .../physical/batch/BatchPhysicalHashJoin.scala     | 1 +
 .../planner/utils/JavaScalaConversionUtil.scala    | 7 +-
 .../plan/nodes/exec/TestingBatchExecNode.java      | 11 +
 .../runtime/batch/sql/MultipleInputITCase.scala    | 6 +-
 .../batch/sql/OperatorFusionCodegenITCase.scala    | 200 ++++
 .../runtime/hashtable/BaseHybridHashTable.java     | 10 +-
 .../table/runtime/hashtable/BinaryHashTable.java   | 9 +-
 .../runtime/hashtable/LongHybridHashTable.java     | 64 ++-
 .../operators/fusion/FusionStreamOperatorBase.java | 61 +++
 .../fusion/OperatorFusionCodegenFactory.java       | 50 ++
 .../table/runtime/operators/join/HashJoinType.java | 4 +
 .../BatchMultipleInputStreamOperator.java          | 2 +-
 .../multipleinput/input/InputSelectionHandler.java | 22 +-
 .../{InputSpec.java => InputSelectionSpec.java}    | 49 +-
 .../operators/multipleinput/input/InputSpec.java   | 37 +-
 .../input/InputSelectionHandlerTest.java           | 4 +-
 47 files changed, 2749 insertions(+), 115 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpec.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionCodegenSpecGenerator.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/fusion/OpFusionContext.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/FusionCodegenExecNode.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecInputAdapter.java
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/FusionCodegenUtil.scala
 create mode 100644 flink-table/flink-table-planner/src/m
[flink-kubernetes-operator] branch main updated: [hotfix][docs] Add a 'Build from Source' page
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 98e7a670  [hotfix][docs] Add a 'Build from Source' page
98e7a670 is described below

commit 98e7a6706550c2568869b9733227d9834b3c58c1
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Wed Jul 12 15:51:04 2023 +0200

    [hotfix][docs] Add a 'Build from Source' page
---
 docs/content/docs/development/guide.md | 24
 1 file changed, 24 insertions(+)

diff --git a/docs/content/docs/development/guide.md b/docs/content/docs/development/guide.md
index 402a5f71..c8cfd163 100644
--- a/docs/content/docs/development/guide.md
+++ b/docs/content/docs/development/guide.md
@@ -28,6 +28,30 @@ under the License.
 
 We gathered a set of best practices here to aid development.
 
+## Build from sources
+
+In order to build the operator you need to [clone the git repository]({{< github_repo >}}).
+
+```bash
+git clone {{< github_repo >}}
+```
+
+To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 11**.
+
+To build the project, you can use the following command:
+
+```bash
+mvn clean install
+```
+
+To speed up the build you can:
+- skip the tests by using ' -DskipTests'
+- use Maven's parallel build feature, e.g., 'mvn package -T 1C' will attempt to build 1 module for each CPU core in parallel
+
+```bash
+mvn clean install -DskipTests -T 1C
+```
+
 ## Local environment setup
 
 We recommend you install [Docker Desktop](https://www.docker.com/products/docker-desktop), [minikube](https://minikube.sigs.k8s.io/docs/start/)
[flink] 01/02: [FLINK-32420][refactor] Add the AbstractHeapPriorityQueueElement
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 224aa2e9ca20ed7ce009a2c2f485c1a232e4c807
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Jun 29 00:36:58 2023 +0800

    [FLINK-32420][refactor] Add the AbstractHeapPriorityQueueElement
---
 .../heap/AbstractHeapPriorityQueueElement.java     | 39 ++
 .../state/InternalPriorityQueueTestBase.java       | 18 ++
 .../runtime/testutils/statemigration/TestType.java | 18 ++
 .../state/RocksDBCachingPriorityQueueSet.java      | 17 ++
 4 files changed, 47 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java
new file mode 100644
index 000..bee5c0eb3ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/** Abstract base class for {@link HeapPriorityQueueElement}. */
+public abstract class AbstractHeapPriorityQueueElement implements HeapPriorityQueueElement {
+
+    private int internalIndex;
+
+    public AbstractHeapPriorityQueueElement() {
+        this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+    }
+
+    @Override
+    public int getInternalIndex() {
+        return internalIndex;
+    }
+
+    @Override
+    public void setInternalIndex(int newIndex) {
+        this.internalIndex = newIndex;
+    }
+}

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
index 4a3b243fca4..0d1387dc4d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.MathUtils;
@@ -343,17 +343,15 @@ public abstract class InternalPriorityQueueTestBase {
     protected abstract boolean testSetSemanticsAgainstDuplicateElements();
 
     /** Payload for usage in the test. */
-    protected static class TestElement
-            implements HeapPriorityQueueElement, Keyed, PriorityComparable {
+    protected static class TestElement extends AbstractHeapPriorityQueueElement
+            implements Keyed, PriorityComparable {
 
         private final long key;
         private final long priority;
-        private int internalIndex;
 
         public TestElement(long key, long priority) {
             this.key = key;
             this.priority = priority;
-            this.internalIndex = NOT_CONTAINED;
         }
 
         @Override
@@ -369,16 +367,6 @@ public abstract class InternalPriorityQueueTestBase {
             return priority;
         }
 
-        @Override
-        public int getInternalIndex() {
-            return internalIndex;
-        }
-
-        @Override
-        public void setInternalIndex(int newIndex) {
-            internalIndex = newIndex;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index b024f76cd41..8db517f68b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigrat
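Reusing the base class is straightforward: a subclass only carries its payload and priority logic, while getInternalIndex()/setInternalIndex() come from AbstractHeapPriorityQueueElement. A minimal sketch, mirroring the TestElement change above ("TimerElement" is an invented name):

    import org.apache.flink.runtime.state.PriorityComparable;
    import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;

    public class TimerElement extends AbstractHeapPriorityQueueElement
            implements PriorityComparable<TimerElement> {

        private final long timestamp;

        public TimerElement(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public int comparePriorityTo(TimerElement other) {
            // Smaller timestamps have higher priority (earlier in the heap).
            return Long.compare(timestamp, other.timestamp);
        }
    }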
[flink] 02/02: [FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 354a8852766b16873b5fad972e4440c1eaa4c40a
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Jun 29 01:06:08 2023 +0800

    [FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high
---
 .../source/coordinator/SourceCoordinator.java      | 76 ++
 .../SourceCoordinatorAlignmentTest.java            | 92 ++
 2 files changed, 154 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index f3b4f9dc775..faeac9a8dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -38,6 +38,10 @@ import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
 import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -46,6 +50,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
@@ -53,7 +58,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -642,9 +646,47 @@ public class SourceCoordinator
         }
     }
 
-    private static class WatermarkAggregator {
-        private final Map watermarks = new HashMap<>();
-        private Watermark aggregatedWatermark = new Watermark(Long.MIN_VALUE);
+    /** The watermark element for {@link HeapPriorityQueue}. */
+    public static class WatermarkElement extends AbstractHeapPriorityQueueElement
+            implements PriorityComparable {
+
+        private final Watermark watermark;
+
+        public WatermarkElement(Watermark watermark) {
+            this.watermark = watermark;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o instanceof WatermarkElement) {
+                return watermark.equals(((WatermarkElement) o).watermark);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return watermark.hashCode();
+        }
+
+        @Override
+        public int comparePriorityTo(@Nonnull WatermarkElement other) {
+            return Long.compare(watermark.getTimestamp(), other.watermark.getTimestamp());
+        }
+    }
+
+    /** The aggregated watermark is the smallest watermark of all keys. */
+    static class WatermarkAggregator {
+
+        private final Map watermarks = new HashMap<>();
+
+        private final HeapPriorityQueue orderedWatermarks =
+                new HeapPriorityQueue<>(PriorityComparator.forPriorityComparableObjects(), 10);
+
+        private static final Watermark DEFAULT_WATERMARK = new Watermark(Long.MIN_VALUE);
 
         /**
          * Update the {@link Watermark} for the given {@code key)}.
         *
@@ -653,17 +695,20 @@ public class SourceCoordinator
          * Optional.empty()} otherwise.
          */
         public Optional aggregate(T key, Watermark watermark) {
-            watermarks.put(key, watermark);
-            Watermark newMinimum =
-                    watermarks.values().stream()
-                            .min(Comparator.comparingLong(Watermark::getTimestamp))
-                            .orElseThrow(IllegalStateException::new);
-            if (newMinimum.equals(aggregatedWatermark)) {
+            Watermark oldAggregatedWatermark = getAggregatedWatermark();
+
+            WatermarkElement watermarkElement = new WatermarkElement(watermark);
+            WatermarkElement oldWatermarkElement = watermarks.put(key, watermarkElement);
+            if (oldWatermarkElement != null) {
+                orderedWatermarks.remove(oldWatermarkElement);
+            }
+            orderedWatermarks.add(w
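The point of the change is visible even in this truncated hunk: the old code recomputed the minimum with a full stream().min(...) scan over all subtask watermarks on every report (O(n) per update), while the new code keeps the elements in a heap so the minimum is maintained incrementally. A minimal sketch of the same pattern, using java.util.PriorityQueue for brevity (the commit uses Flink's HeapPriorityQueue instead, whose elements remember their heap index so remove(element) is O(log n), whereas PriorityQueue.remove(Object) degrades to O(n)):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.PriorityQueue;

    final class MinTracker<K> {
        private final Map<K, Long> valuePerKey = new HashMap<>();
        private final PriorityQueue<Long> ordered = new PriorityQueue<>();

        /** Updates the value for a key and returns the new minimum across all keys. */
        long update(K key, long value) {
            Long old = valuePerKey.put(key, value);
            if (old != null) {
                ordered.remove(old); // O(n) here; O(log n) with Flink's HeapPriorityQueue
            }
            ordered.add(value);
            return ordered.peek();
        }
    }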
[flink] branch master updated (2dfff436c09 -> 354a8852766)
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 2dfff436c09 [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers
     new 224aa2e9ca2 [FLINK-32420][refactor] Add the AbstractHeapPriorityQueueElement
     new 354a8852766 [FLINK-32420][connectors/common] Using the HeapPriorityQueue to improve the watermark aggregation performance when parallelism is high

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../source/coordinator/SourceCoordinator.java      | 76 ++
 .../heap/AbstractHeapPriorityQueueElement.java}    | 24 +++---
 .../SourceCoordinatorAlignmentTest.java            | 92 ++
 .../state/InternalPriorityQueueTestBase.java       | 18 +
 .../runtime/testutils/statemigration/TestType.java | 18 +
 .../state/RocksDBCachingPriorityQueueSet.java      | 17 +---
 6 files changed, 175 insertions(+), 70 deletions(-)
 copy flink-runtime/src/{test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java => main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueueElement.java} (62%)
[flink-connector-mongodb] branch main updated: [FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new e2babc9  [FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out
e2babc9 is described below

commit e2babc9bcfa501a3f6727f28c677c199f7bfcad5
Author: Jiabao Sun
AuthorDate: Wed Jul 12 17:12:06 2023 +0800

    [FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out

    This closes #13.
---
 .../mongodb/source/enumerator/MongoSourceEnumerator.java | 13 +
 .../source/reader/split/MongoScanSourceSplitReader.java  | 1 -
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
index d13a843..951c527 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
@@ -97,6 +97,15 @@ public class MongoSourceEnumerator
                 continue;
             }
 
+            // close idle readers
+            if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
+                context.signalNoMoreSplits(nextAwaiting);
+                awaitingReader.remove();
+                LOG.info(
+                        "All scan splits have been assigned, closing idle reader {}", nextAwaiting);
+                continue;
+            }
+
             Optional split = splitAssigner.getNext();
             if (split.isPresent()) {
                 final MongoSourceSplit mongoSplit = split.get();
@@ -104,10 +113,6 @@ public class MongoSourceEnumerator
                 awaitingReader.remove();
                 LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
                 break;
-            } else if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
-                LOG.info("All splits have been assigned");
-                context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
-                break;
             } else {
                 // there is no available splits by now, skip assigning
                 break;

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
index 4702f94..134fe73 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
@@ -119,7 +119,6 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader
[flink-kubernetes-operator] branch main updated: [FLINK-32572] Do not collect metrics for finished vertices
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new daeb4b65  [FLINK-32572] Do not collect metrics for finished vertices
daeb4b65 is described below

commit daeb4b6559f0d26b1b0f23be5e8230f895b0a03e
Author: Gyula Fora
AuthorDate: Tue Jul 4 15:40:37 2023 +0200

    [FLINK-32572] Do not collect metrics for finished vertices
---
 .../autoscaler/ScalingMetricCollector.java         | 130 ++---
 .../operator/autoscaler/metrics/FlinkMetric.java   | 18 ++-
 .../autoscaler/metrics/ScalingMetrics.java         | 21 ++--
 .../operator/autoscaler/topology/JobTopology.java  | 11 +-
 .../operator/autoscaler/topology/VertexInfo.java   | 9 ++
 .../operator/autoscaler/utils/AutoScalerUtils.java | 23
 .../operator/autoscaler/JobTopologyTest.java       | 4 +-
 .../MetricsCollectionAndEvaluationTest.java        | 45 ++-
 .../autoscaler/ScalingMetricCollectorTest.java     | 108 +
 .../autoscaler/utils/AutoScalerUtilsTest.java      | 59 ++
 10 files changed, 340 insertions(+), 88 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 4d75b528..3ad1db39 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
@@ -30,7 +29,9 @@ import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -66,14 +67,12 @@ import java.util.stream.Collectors;
 public abstract class ScalingMetricCollector {
     private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);
 
-    private final Map>>>
+    private final Map>>
             availableVertexMetricNames = new ConcurrentHashMap<>();
 
     private final Map> histories = new ConcurrentHashMap<>();
 
-    private final Map topologies = new ConcurrentHashMap<>();
-
     private Clock clock = Clock.systemDefaultZone();
 
     public CollectedMetricHistory updateMetrics(
@@ -104,8 +103,7 @@ public abstract class ScalingMetricCollector {
             metricHistory.clear();
             metricCollectionStartTs = now;
         }
-        var topology =
-                getJobTopology(flinkService, resourceID, conf, autoscalerInfo, jobDetailsInfo);
+        var topology = getJobTopology(flinkService, conf, autoscalerInfo, jobDetailsInfo);
 
         // Trim metrics outside the metric window from metrics history
        var metricWindowSize = getMetricWindowSize(conf);
@@ -157,23 +155,16 @@ public abstract class ScalingMetricCollector {
     protected JobTopology getJobTopology(
             FlinkService flinkService,
-            ResourceID resourceID,
             Configuration conf,
             AutoScalerInfo scalerInfo,
             JobDetailsInfo jobDetailsInfo)
             throws Exception {
-        var topology =
-                topologies.computeIfAbsent(
-                        resourceID,
-                        r -> {
-                            var t = getJobTopology(jobDetailsInfo);
-                            scalerInfo.updateVertexList(
-                                    t.getVerticesInTopologicalOrder(), clock.instant(), conf);
-                            return t;
-                        });
-        updateKafkaSourceMaxParallelisms(flinkService, conf, jobDetailsInfo.getJobId(), topology);
-        return topology;
+        var t = getJobTopology(jobDetailsInfo);
+
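The diff truncates before the new per-vertex logic, but the added ExecutionState import hints at filtering vertices by execution state. Purely as an illustration of that idea (hypothetical code, not the commit's actual implementation; the REST accessor names below are assumptions), selecting the still-running vertices from the job details could look like:

    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.flink.runtime.execution.ExecutionState;
    import org.apache.flink.runtime.jobgraph.JobVertexID;
    import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

    final class FinishedVertexFilter {
        /** Returns the ids of all vertices that have not reached FINISHED state. */
        static Set<JobVertexID> runningVertices(JobDetailsInfo jobDetailsInfo) {
            return jobDetailsInfo.getJobVertexInfos().stream()
                    .filter(v -> v.getExecutionState() != ExecutionState.FINISHED)
                    .map(JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID)
                    .collect(Collectors.toSet());
        }
    }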