This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ab011243a8 [FLINK-35934][table-planner] Add CompiledPlan annotations to BatchExecValues
8ab011243a8 is described below

commit 8ab011243a8028aeee5bbc73ac56a7426a5c1916
Author: James Hughes <[email protected]>
AuthorDate: Wed Aug 7 07:49:59 2024 -0400

    [FLINK-35934][table-planner] Add CompiledPlan annotations to BatchExecValues
---
 .../plan/nodes/exec/batch/BatchExecValues.java     | 22 ++++++
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |  2 +
 .../ValuesBatchRestoreTest.java}                   | 13 ++--
 .../{stream => common}/ValuesTestPrograms.java     |  4 +-
 .../plan/nodes/exec/stream/ValuesRestoreTest.java  |  1 +
 .../values-test/plan/values-test.json              | 80 ++++++++++++++++++++++
 6 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
index b1be0f83954..8417a75cf6b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
@@ -25,15 +26,25 @@ import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecValues;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexLiteral;
 
 import java.util.List;
 
 /** Batch {@link ExecNode} that read records from given values. */
+@ExecNodeMetadata(
+        name = "batch-exec-values",
+        version = 1,
+        producedTransformations = CommonExecValues.VALUES_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecValues extends CommonExecValues implements BatchExecNode<RowData> {
 
     public BatchExecValues(
@@ -50,6 +61,17 @@ public class BatchExecValues extends CommonExecValues implements BatchExecNode<R
                 description);
     }
 
+    @JsonCreator
+    public BatchExecValues(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_TUPLES) List<List<RexLiteral>> tuples,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, tuples, outputType, description);
+    }
+
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index ffc4da4a6de..bd6a7d4e1da 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -159,6 +160,7 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecCalc.class);
                     add(BatchExecExchange.class);
                     add(BatchExecSort.class);
+                    add(BatchExecValues.class);
                 }
             };
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
similarity index 75%
copy from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
copy to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
index e6ce507ef03..03e8898951a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.planner.plan.nodes.exec.common.ValuesTestPrograms;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Collections;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecValues}. */
-public class ValuesRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecValues}. */
+public class ValuesBatchRestoreTest extends BatchRestoreTestBase {
 
-    public ValuesRestoreTest() {
-        super(StreamExecValues.class, AfterRestoreSource.NO_RESTORE);
+    public ValuesBatchRestoreTest() {
+        super(BatchExecValues.class);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
similarity index 93%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
index 792299685f2..300464feb6a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
@@ -24,7 +24,7 @@ import org.apache.flink.table.test.program.TableTestProgram;
 /** {@link TableTestProgram} definitions for testing {@link StreamExecValues}. */
 public class ValuesTestPrograms {
 
-    static final TableTestProgram VALUES_TEST =
+    public static final TableTestProgram VALUES_TEST =
             TableTestProgram.of("values-test", "validates values node")
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
index e6ce507ef03..47f00ab2b3a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.plan.nodes.exec.common.ValuesTestPrograms;
 import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json
new file mode 100644
index 00000000000..7e45b691171
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json
@@ -0,0 +1,80 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-values_1",
+    "tuples" : [ [ {
+      "kind" : "LITERAL",
+      "value" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 2,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : "Hi",
+      "type" : "CHAR(2) NOT NULL"
+    } ], [ {
+      "kind" : "LITERAL",
+      "value" : 3,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 4,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "LITERAL",
+      "value" : "Hello",
+      "type" : "CHAR(5) NOT NULL"
+    } ] ],
+    "outputType" : "ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL, `EXPR$2` VARCHAR(5) NOT NULL>",
+    "description" : "Values(tuples=[[{ 1, 2, _UTF-16LE'Hi' }, { 3, 4, _UTF-16LE'Hello' }]], values=[EXPR$0, EXPR$1, EXPR$2])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL, `EXPR$2` VARCHAR(5) NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[EXPR$0, EXPR$1, EXPR$2])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to