zhangjun0x01 commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r559301071



##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -103,4 +124,45 @@ public void testLimitPushDown() {
     Assert.assertEquals("should have 1 record", 1, mixedResult.size());
     Assert.assertArrayEquals("Should produce the expected records", 
mixedResult.get(0), new Object[] {1, "a"});
   }
+
+  @Test
+  public void testParallelismOptimize() {
+    sql("INSERT INTO %s  VALUES (1,'hello')", TABLE_NAME);
+    sql("INSERT INTO %s  VALUES (2,'iceberg')", TABLE_NAME);
+
+    TableEnvironment tenv = getTableEnv();
+
+    // empty table ,parallelism at least 1
+    Table tableEmpty = tenv.sqlQuery(String.format("SELECT * FROM %s", 
TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(1, tableEmpty, tenv);
+
+    // make sure to generate 2 CombinedScanTasks
+    org.apache.iceberg.Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+    Stream<FileScanTask> stream = 
StreamSupport.stream(table.newScan().planFiles().spliterator(), false);
+    Optional<FileScanTask> fileScanTaskOptional =  
stream.max(Comparator.comparing(FileScanTask::length));
+    Assert.assertTrue(fileScanTaskOptional.isPresent());
+    long maxFileLen = fileScanTaskOptional.get().length();
+    sql("ALTER TABLE %s SET ('read.split.open-file-cost'='1', 
'read.split.target-size'='%s')", TABLE_NAME, maxFileLen);
+
+    // 2 splits ,the parallelism is  2
+    Table tableSelect = tenv.sqlQuery(String.format("SELECT * FROM %s", 
TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(2, tableSelect, tenv);
+
+    // 2 splits  and limit is 1 ,the parallelism is  1
+    Table tableLimit = tenv.sqlQuery(String.format("SELECT * FROM %s LIMIT 1", 
TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(1, tableLimit, tenv);
+  }
+
+  private void testParallelismSettingTranslateAndAssert(int expected, Table 
table, TableEnvironment tEnv) {

Review comment:
       Flink 1.12 does refactor `ExecNode`, I found an easier way to assert 
parallelism, I will update it later 

##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -103,4 +124,45 @@ public void testLimitPushDown() {
     Assert.assertEquals("should have 1 record", 1, mixedResult.size());
     Assert.assertArrayEquals("Should produce the expected records", 
mixedResult.get(0), new Object[] {1, "a"});
   }
+
+  @Test
+  public void testParallelismOptimize() {
+    sql("INSERT INTO %s  VALUES (1,'hello')", TABLE_NAME);
+    sql("INSERT INTO %s  VALUES (2,'iceberg')", TABLE_NAME);
+
+    TableEnvironment tenv = getTableEnv();
+
+    // empty table ,parallelism at least 1
+    Table tableEmpty = tenv.sqlQuery(String.format("SELECT * FROM %s", 
TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(1, tableEmpty, tenv);
+
+    // make sure to generate 2 CombinedScanTasks
+    org.apache.iceberg.Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+    Stream<FileScanTask> stream = 
StreamSupport.stream(table.newScan().planFiles().spliterator(), false);
+    Optional<FileScanTask> fileScanTaskOptional =  
stream.max(Comparator.comparing(FileScanTask::length));
+    Assert.assertTrue(fileScanTaskOptional.isPresent());
+    long maxFileLen = fileScanTaskOptional.get().length();
+    sql("ALTER TABLE %s SET ('read.split.open-file-cost'='1', 
'read.split.target-size'='%s')", TABLE_NAME, maxFileLen);
+
+    // 2 splits ,the parallelism is  2
+    Table tableSelect = tenv.sqlQuery(String.format("SELECT * FROM %s", 
TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(2, tableSelect, tenv);
+
+    // 2 splits  and limit is 1 ,the parallelism is  1
+    Table tableLimit = tenv.sqlQuery(String.format("SELECT * FROM %s LIMIT 1", 
TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(1, tableLimit, tenv);
+  }
+
+  private void testParallelismSettingTranslateAndAssert(int expected, Table 
table, TableEnvironment tEnv) {

Review comment:
       Here we need to get the parallelism of an Operator. It is indeed get by 
using the flink internal code, because I have not found other ways to obtain 
the parallelism.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to