zhangjun0x01 commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r562450034
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -106,6 +109,51 @@ public void testResiduals() throws Exception {
assertRecords(runWithFilter(filter, "where dt='2020-03-20' and id=123"),
expectedRecords, SCHEMA);
}
+ @Test
+ public void testInferedParallelism() throws IOException {
+ Table table = catalog.createTable(TableIdentifier.of("default", "t"),
SCHEMA, SPEC);
+
+ TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+ FlinkInputFormat flinkInputFormat =
FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
+ ScanContext scanContext = ScanContext.builder().build();
+
+ // Empty table, parallelism at least 1
+ int parallelism = FlinkSource.forRowData()
+ .flinkConf(new Configuration())
+ .inferParallelism(flinkInputFormat, scanContext);
+ Assert.assertEquals("Should produce the expected parallelism.", 1,
parallelism);
+
+ List<Record> writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L);
+ writeRecords.get(0).set(1, 123L);
+ writeRecords.get(0).set(2, "2020-03-20");
+ writeRecords.get(1).set(1, 456L);
+ writeRecords.get(1).set(2, "2020-03-20");
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+
+ DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0),
writeRecords);
+ DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
+ RandomGenericData.generate(SCHEMA, 2, 0L));
Review comment:
I originally copied the code from the `TestFlinkScanSql#testResiduals` method to
generate two data files.
I don't think there is a problem with the partitions: `writeRecords` is written
to the partition `2020-03-20`, and two randomly generated records are written to
the partition `2020-03-21`.
For simplicity, though, I have modified the code to randomly generate two
records for each partition.
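To make the intended data layout concrete, here is a minimal plain-Java sketch of "two random records per partition, one data file per partition". It does not use the Iceberg or Flink APIs: `PartitionedTestData`, `generate`, and `inferParallelism` are hypothetical names, and the rule that parallelism equals the number of splits (one per data file) with a floor of 1 is an assumption for illustration, not something the diff confirms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for the test setup; not part of Iceberg or Flink.
class PartitionedTestData {

    // Builds one "data file" (a list of random ids) per partition value.
    static Map<String, List<Long>> generate(List<String> partitions,
                                            int recordsPerPartition,
                                            long seed) {
        Random random = new Random(seed);
        Map<String, List<Long>> files = new LinkedHashMap<>();
        for (String dt : partitions) {
            List<Long> ids = new ArrayList<>();
            for (int i = 0; i < recordsPerPartition; i++) {
                ids.add(random.nextLong());
            }
            files.put(dt, ids);
        }
        return files;
    }

    // Assumed rule: one split per data file, and never less than 1,
    // so that an empty table still yields a parallelism of 1.
    static int inferParallelism(int splitCount) {
        return Math.max(1, splitCount);
    }

    public static void main(String[] args) {
        Map<String, List<Long>> files =
            generate(Arrays.asList("2020-03-20", "2020-03-21"), 2, 0L);
        System.out.println("data files: " + files.size());
        System.out.println("empty table parallelism: " + inferParallelism(0));
        System.out.println("parallelism: " + inferParallelism(files.size()));
    }
}
```

Under these assumptions each partition ends up with exactly one file of two records, so the test data no longer depends on which hand-written values land in which partition.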