This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 6fb528f2 [FLINK-30710] Fix invalid field id for nested type in spark catalog 6fb528f2 is described below commit 6fb528f234e6d33bc3797ec23e8a7b7206a7a063 Author: shammon <zjur...@gmail.com> AuthorDate: Tue Jan 17 15:51:40 2023 +0800 [FLINK-30710] Fix invalid field id for nested type in spark catalog This closes #486 --- .../flink/table/store/spark/SparkTypeUtils.java | 7 ++++++- .../flink/table/store/spark/SparkReadITCase.java | 19 +++++++++++++++++++ .../apache/flink/table/store/spark/SparkTypeTest.java | 7 ++++++- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java index 7264c167..6dc51c2b 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java @@ -50,6 +50,7 @@ import org.apache.spark.sql.types.UserDefinedType; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; /** Utils for spark {@link DataType}. */ public class SparkTypeUtils { @@ -191,6 +192,8 @@ public class SparkTypeUtils { private static class SparkToFlinkTypeVisitor { + private final AtomicInteger currentIndex = new AtomicInteger(0); + static org.apache.flink.table.store.types.DataType visit(DataType type) { return visit(type, new SparkToFlinkTypeVisitor()); } @@ -238,7 +241,9 @@ public class SparkTypeUtils { org.apache.flink.table.store.types.DataType fieldType = fieldResults.get(i).copy(field.nullable()); String comment = field.getComment().getOrElse(() -> null); - newFields.add(new DataField(i, field.name(), fieldType, comment)); + newFields.add( + new DataField( + currentIndex.getAndIncrement(), field.name(), fieldType, comment)); } return new RowType(newFields); diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java index 87e583f5..bab448f8 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java @@ -487,4 +487,23 @@ public class SparkReadITCase extends SparkReadTestBase { .isEqualTo( "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]"); } + + @Test + public void testCreateNestedField() { + spark.sql( + "CREATE TABLE tablestore.default.nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)"); + assertThat( + spark.sql("SHOW CREATE TABLE tablestore.default.nested_table") + .collectAsList() + .toString()) + .isEqualTo( + String.format( + "[[CREATE TABLE nested_table (\n" + + " `a` INT,\n" + + " `b` STRUCT<`b1`: STRUCT<`b11`: INT, `b12`: INT>, `b2`: BIGINT>)\n" + + "TBLPROPERTIES(\n" + + " 'path' = '%s')\n" + + "]]", + new Path(warehousePath, "default.db/nested_table"))); + } } diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java index 4ea93aaa..50ab8940 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java @@ -24,6 +24,8 @@ import org.apache.flink.table.store.types.RowType; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import java.util.concurrent.atomic.AtomicInteger; + import static org.apache.flink.table.store.spark.SparkTypeUtils.fromFlinkRowType; import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType; import static org.assertj.core.api.Assertions.assertThat; @@ -32,7 +34,10 @@ import static org.assertj.core.api.Assertions.assertThat; public class SparkTypeTest { public static final RowType ALL_TYPES = - RowType.builder() + RowType.builder( + true, + new AtomicInteger( + 1)) // posX and posY have field id 0 and 1, here we start from 2 .field("id", DataTypes.INT().notNull()) .field("name", DataTypes.STRING()) /* optional by default */ .field("salary", DataTypes.DOUBLE().notNull())