This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc348d48096 [FLINK-39801][table] Skip serializing empty partition/order keys for PTF table args
cc348d48096 is described below

commit cc348d480963ef619b3f55832e8ed2f50944ca88
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon Jun 1 13:24:36 2026 +0200

    [FLINK-39801][table] Skip serializing empty partition/order keys for PTF table args
---
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  | 56 ++++++++++++++--------
 .../nodes/exec/serde/RexNodeJsonSerializer.java    | 25 ++++++----
 .../plan/from-changelog-retract-restore.json       |  3 --
 .../plan/process-late-events-restore.json          |  2 -
 .../plan/process-map-state-restore.json            |  1 -
 .../plan/process-multi-input-restore.json          |  2 -
 .../plan/process-multi-state-restore.json          |  1 -
 .../process-partitioned-named-timers-restore.json  |  1 -
 .../plan/process-row-semantic-table-restore.json   |  2 -
 .../process-updating-output-upsert-restore.json    |  1 -
 .../plan/to-changelog-retract-restore.json         |  3 --
 11 files changed, 51 insertions(+), 46 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 0c6f3ace423..2c063aade64 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -69,6 +69,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
 import static com.google.common.collect.ImmutableRangeSet.Builder;
 import static com.google.common.collect.ImmutableRangeSet.builder;
@@ -329,30 +330,43 @@ final class RexNodeJsonDeserializer extends 
StdDeserializer<RexNode> {
 
         final int inputIndex = 
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
 
-        final JsonNode partitionKeysNode = 
jsonNode.required(FIELD_NAME_PARTITION_KEYS);
-        final int[] partitionKeys = new int[partitionKeysNode.size()];
-        for (int i = 0; i < partitionKeysNode.size(); ++i) {
-            partitionKeys[i] = partitionKeysNode.get(i).asInt();
-        }
+        final int[] partitionKeys = deserializeIntArray(jsonNode, 
FIELD_NAME_PARTITION_KEYS);
+        final int[] orderKeys = deserializeIntArray(jsonNode, 
FIELD_NAME_ORDER_KEYS);
+        final SortOrder[] sortOrders =
+                deserializeArray(
+                        jsonNode,
+                        FIELD_NAME_ORDER_DIRECTIONS,
+                        node -> SortOrder.valueOf(node.asText()),
+                        new SortOrder[0]);
 
-        final JsonNode orderKeysNode = 
jsonNode.required(FIELD_NAME_ORDER_KEYS);
-        final int[] orderKeys = new int[orderKeysNode.size()];
-        for (int i = 0; i < orderKeysNode.size(); ++i) {
-            orderKeys[i] = orderKeysNode.get(i).asInt();
-        }
+        return new RexTableArgCall(callType, inputIndex, partitionKeys, 
orderKeys, sortOrders);
+    }
 
-        final JsonNode orderDirectionsNode = 
jsonNode.get(FIELD_NAME_ORDER_DIRECTIONS);
-        final SortOrder[] order;
-        if (orderDirectionsNode != null && !orderDirectionsNode.isEmpty()) {
-            order = new SortOrder[orderDirectionsNode.size()];
-            for (int i = 0; i < orderDirectionsNode.size(); ++i) {
-                order[i] = 
SortOrder.valueOf(orderDirectionsNode.get(i).asText());
-            }
-        } else {
-            order = new SortOrder[0];
-        }
+    private static int[] deserializeIntArray(JsonNode jsonNode, String 
fieldName) {
+        return deserializeListOrEmpty(jsonNode, fieldName, 
JsonNode::asInt).stream()
+                .mapToInt(Integer::intValue)
+                .toArray();
+    }
+
+    private static <T> T[] deserializeArray(
+            JsonNode jsonNode,
+            String fieldName,
+            Function<JsonNode, T> elementDeserializer,
+            T[] emptyArray) {
+        return deserializeListOrEmpty(jsonNode, fieldName, 
elementDeserializer).toArray(emptyArray);
+    }
 
-        return new RexTableArgCall(callType, inputIndex, partitionKeys, 
orderKeys, order);
+    private static <T> List<T> deserializeListOrEmpty(
+            JsonNode jsonNode, String fieldName, Function<JsonNode, T> 
elementDeserializer) {
+        final JsonNode arrayNode = jsonNode.get(fieldName);
+        if (arrayNode == null || arrayNode.isEmpty()) {
+            return List.of();
+        }
+        final List<T> result = new ArrayList<>(arrayNode.size());
+        for (final JsonNode element : arrayNode) {
+            result.add(elementDeserializer.apply(element));
+        }
+        return result;
     }
 
     private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext 
serdeContext)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index a342dc66913..6495778609c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -340,16 +340,23 @@ final class RexNodeJsonSerializer extends 
StdSerializer<RexNode> {
         gen.writeStartObject();
         gen.writeStringField(FIELD_NAME_KIND, KIND_TABLE_ARG_CALL);
         gen.writeNumberField(FIELD_NAME_INPUT_INDEX, 
tableArgCall.getInputIndex());
-        gen.writeFieldName(FIELD_NAME_PARTITION_KEYS);
-        gen.writeArray(tableArgCall.getPartitionKeys(), 0, 
tableArgCall.getPartitionKeys().length);
-        gen.writeFieldName(FIELD_NAME_ORDER_KEYS);
-        gen.writeArray(tableArgCall.getOrderKeys(), 0, 
tableArgCall.getOrderKeys().length);
-        gen.writeFieldName(FIELD_NAME_ORDER_DIRECTIONS);
-        gen.writeStartArray();
-        for (SortOrder order : tableArgCall.getSortOrder()) {
-            gen.writeString(order.name());
+        if (tableArgCall.getPartitionKeys().length > 0) {
+            gen.writeFieldName(FIELD_NAME_PARTITION_KEYS);
+            gen.writeArray(
+                    tableArgCall.getPartitionKeys(), 0, 
tableArgCall.getPartitionKeys().length);
+        }
+        if (tableArgCall.getOrderKeys().length > 0) {
+            gen.writeFieldName(FIELD_NAME_ORDER_KEYS);
+            gen.writeArray(tableArgCall.getOrderKeys(), 0, 
tableArgCall.getOrderKeys().length);
+            if (tableArgCall.getSortOrder().length > 0) {
+                gen.writeFieldName(FIELD_NAME_ORDER_DIRECTIONS);
+                gen.writeStartArray();
+                for (SortOrder order : tableArgCall.getSortOrder()) {
+                    gen.writeString(order.name());
+                }
+                gen.writeEndArray();
+            }
         }
-        gen.writeEndArray();
         serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, 
tableArgCall.getType(), gen);
         gen.writeEndObject();
     }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
index b795986b6d6..c1004042c1b 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
@@ -43,9 +43,6 @@
       "operands" : [ {
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
-        "partitionKeys" : [ ],
-        "orderKeys" : [ ],
-        "orderDirections" : [ ],
         "type" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name` 
VARCHAR(2147483647)> NOT NULL"
       }, {
         "kind" : "CALL",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
index 8d22a24a156..4b029426df9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
@@ -180,8 +180,6 @@
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
         "partitionKeys" : [ 0 ],
-        "orderKeys" : [ ],
-        "orderDirections" : [ ],
         "type" : {
           "type" : "ROW",
           "nullable" : false,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
index a799c8a0179..64b93fb20b5 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
@@ -54,7 +54,6 @@
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
         "partitionKeys" : [ 0 ],
-        "orderKeys" : [ ],
         "type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
       }, {
         "kind" : "CALL",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
index c5eb1b79fb2..6a7c8eae544 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
@@ -94,13 +94,11 @@
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
         "partitionKeys" : [ 0 ],
-        "orderKeys" : [ ],
         "type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
       }, {
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 1,
         "partitionKeys" : [ 0 ],
-        "orderKeys" : [ ],
         "type" : "ROW<`name` VARCHAR(2147483647), `city` VARCHAR(2147483647)> 
NOT NULL"
       }, {
         "kind" : "CALL",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
index d8482f85423..147160be7fc 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
@@ -54,7 +54,6 @@
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
         "partitionKeys" : [ 0 ],
-        "orderKeys" : [ ],
         "type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
       }, {
         "kind" : "CALL",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
index 3d2db4fe4ee..ae8b2d16604 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
@@ -159,7 +159,6 @@
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
         "partitionKeys" : [ 0 ],
-        "orderKeys" : [ ],
         "type" : {
           "type" : "ROW",
           "nullable" : false,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
index 781b49d4eff..757655dc968 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
@@ -40,8 +40,6 @@
       "operands" : [ {
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
-        "partitionKeys" : [ ],
-        "orderKeys" : [ ],
         "type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
       }, {
         "kind" : "LITERAL",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
index f0fd3007f04..b2743e6c069 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
@@ -59,7 +59,6 @@
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
         "partitionKeys" : [ 0 ],
-        "orderKeys" : [ ],
         "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `EXPR$1` BIGINT> 
NOT NULL"
       }, {
         "kind" : "CALL",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
index 324327cad4b..f64d0eb37b3 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
@@ -45,9 +45,6 @@
       "operands" : [ {
         "kind" : "TABLE_ARG_CALL",
         "inputIndex" : 0,
-        "partitionKeys" : [ ],
-        "orderKeys" : [ ],
-        "orderDirections" : [ ],
         "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT 
NULL"
       }, {
         "kind" : "CALL",

Reply via email to