zhangjun0x01 commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r555468396
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -103,4 +124,45 @@ public void testLimitPushDown() {
Assert.assertEquals("should have 1 record", 1, mixedResult.size());
Assert.assertArrayEquals("Should produce the expected records",
mixedResult.get(0), new Object[] {1, "a"});
}
+
+ @Test
+ public void testParallelismOptimize() {
+ sql("INSERT INTO %s VALUES (1,'hello')", TABLE_NAME);
+ sql("INSERT INTO %s VALUES (2,'iceberg')", TABLE_NAME);
+
+ TableEnvironment tenv = getTableEnv();
+
+ // empty table ,parallelism at least 1
+ Table tableEmpty = tenv.sqlQuery(String.format("SELECT * FROM %s",
TABLE_NAME));
+ testParallelismSettingTranslateAndAssert(1, tableEmpty, tenv);
+
+ // make sure to generate 2 CombinedScanTasks
+ org.apache.iceberg.Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+ Stream<FileScanTask> stream =
StreamSupport.stream(table.newScan().planFiles().spliterator(), false);
+ Optional<FileScanTask> fileScanTaskOptional =
stream.max(Comparator.comparing(FileScanTask::length));
+ Assert.assertTrue(fileScanTaskOptional.isPresent());
+ long maxFileLen = fileScanTaskOptional.get().length();
+ sql("ALTER TABLE %s SET ('read.split.open-file-cost'='1',
'read.split.target-size'='%s')", TABLE_NAME, maxFileLen);
+
+ // 2 splits ,the parallelism is 2
+ Table tableSelect = tenv.sqlQuery(String.format("SELECT * FROM %s",
TABLE_NAME));
+ testParallelismSettingTranslateAndAssert(2, tableSelect, tenv);
+
+ // 2 splits and limit is 1 ,the parallelism is 1
+ Table tableLimit = tenv.sqlQuery(String.format("SELECT * FROM %s LIMIT 1",
TABLE_NAME));
+ testParallelismSettingTranslateAndAssert(1, tableLimit, tenv);
+ }
+
+ private void testParallelismSettingTranslateAndAssert(int expected, Table
table, TableEnvironment tEnv) {
Review comment:
Here we need to get the parallelism of an operator. It is indeed obtained by
using Flink internal code, because I have not found any other way to obtain
the parallelism.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]