This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fb528f2 [FLINK-30710] Fix invalid field id for nested type in spark catalog
6fb528f2 is described below

commit 6fb528f234e6d33bc3797ec23e8a7b7206a7a063
Author: shammon <zjur...@gmail.com>
AuthorDate: Tue Jan 17 15:51:40 2023 +0800

    [FLINK-30710] Fix invalid field id for nested type in spark catalog
    
    This closes #486
---
 .../flink/table/store/spark/SparkTypeUtils.java       |  7 ++++++-
 .../flink/table/store/spark/SparkReadITCase.java      | 19 +++++++++++++++++++
 .../apache/flink/table/store/spark/SparkTypeTest.java |  7 ++++++-
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
index 7264c167..6dc51c2b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
@@ -50,6 +50,7 @@ import org.apache.spark.sql.types.UserDefinedType;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Utils for spark {@link DataType}. */
 public class SparkTypeUtils {
@@ -191,6 +192,8 @@ public class SparkTypeUtils {
 
     private static class SparkToFlinkTypeVisitor {
 
+        private final AtomicInteger currentIndex = new AtomicInteger(0);
+
         static org.apache.flink.table.store.types.DataType visit(DataType type) {
             return visit(type, new SparkToFlinkTypeVisitor());
         }
@@ -238,7 +241,9 @@ public class SparkTypeUtils {
                 org.apache.flink.table.store.types.DataType fieldType =
                         fieldResults.get(i).copy(field.nullable());
                 String comment = field.getComment().getOrElse(() -> null);
-                newFields.add(new DataField(i, field.name(), fieldType, comment));
+                newFields.add(
+                        new DataField(
+                                currentIndex.getAndIncrement(), field.name(), fieldType, comment));
             }
 
             return new RowType(newFields);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 87e583f5..bab448f8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -487,4 +487,23 @@ public class SparkReadITCase extends SparkReadTestBase {
                 .isEqualTo(
                         "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
     }
+
+    @Test
+    public void testCreateNestedField() {
+        spark.sql(
+                "CREATE TABLE tablestore.default.nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)");
+        assertThat(
+                        spark.sql("SHOW CREATE TABLE tablestore.default.nested_table")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo(
+                        String.format(
+                                "[[CREATE TABLE nested_table (\n"
+                                        + "  `a` INT,\n"
+                                        + "  `b` STRUCT<`b1`: STRUCT<`b11`: INT, `b12`: INT>, `b2`: BIGINT>)\n"
+                                        + "TBLPROPERTIES(\n"
+                                        + "  'path' = '%s')\n"
+                                        + "]]",
+                                new Path(warehousePath, "default.db/nested_table")));
+    }
 }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index 4ea93aaa..50ab8940 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.store.types.RowType;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.apache.flink.table.store.spark.SparkTypeUtils.fromFlinkRowType;
 import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -32,7 +34,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class SparkTypeTest {
 
     public static final RowType ALL_TYPES =
-            RowType.builder()
+            RowType.builder(
+                            true,
+                            new AtomicInteger(
+                                    1)) // posX and posY have field id 0 and 1, here we start from 2
                     .field("id", DataTypes.INT().notNull())
                     .field("name", DataTypes.STRING()) /* optional by default */
                     .field("salary", DataTypes.DOUBLE().notNull())

Reply via email to