zhangjun0x01 commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r562446908
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -106,6 +109,51 @@ public void testResiduals() throws Exception {
assertRecords(runWithFilter(filter, "where dt='2020-03-20' and id=123"),
expectedRecords, SCHEMA);
}
+ @Test
+ public void testInferedParallelism() throws IOException {
+ Table table = catalog.createTable(TableIdentifier.of("default", "t"),
SCHEMA, SPEC);
+
+ TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+ FlinkInputFormat flinkInputFormat =
FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
+ ScanContext scanContext = ScanContext.builder().build();
+
+ // Empty table ,parallelism at least 1
+ int parallelism = FlinkSource.forRowData()
+ .flinkConf(new Configuration())
+ .inferParallelism(flinkInputFormat, scanContext);
+ Assert.assertEquals("Should produce the expected parallelism.", 1,
parallelism);
+
+ List<Record> writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L);
+ writeRecords.get(0).set(1, 123L);
+ writeRecords.get(0).set(2, "2020-03-20");
+ writeRecords.get(1).set(1, 456L);
+ writeRecords.get(1).set(2, "2020-03-20");
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+
+ DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0),
writeRecords);
+ DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
+ RandomGenericData.generate(SCHEMA, 2, 0L));
+ helper.appendToTable(dataFile1, dataFile2);
+
+ // Make sure to generate 2 CombinedScanTasks
+ long maxFileLen = Math.max(dataFile1.fileSizeInBytes(),
dataFile2.fileSizeInBytes());
+ executeSQL(String
+ .format("ALTER TABLE t SET ('read.split.open-file-cost'='1',
'read.split.target-size'='%s')", maxFileLen));
+
+ // 2 splits ,the parallelism is 2
+ parallelism = FlinkSource.forRowData()
Review comment:
I add the test case , but I did not split these test cases into
different methods because they share a lot of code. If they are split, there
may be a lot of duplicate code.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]