Copilot commented on code in PR #2585:
URL: https://github.com/apache/fluss/pull/2585#discussion_r2772741974


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -1117,6 +1119,77 @@ void 
testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
         jobClient.cancel().get();
     }
 
+    @Test
+    void testPartitionFilterOnPartitionedTableInBatch() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_pk_table_full";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId = preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, true, 
bucketLogEndOffset);
+
+        // wait unit records have been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, true);
+
+        // check the status of replica after synced
+        assertReplicaStatus(bucketLogEndOffset);
+        // Stop tiering to ensure we read from Paimon snapshot
+        jobClient.cancel().get();
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        Map<Long, String> partitionNameById =
+                
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+        Iterator<String> partitionIterator =
+                partitionNameById.values().stream().sorted().iterator();
+        String partition1 = partitionIterator.next();
+        String partition2 = partitionIterator.next();
+        String query =
+                String.format(
+                        "SELECT c1, c2, c3, c19 FROM %s WHERE c19 between '%s' 
and '%s'",
+                        tableName, partition1, partition2);
+
+        assertThat(batchTEnv.explainSql(query))
+                .contains(
+                        String.format(
+                                "TableSourceScan(table=[[testcatalog, %s, %s, "
+                                        + "filter=[and(>=(c19, _UTF-16LE'%s'), 
<=(c19, _UTF-16LE'%s'))], "
+                                        + "project=[c1, c2, c3, c19]]], "
+                                        + "fields=[c1, c2, c3, c19])",
+                                DEFAULT_DB, tableName, partition1, 
partition2));
+
+        CloseableIterator<Row> collected = 
batchTEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        String.format("+I[false, 1, 2, %s]", partition1),
+                        String.format("+I[true, 10, 20, %s]", partition1),
+                        String.format("+I[false, 1, 2, %s]", partition2),
+                        String.format("+I[true, 10, 20, %s]", partition2));
+        List<String> actual = collectBatchRows(collected);
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+        query =
+                String.format(
+                        "SELECT c1, c2, c3, c19 FROM %s WHERE c19 = '%s'", 
tableName, partition2);
+
+        assertThat(batchTEnv.explainSql(query))
+                .contains(
+                        String.format(
+                                "TableSourceScan(table=[[testcatalog, %s, %s, "
+                                        + "filter=[=(c19, 
_UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
+                                        + "project=[c1, c2, c3, c19]]], "
+                                        + "fields=[c1, c2, c3, c19])",
+                                DEFAULT_DB, tableName, partition2));
+        collected = batchTEnv.executeSql(query).collect();

Review Comment:
   The CloseableIterator obtained at line 1162 is not closed before the 
variable is reassigned at line 1184, leading to a resource leak. Similarly, the 
iterator at line 1184 is never closed. Consider using try-with-resources for 
each iterator or explicitly closing them after use.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java:
##########
@@ -110,6 +110,14 @@ public static List<String> collectRowsWithTimeout(
                 Duration.ofMinutes(1));
     }
 
+    public static List<String> collectBatchRows(CloseableIterator<Row> 
iterator) {
+        List<String> actual = new ArrayList<>();
+        while (iterator.hasNext()) {
+            actual.add(iterator.next().toString());

Review Comment:
   The `collectBatchRows` method does not close the CloseableIterator after 
consuming it. This could lead to resource leaks. Consider either: 1) closing 
the iterator at the end of the method similar to how `collectRowsWithTimeout` 
handles it, or 2) documenting that the caller is responsible for closing the 
iterator and ensure callers actually close it (e.g., using try-with-resources).
   ```suggestion
           try {
               while (iterator.hasNext()) {
                   actual.add(iterator.next().toString());
               }
           } finally {
               try {
                   iterator.close();
               } catch (Exception ignored) {
                   // ignore close exceptions in test utility
               }
   ```



##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -1117,6 +1119,77 @@ void 
testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
         jobClient.cancel().get();
     }
 
+    @Test
+    void testPartitionFilterOnPartitionedTableInBatch() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_pk_table_full";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId = preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, true, 
bucketLogEndOffset);
+
+        // wait unit records have been synced

Review Comment:
   Typo in comment: "wait unit records" should be "wait until records".
   ```suggestion
           // wait until records have been synced
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to