yuchuanchen commented on code in PR #20616:
URL: https://github.com/apache/flink/pull/20616#discussion_r952063956


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
         result.getJobClient().get().cancel();
     }
 
+    @Test(timeout = 120000)
+    public void testReadParquetWithNullableComplexType() throws Exception {
+        List<Row> expectedRows;
+        final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(3);
+        env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+        tEnv.registerCatalog(catalogName, hiveCatalog);
+        tEnv.useCatalog(catalogName);
+
+        List<Row> rows = generateRows();
+        expectedRows = generateExpectedRows(rows);
+        DataStream<Row> stream =
+                env.addSource(
+                                new FiniteTestSource<>(rows),
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            Types.INT,
+                                            Types.STRING,
+                                            new RowTypeInfo(
+                                                    new TypeInformation[] {
+                                                        Types.STRING, Types.INT, Types.INT
+                                                    },
+                                                    new String[] {"c1", "c2", "c3"}),
+                                            new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                            Types.OBJECT_ARRAY(Types.STRING),
+                                            Types.STRING
+                                        },
+                                        new String[] {"a", "b", "c", "d", "e", "f"}))
+                        .filter((FilterFunction<Row>) value -> true)
+                        .setParallelism(3); // run with parallel subtasks
+
+        tEnv.createTemporaryView("my_table", stream);
+        insertToSinkAndCompare(tEnv, expectedRows);
+    }
+
+    private static List<Row> generateRows() {
+        List<Row> rows = new ArrayList<>();
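+        // Use different moduli so that nulls land in map values, array elements,
+        // and struct fields independently of each other.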
+        for (int i = 0; i < 10000; i++) {
+            Map<String, String> e = new HashMap<>();
+            e.put(i + "", i % 2 == 0 ? null : i + "");
+            String[] f = new String[2];
+            f[0] = i % 3 == 0 ? null : i + "";
+            f[1] = i % 3 == 2 ? null : i + "";
+            rows.add(
+                    Row.of(
+                            i,
+                            String.valueOf(i % 10),
+                            Row.of(
+                                    i % 2 == 0 ? null : String.valueOf(i % 10),
+                                    i % 3 == 0 ? null : i % 10,
+                                    i % 5 == 0 ? null : i % 10),
+                            e,
+                            f,
+                            String.valueOf(i % 10)));
+        }
+        return rows;
+    }
+
+    private static List<Row> generateExpectedRows(List<Row> rows) {
+        List<Row> sortedRows, expectedRows;
+        sortedRows = new ArrayList<>();
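+        // FiniteTestSource emits its input twice (with checkpoints in between),
+        // so every row should appear twice in the sink.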
+        sortedRows.addAll(rows);
+        sortedRows.addAll(rows);
+        sortedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+
+        expectedRows = new ArrayList<>();
+        for (int i = 0; i < sortedRows.size(); i++) {
+            Row rowExpect = Row.copy(sortedRows.get(i));
+            Row nestedRow = (Row) rowExpect.getField(2);
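+            // A struct whose fields are all null is expected to be read back
+            // as a NULL struct, so collapse it in the expected output.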
+            if (nestedRow.getField(0) == null
+                    && nestedRow.getField(1) == null
+                    && nestedRow.getField(2) == null) {
+                rowExpect.setField(2, null);
+            }
+            expectedRows.add(rowExpect);
+        }
+        return expectedRows;
+    }
+
+    private static void insertToSinkAndCompare(StreamTableEnvironment tEnv, List<Row> expectedRows)
+            throws Exception {
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
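+        // Roll a new file on every record (1-byte threshold) and auto-compact,
+        // so the test covers reading back many small, compacted Parquet files.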
+        tEnv.executeSql(
+                "CREATE TABLE sink_table (a int, b string,"
+                        + "c struct<c1:string, c2:int, c3:int>,"
+                        + "d map<string, string>, e array<string>, f string "
+                        + ") "
+                        + " stored as parquet"
+                        + " TBLPROPERTIES ("
+                        + "'sink.partition-commit.policy.kind'='metastore,success-file',"
+                        + "'auto-compaction'='true',"
+                        + "'compaction.file-size' = '128MB',"
+                        + "'sink.rolling-policy.file-size' = '1b'"
+                        + ")");
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        String sql =
+                "insert into sink_table /*+ OPTIONS('sink.parallelism' = '3') 
*/"
+                        + " select * from my_table";
+        tEnv.executeSql(sql).await();
+        assertIterator(tEnv.executeSql("select * from sink_table").collect(), expectedRows);
+    }
+
+    private static void assertIterator(CloseableIterator<Row> iterator, 
List<Row> expectedRows)

Review Comment:
   good idea
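
   For context, the helper's body is truncated in the diff above. A minimal sketch of what such a comparison helper could look like (hypothetical, not necessarily the PR's actual code; assumes JUnit's assertEquals and that the first field gives a deterministic sort order):

       private static void assertIterator(CloseableIterator<Row> iterator, List<Row> expectedRows)
               throws Exception {
           // Drain the bounded result, then close the iterator to release the collect job.
           List<Row> actualRows = new ArrayList<>();
           iterator.forEachRemaining(actualRows::add);
           iterator.close();
           // Sort by the first field; the duplicated rows are identical, so the
           // relative order of equal keys does not affect the comparison.
           actualRows.sort(Comparator.comparingInt(r -> (Integer) r.getField(0)));
           assertEquals(expectedRows, actualRows);
       }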



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
