Copilot commented on code in PR #1549:
URL: https://github.com/apache/fluss/pull/1549#discussion_r2281132677


##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -100,7 +109,73 @@ public List<SourceSplitBase> generateHybridLakeSplits() 
throws Exception {
                                 .createPlanner(
                                         (LakeSource.PlannerContext) 
lakeSnapshotInfo::getSnapshotId)
                                 .plan());
+
+        if (lakeSplits.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        // first, filter lake splits by partition filters
+        if (!partitionFilters.isEmpty()) {
+            List<String> partitionKeys = fileStoreTable.partitionKeys();
+            boolean isSinglePartitionKey = partitionKeys.size() == 1;
+
+            if (isSinglePartitionKey) {
+                // in single key, name == value
+                Set<String> allowedPartitionNames =
+                        partitionFilters.stream()
+                                .map(fieldEqual -> 
String.valueOf(fieldEqual.equalValue))
+                                .collect(Collectors.toSet());
+                lakeSplits
+                        .entrySet()
+                        .removeIf(entry -> 
!allowedPartitionNames.contains(entry.getKey()));
+
+            } else {
+                // multi partition key
+                List<DataField> dataFields = 
tableInfo.getRowType().getFields();
+                Map<Integer, String> partitionKeyIdxToValueMap = new 
java.util.HashMap<>();
+
+                // build partition key idx to expected value map
+                for (FieldEqual fieldEqual : partitionFilters) {
+                    String fieldName = 
dataFields.get(fieldEqual.fieldIndex).getName();
+                    int partitionKeyIdx = partitionKeys.indexOf(fieldName);
+                    if (partitionKeyIdx >= 0) {
+                        partitionKeyIdxToValueMap.put(
+                                partitionKeyIdx, 
String.valueOf(fieldEqual.equalValue));
+                    }
+                }
+
+                if (!partitionKeyIdxToValueMap.isEmpty()) {
+                    lakeSplits
+                            .entrySet()
+                            .removeIf(
+                                    entry -> {
+                                        String partitionName = entry.getKey(); 
// e.g. "20250815$08"
+                                        String[] partitionValues =
+                                                partitionName.split(
+                                                        Pattern.quote(
+                                                                
PARTITION_SPEC_SEPARATOR)); // ["20250815", "08"]
+
+                                        // check if all partition key idx 
match to expected values
+                                        for (Map.Entry<Integer, String> 
mapEntry :
+                                                
partitionKeyIdxToValueMap.entrySet()) {
+                                            int partitionKeyIdx = 
mapEntry.getKey();
+                                            String expectedValue = 
mapEntry.getValue();
+
+                                            // idx out of bounds or value not 
match
+                                            if (partitionKeyIdx >= 
partitionValues.length
+                                                    || !expectedValue.equals(
+                                                            
partitionValues[partitionKeyIdx])) {

Review Comment:
   This bounds check could incorrectly filter out valid partitions. If 
`partitionKeyIdx` is beyond the partition values array, it suggests a mismatch 
between the expected partition structure and actual data, which should be 
handled as an error rather than silently filtering out the partition.
   ```suggestion
                                               if (partitionKeyIdx >= 
partitionValues.length) {
                                                   throw new 
IllegalStateException(
                                                       String.format(
                                                           "Partition key index 
%d is out of bounds for partition '%s'. Expected partition keys: %s, actual 
partition values: %s",
                                                           partitionKeyIdx,
                                                           partitionName,
                                                           partitionKeys,
                                                           
java.util.Arrays.toString(partitionValues)
                                                       )
                                                   );
                                               }
                                               if 
(!expectedValue.equals(partitionValues[partitionKeyIdx])) {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -540,6 +592,18 @@ private int[] getKeyRowProjection() {
         return projection;
     }
 
+    @Nullable
+    private Object toPaimonLiteralForPartition(
+            com.alibaba.fluss.types.DataType flussDataType, Object equalValue) 
{
+        String typeSummary = flussDataType.toString().toUpperCase();
+        if (typeSummary.contains("CHAR") || typeSummary.contains("STRING")) {

Review Comment:
   The string-based type checking using `contains()` is fragile and could match 
unintended types. Consider using proper type checking with `flussDataType 
instanceof CharType` or `flussDataType instanceof VarCharType` for more robust 
type detection.
   ```suggestion
           // Use instanceof for robust type checking
           if (flussDataType instanceof CharType
                   || flussDataType instanceof VarCharType
                   || flussDataType instanceof StringType) {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -100,7 +109,73 @@ public List<SourceSplitBase> generateHybridLakeSplits() 
throws Exception {
                                 .createPlanner(
                                         (LakeSource.PlannerContext) 
lakeSnapshotInfo::getSnapshotId)
                                 .plan());
+
+        if (lakeSplits.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        // first, filter lake splits by partition filters
+        if (!partitionFilters.isEmpty()) {
+            List<String> partitionKeys = fileStoreTable.partitionKeys();
+            boolean isSinglePartitionKey = partitionKeys.size() == 1;
+
+            if (isSinglePartitionKey) {
+                // in single key, name == value
+                Set<String> allowedPartitionNames =
+                        partitionFilters.stream()
+                                .map(fieldEqual -> 
String.valueOf(fieldEqual.equalValue))
+                                .collect(Collectors.toSet());
+                lakeSplits
+                        .entrySet()
+                        .removeIf(entry -> 
!allowedPartitionNames.contains(entry.getKey()));
+
+            } else {
+                // multi partition key
+                List<DataField> dataFields = 
tableInfo.getRowType().getFields();
+                Map<Integer, String> partitionKeyIdxToValueMap = new 
java.util.HashMap<>();
+
+                // build partition key idx to expected value map
+                for (FieldEqual fieldEqual : partitionFilters) {
+                    String fieldName = 
dataFields.get(fieldEqual.fieldIndex).getName();
+                    int partitionKeyIdx = partitionKeys.indexOf(fieldName);
+                    if (partitionKeyIdx >= 0) {
+                        partitionKeyIdxToValueMap.put(
+                                partitionKeyIdx, 
String.valueOf(fieldEqual.equalValue));
+                    }
+                }
+
+                if (!partitionKeyIdxToValueMap.isEmpty()) {
+                    lakeSplits
+                            .entrySet()
+                            .removeIf(
+                                    entry -> {
+                                        String partitionName = entry.getKey(); 
// e.g. "20250815$08"
+                                        String[] partitionValues =
+                                                partitionName.split(
+                                                        Pattern.quote(
+                                                                
PARTITION_SPEC_SEPARATOR)); // ["20250815", "08"]

Review Comment:
   The `Pattern.quote()` call is executed for every partition entry during 
filtering. Consider pre-compiling the pattern outside the removeIf lambda to 
avoid repeated regex compilation overhead.
   ```suggestion
                       // Precompile the partition separator pattern to avoid 
repeated regex compilation
                       Pattern partitionSeparatorPattern = 
Pattern.compile(Pattern.quote(PARTITION_SPEC_SEPARATOR));
                       lakeSplits
                               .entrySet()
                               .removeIf(
                                       entry -> {
                                           String partitionName = 
entry.getKey(); // e.g. "20250815$08"
                                           String[] partitionValues =
                                                   
partitionSeparatorPattern.split(partitionName); // ["20250815", "08"]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to