twalthr commented on a change in pull request #18756:
URL: https://github.com/apache/flink/pull/18756#discussion_r809881032



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
##########
@@ -132,5 +133,26 @@
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public static <T> Collection<ConfigOption<T>> getAllConfigOptions(Class<?> 
configOptionsClass)

Review comment:
       `<T>` doesn't make sense here, as we can always assume that we have 
config options of various types. The return type should be 
`Set<ConfigOption<?>>`. Can you move this change into a separate commit? We 
should always split core changes.
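
       A minimal sketch of the wildcard return type suggested above. `Option` 
and `ExampleOptions` are hypothetical stand-ins, not Flink's `ConfigOption` or 
any real options class, and the reflection-based lookup is an assumption about 
how such a helper could collect the fields. With a wildcard element type the 
cast is checked, so no `@SuppressWarnings("unchecked")` should be needed.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
import java.util.Set;

public class ConfigOptionsSketch {

    /** Hypothetical stand-in for a typed config option; not Flink's ConfigOption. */
    static final class Option<T> {
        final String key;
        final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Hypothetical holder class whose options have different value types. */
    static final class ExampleOptions {
        public static final Option<Integer> PARALLELISM = new Option<>("example.parallelism", 1);
        public static final Option<String> NAME = new Option<>("example.name", "job");
        public static final String NOT_AN_OPTION = "ignored";
    }

    /** Collects all public static Option fields; the wildcard admits mixed value types. */
    static Set<Option<?>> getAllOptions(Class<?> optionsClass) {
        Set<Option<?>> result = new LinkedHashSet<>();
        for (Field field : optionsClass.getFields()) {
            if (Modifier.isStatic(field.getModifiers())
                    && Option.class.isAssignableFrom(field.getType())) {
                try {
                    // Checked cast to Option<?>, so no unchecked warning is produced.
                    result.add((Option<?>) field.get(null));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot read " + field.getName(), e);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Option<?> option : getAllOptions(ExampleOptions.class)) {
            System.out.println(option.key + " = " + option.defaultValue);
        }
    }
}
```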

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -125,7 +144,13 @@ public void replaceInputEdge(int index, ExecEdge 
newInputEdge) {
     @Override
     public Transformation<T> translateToPlan(Planner planner) {

Review comment:
       make this `final`?
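
       For illustration, a sketch of why `final` fits here: the public entry 
point stays fixed and subclasses only override the internal hook. All names 
below (`NodeBase`, `CountingNode`, the `String` stand-ins for the planner 
config and the transformation) are hypothetical, and the caching of the result 
is an assumption based on the `return transformation;` line in the diff.

```java
public class TemplateMethodSketch {

    /** Hypothetical base class; Strings stand in for the config and the result. */
    abstract static class NodeBase {
        private String cached;

        /** Final entry point: subclasses cannot bypass the caching logic. */
        public final String translateToPlan(String config) {
            if (cached == null) {
                cached = translateToPlanInternal(config);
            }
            return cached;
        }

        /** Extension hook that subclasses implement. */
        protected abstract String translateToPlanInternal(String config);
    }

    /** Hypothetical node that counts how often the hook is invoked. */
    static final class CountingNode extends NodeBase {
        int calls = 0;

        @Override
        protected String translateToPlanInternal(String config) {
            calls++;
            return "plan[" + config + "]";
        }
    }

    public static void main(String[] args) {
        CountingNode node = new CountingNode();
        System.out.println(node.translateToPlan("cfg"));
        System.out.println(node.translateToPlan("other"));
        System.out.println("hook calls: " + node.calls);
    }
}
```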

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfiguration.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.delegation.PlannerConfiguration;
+
+import java.time.ZoneId;
+import java.util.Optional;
+
+/**
+ * Configuration view that combines the {@link PlannerConfiguration} with the 
{@link
+ * ExecNodeBase#getPersistedConfig()} configuration. The persisted 
configuration of the {@link
+ * ExecNode} which is deserialized from the JSON plan has precedence over the 
{@link
+ * PlannerConfiguration}.
+ */
+@Internal
+public class ExecNodeConfiguration implements ReadableConfig {

Review comment:
       `final`, maybe even package scoped?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -136,8 +161,16 @@ public void replaceInputEdge(int index, ExecEdge 
newInputEdge) {
         return transformation;
     }
 
-    /** Internal method, translates this node into a Flink operator. */
-    protected abstract Transformation<T> translateToPlanInternal(PlannerBase 
planner);
+    /**
+     * Internal method, translates this node into a Flink operator.
+     *
+     * @param planner The planner.
+     * @param config The configuration that all the nodes implementing this 
method should use,

Review comment:
       Maybe we can be a bit more detailed here: `per-ExecNode configuration 
that contains the merged configuration from various layers.`
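
       To make "merged configuration from various layers" concrete, here is a 
rough sketch of a read-only view in which the persisted layer takes precedence 
over the planner layer, as the `ExecNodeConfiguration` JavaDoc in this PR 
describes. `MergedConfig` and the plain string maps are hypothetical 
simplifications, not the actual class or Flink's `ReadableConfig` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LayeredConfigSketch {

    /** Hypothetical read-only view over two layers of string options. */
    static final class MergedConfig {
        private final Map<String, String> plannerConfig;
        private final Map<String, String> persistedConfig;

        MergedConfig(Map<String, String> plannerConfig, Map<String, String> persistedConfig) {
            this.plannerConfig = plannerConfig;
            this.persistedConfig = persistedConfig;
        }

        /** Looks up the persisted layer first, then falls back to the planner layer. */
        Optional<String> getOptional(String key) {
            String persisted = persistedConfig.get(key);
            if (persisted != null) {
                return Optional.of(persisted);
            }
            return Optional.ofNullable(plannerConfig.get(key));
        }
    }

    public static void main(String[] args) {
        Map<String, String> planner = new HashMap<>();
        planner.put("table.exec.state.ttl", "1 h");
        planner.put("table.local-time-zone", "UTC");

        Map<String, String> persisted = new HashMap<>();
        persisted.put("table.exec.state.ttl", "2 h");

        MergedConfig config = new MergedConfig(planner, persisted);
        System.out.println(config.getOptional("table.exec.state.ttl"));
        System.out.println(config.getOptional("table.local-time-zone"));
        System.out.println(config.getOptional("missing.key"));
    }
}
```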

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
##########
@@ -137,8 +139,8 @@ public BatchExecHashAggregate(
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(planner.getTableConfig()),
-                createTransformationDescription(planner.getTableConfig()),
+                createTransformationName(planner.getConfiguration()),

Review comment:
       use `config` here

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
##########
@@ -100,17 +105,17 @@ public BatchExecPythonGroupWindowAggregate(
 
     @SuppressWarnings("unchecked")
     @Override
-    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+    protected Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfiguration config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final RowType outputRowType = 
InternalTypeInfo.of(getOutputType()).toRowType();
 
         final Tuple2<Long, Long> windowSizeAndSlideSize = 
WindowCodeGenerator.getWindowDef(window);
-        final TableConfig tableConfig = planner.getTableConfig();
         final Configuration mergedConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), 
tableConfig);
+                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), 
planner.getTableConfig());

Review comment:
       also here

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
##########
@@ -82,23 +87,24 @@ public BatchExecPythonGroupAggregate(
 
     @SuppressWarnings("unchecked")
     @Override
-    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+    protected Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfiguration config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final RowType outputRowType = 
InternalTypeInfo.of(getOutputType()).toRowType();
-        Configuration config =
+        Configuration mergedConfig =
                 CommonPythonUtil.getMergedConfig(planner.getExecEnv(), 
planner.getTableConfig());

Review comment:
       use `config.getTableConfig()` here?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -265,7 +267,7 @@ public TemporalTableSourceSpec getTemporalTableSourceSpec() 
{
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransformation,
-                createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, 
planner.getTableConfig()),
+                createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, 
planner.getConfiguration()),

Review comment:
       use `config`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
##########
@@ -60,31 +62,33 @@
     protected CommonExecWindowTableFunction(
             int id,
             ExecNodeContext context,
+            ReadableConfig config,
             TimeAttributeWindowingStrategy windowingStrategy,
             List<InputProperty> inputProperties,
             RowType outputType,
             String description) {
-        super(id, context, inputProperties, outputType, description);
+        super(id, context, config, inputProperties, outputType, description);
         checkArgument(inputProperties.size() == 1);
         this.windowingStrategy = checkNotNull(windowingStrategy);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+    protected Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfiguration config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         WindowAssigner<TimeWindow> windowAssigner = 
createWindowAssigner(windowingStrategy);
         final ZoneId shiftTimeZone =
                 TimeWindowUtil.getShiftTimeZone(
-                        windowingStrategy.getTimeAttributeType(), 
planner.getTableConfig());
+                        windowingStrategy.getTimeAttributeType(), 
config.getLocalTimeZone());
         WindowTableFunctionOperator windowTableFunctionOperator =
                 new WindowTableFunctionOperator(
                         windowAssigner, 
windowingStrategy.getTimeAttributeIndex(), shiftTimeZone);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationMeta(WINDOW_TRANSFORMATION, 
planner.getTableConfig()),
+                createTransformationMeta(WINDOW_TRANSFORMATION, 
planner.getConfiguration()),

Review comment:
       here

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecAggregateBase.java
##########
@@ -43,10 +44,11 @@
     protected StreamExecAggregateBase(
             int id,
             ExecNodeContext context,
+            ReadableConfig config,

Review comment:
       `plannerConfig`?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
##########
@@ -67,6 +70,7 @@ public StreamExecCalc(
     public StreamExecCalc(
             @JsonProperty(FIELD_NAME_ID) int id,
             @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig config,

Review comment:
       `persistedConfig`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ConfigurationJsonDeserializer.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Custom deserializer for {@link Configuration} used for {@link 
ExecNodeBase#getPersistedConfig}.
+ */
+@Internal
+class ConfigurationJsonDeserializer extends StdDeserializer<Configuration> {
+
+    public ConfigurationJsonDeserializer() {
+        super(Configuration.class);
+    }
+
+    @Override
+    public Configuration deserialize(
+            JsonParser jsonParser, DeserializationContext 
deserializationContext)
+            throws IOException {
+        return Configuration.fromMap(
+                jsonParser.readValueAs(new TypeReference<Map<String, 
String>>() {}));

Review comment:
       Move the reference to a static constant? Is there no built-in reference 
for this commonly used string-to-string map?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
##########
@@ -118,7 +127,7 @@ public StreamExecExchange(
 
         final Transformation<RowData> transformation =
                 new PartitionTransformation<>(inputTransform, partitioner);
-        createTransformationMeta(EXCHANGE_TRANSFORMATION, 
planner.getTableConfig())
+        createTransformationMeta(EXCHANGE_TRANSFORMATION, 
planner.getConfiguration())

Review comment:
       `config`

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowTableFunctionJsonPlanTest_jsonplan/testIndividualWindowTVFProcessingTime.out
##########
@@ -173,6 +173,9 @@
   }, {
     "id" : 3,
     "type" : "stream-exec-watermark-assigner_1",
+    "configuration" : {
+      "table.exec.source.idle-timeout" : "0 ms"

Review comment:
       Also, can we check one more time whether `table.exec.state.ttl` should 
be in the plan? I think we can leave this mutable.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExpand.scala
##########
@@ -53,6 +54,7 @@ class BatchPhysicalExpand(
 
   override def translateToExecNode(): ExecNode[_] = {
     new BatchExecExpand(
+      ShortcutUtils.unwrapConfig(this),

Review comment:
       This will not give you planner config but table config. Maybe we should 
rename this method. I think we can also retrieve the planner config from a 
`RelNode`.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java
##########
@@ -157,23 +161,22 @@ public BatchExecOverAggregate(
             managedMemory = 0L;
         } else {
             List<OverWindowFrame> windowFrames =
-                    createOverWindowFrames(planner, inputType, sortSpec, 
inputTypeWithConstants);
+                    createOverWindowFrames(

Review comment:
       Is `planner` still needed for this method? Or can we pass `RelBuilder` 
directly?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
##########
@@ -87,14 +87,18 @@ protected void resolveInputPriorityConflict(ExecNode<?> 
node, int higherInput, i
                 // we should split it into two nodes
                 BatchExecExchange newExchange =
                         new BatchExecExchange(
-                                inputProperty, (RowType) 
exchange.getOutputType(), "Exchange");
+                                configuration,

Review comment:
       is this `plannerConfig`? I guess yes, right?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java
##########
@@ -451,31 +453,35 @@ static boolean isChainableSource(ExecNode<?> node, 
ProcessorContext context) {
     // Multiple Input Nodes Creating
     // 
--------------------------------------------------------------------------------
 
-    private List<ExecNode<?>> createMultipleInputNodes(List<ExecNodeWrapper> 
rootWrappers) {
+    private List<ExecNode<?>> createMultipleInputNodes(
+            ReadableConfig config, List<ExecNodeWrapper> rootWrappers) {
         List<ExecNode<?>> result = new ArrayList<>();
         Map<ExecNodeWrapper, ExecNode<?>> visitedMap = new HashMap<>();
         for (ExecNodeWrapper rootWrapper : rootWrappers) {
-            result.add(getMultipleInputNode(rootWrapper, visitedMap));
+            result.add(getMultipleInputNode(config, rootWrapper, visitedMap));
         }
         return result;
     }
 
     private ExecNode<?> getMultipleInputNode(
-            ExecNodeWrapper wrapper, Map<ExecNodeWrapper, ExecNode<?>> 
visitedMap) {
+            ReadableConfig config,

Review comment:
       call the variables in this class `plannerConfig` to make it more readable

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
##########
@@ -84,16 +87,16 @@ public BatchExecHashAggregate(
 
     @SuppressWarnings("unchecked")
     @Override
-    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+    protected Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfiguration config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
 
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final RowType outputRowType = (RowType) getOutputType();
 
-        final TableConfig config = planner.getTableConfig();
-        final CodeGeneratorContext ctx = new CodeGeneratorContext(config);
+        final CodeGeneratorContext ctx = new 
CodeGeneratorContext(planner.getTableConfig());

Review comment:
       use `config.getTableConfig()` here

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowTableFunctionJsonPlanTest_jsonplan/testIndividualWindowTVFProcessingTime.out
##########
@@ -173,6 +173,9 @@
   }, {
     "id" : 3,
     "type" : "stream-exec-watermark-assigner_1",
+    "configuration" : {
+      "table.exec.source.idle-timeout" : "0 ms"

Review comment:
       I don't think that this changes the topology. We can remove it from the 
consumed options.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -176,4 +186,37 @@ public String toString() {
         }
         return new ExecNodeContext(metadata.name(), metadata.version());
     }
+
+    public static <T extends ExecNode<?>> ReadableConfig newPersistedConfig(

Review comment:
       add a JavaDoc



