This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8ab011243a8 [FLINK-35934][table-planner] Add CompiledPlan annotations
to BatchExecValues
8ab011243a8 is described below
commit 8ab011243a8028aeee5bbc73ac56a7426a5c1916
Author: James Hughes <[email protected]>
AuthorDate: Wed Aug 7 07:49:59 2024 -0400
[FLINK-35934][table-planner] Add CompiledPlan annotations to BatchExecValues
---
.../plan/nodes/exec/batch/BatchExecValues.java | 22 ++++++
.../planner/plan/utils/ExecNodeMetadataUtil.java | 2 +
.../ValuesBatchRestoreTest.java} | 13 ++--
.../{stream => common}/ValuesTestPrograms.java | 4 +-
.../plan/nodes/exec/stream/ValuesRestoreTest.java | 1 +
.../values-test/plan/values-test.json | 80 ++++++++++++++++++++++
6 files changed, 114 insertions(+), 8 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
index b1be0f83954..8417a75cf6b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
@@ -25,15 +26,25 @@ import
org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecValues;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.logical.RowType;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexLiteral;
import java.util.List;
/** Batch {@link ExecNode} that read records from given values. */
+@ExecNodeMetadata(
+ name = "batch-exec-values",
+ version = 1,
+ producedTransformations = CommonExecValues.VALUES_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecValues extends CommonExecValues implements
BatchExecNode<RowData> {
public BatchExecValues(
@@ -50,6 +61,17 @@ public class BatchExecValues extends CommonExecValues
implements BatchExecNode<R
description);
}
+ @JsonCreator
+ public BatchExecValues(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_TUPLES) List<List<RexLiteral>> tuples,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(id, context, persistedConfig, tuples, outputType, description);
+ }
+
@Override
protected Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index ffc4da4a6de..bd6a7d4e1da 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -33,6 +33,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -159,6 +160,7 @@ public final class ExecNodeMetadataUtil {
add(BatchExecCalc.class);
add(BatchExecExchange.class);
add(BatchExecSort.class);
+ add(BatchExecValues.class);
}
};
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
similarity index 75%
copy from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
copy to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
index e6ce507ef03..03e8898951a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ValuesBatchRestoreTest.java
@@ -16,19 +16,20 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
-import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.ValuesTestPrograms;
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
import java.util.Collections;
import java.util.List;
-/** Restore tests for {@link StreamExecValues}. */
-public class ValuesRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecValues}. */
+public class ValuesBatchRestoreTest extends BatchRestoreTestBase {
- public ValuesRestoreTest() {
- super(StreamExecValues.class, AfterRestoreSource.NO_RESTORE);
+ public ValuesBatchRestoreTest() {
+ super(BatchExecValues.class);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
similarity index 93%
rename from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java
rename to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
index 792299685f2..300464feb6a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ValuesTestPrograms.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
@@ -24,7 +24,7 @@ import org.apache.flink.table.test.program.TableTestProgram;
/** {@link TableTestProgram} definitions for testing {@link StreamExecValues}.
*/
public class ValuesTestPrograms {
- static final TableTestProgram VALUES_TEST =
+ public static final TableTestProgram VALUES_TEST =
TableTestProgram.of("values-test", "validates values node")
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
index e6ce507ef03..47f00ab2b3a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ValuesRestoreTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.ValuesTestPrograms;
import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json
new file mode 100644
index 00000000000..7e45b691171
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-values_1/values-test/plan/values-test.json
@@ -0,0 +1,80 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "batch-exec-values_1",
+ "tuples" : [ [ {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "Hi",
+ "type" : "CHAR(2) NOT NULL"
+ } ], [ {
+ "kind" : "LITERAL",
+ "value" : 3,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 4,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "Hello",
+ "type" : "CHAR(5) NOT NULL"
+ } ] ],
+ "outputType" : "ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL, `EXPR$2`
VARCHAR(5) NOT NULL>",
+ "description" : "Values(tuples=[[{ 1, 2, _UTF-16LE'Hi' }, { 3, 4,
_UTF-16LE'Hello' }]], values=[EXPR$0, EXPR$1, EXPR$2])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL, `EXPR$2`
VARCHAR(5) NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[EXPR$0, EXPR$1, EXPR$2])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file