xuyangzhong commented on code in PR #27159:
URL: https://github.com/apache/flink/pull/27159#discussion_r2468262658


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -299,47 +399,185 @@ private static boolean isTableScanSupported(TableScan 
tableScan, int[] lookupKey
             return false;
         }
 
-        int[][] idxsOfAllIndexes = 
getColumnIndicesOfAllTableIndexes(tableSourceTable);
-        if (idxsOfAllIndexes.length == 0) {
-            return false;
-        }
-        // the source must have at least one index, and the join key contains 
one index
-        Set<Integer> lookupKeysSet = 
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
-
-        boolean lookupKeyContainsOneIndex =
-                Arrays.stream(idxsOfAllIndexes)
-                        .peek(idxsOfIndex -> 
Preconditions.checkState(idxsOfIndex.length > 0))
-                        .anyMatch(
-                                idxsOfIndex ->
-                                        Arrays.stream(idxsOfIndex)
-                                                
.allMatch(lookupKeysSet::contains));
-        if (!lookupKeyContainsOneIndex) {
+        Set<Integer> lookupKeySet = 
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
+
+        if (!isLookupKeysContainsIndex(tableSourceTable, lookupKeySet)) {
             return false;
         }
 
         // the lookup source must support async lookup
         return LookupJoinUtil.isAsyncLookup(
                 tableSourceTable,
-                lookupKeysSet,
+                lookupKeySet,
                 null, // hint
                 false, // upsertMaterialize
                 false // preferCustomShuffle
                 );
     }
 
+    private static boolean isLookupKeysContainsIndex(
+            TableSourceTable tableSourceTable, Set<Integer> lookupKeySet) {
+        // the source must have at least one index, and the join key contains 
one index
+        int[][] idxsOfAllIndexes =
+                getAllIndexesColumnsFromTableSchema(
+                        
tableSourceTable.contextResolvedTable().getResolvedSchema());
+        if (idxsOfAllIndexes.length == 0) {
+            return false;
+        }
+
+        final Set<Integer> lookupKeySetPassThroughProjectPushDownSpec;
+        Optional<ProjectPushDownSpec> projectPushDownSpec =
+                Arrays.stream(tableSourceTable.abilitySpecs())
+                        .filter(spec -> spec instanceof ProjectPushDownSpec)
+                        .map(spec -> (ProjectPushDownSpec) spec)
+                        .findFirst();
+
+        if (projectPushDownSpec.isEmpty()) {
+            lookupKeySetPassThroughProjectPushDownSpec = lookupKeySet;
+        } else {
+            Map<Integer, Integer> mapOut2InPos = new HashMap<>();
+            int[][] projectedFields = 
projectPushDownSpec.get().getProjectedFields();
+            for (int i = 0; i < projectedFields.length; i++) {
+                int[] projectedField = projectedFields[i];
+                // skip nested projection push-down spec
+                if (projectedField.length > 1) {
+                    continue;
+                }
+                int input = projectedField[0];
+                mapOut2InPos.put(i, input);
+            }
+
+            lookupKeySetPassThroughProjectPushDownSpec =
+                    lookupKeySet.stream()
+                            .flatMap(out -> 
Stream.ofNullable(mapOut2InPos.get(out)))
+                            .collect(Collectors.toSet());
+        }
+
+        return Arrays.stream(idxsOfAllIndexes)
+                .peek(idxsOfIndex -> 
Preconditions.checkState(idxsOfIndex.length > 0))
+                .anyMatch(
+                        idxsOfIndex ->
+                                Arrays.stream(idxsOfIndex)
+                                        .allMatch(
+                                                
lookupKeySetPassThroughProjectPushDownSpec
+                                                        ::contains));
+    }
+
+    private static boolean areAllSourceAbilitySpecsSupported(
+            TableScan tableScan, SourceAbilitySpec[] sourceAbilitySpecs) {
+        if (!Arrays.stream(sourceAbilitySpecs)
+                .allMatch(spec -> 
ALL_SUPPORTED_ABILITY_SPEC_IN_SOURCE.contains(spec.getClass()))) {
+            return false;
+        }
+
+        Optional<ReadingMetadataSpec> metadataSpec =
+                Arrays.stream(sourceAbilitySpecs)
+                        .filter(spec -> spec instanceof ReadingMetadataSpec)
+                        .map(spec -> (ReadingMetadataSpec) spec)
+                        .findFirst();
+        if (metadataSpec.isPresent() && 
!metadataSpec.get().getMetadataKeys().isEmpty()) {
+            return false;
+        }
+
+        // source with non-deterministic filter pushed down is not supported
+        Optional<FilterPushDownSpec> filterPushDownSpec =
+                Arrays.stream(sourceAbilitySpecs)
+                        .filter(spec -> spec instanceof FilterPushDownSpec)
+                        .map(spec -> (FilterPushDownSpec) spec)
+                        .findFirst();
+        if (filterPushDownSpec.isEmpty()) {
+            return true;
+        }
+
+        List<RexNode> filtersOnSource = 
filterPushDownSpec.get().getPredicates();
+        if (!areAllRexNodeDeterministic(filtersOnSource)) {
+            return false;
+        }
+
+        ChangelogMode changelogMode = getChangelogMode((StreamPhysicalRel) 
tableScan);
+        if (changelogMode.containsOnly(RowKind.INSERT)) {

Review Comment:
   It also allows CDC sources. The logic is:
   
   1. if the source is an INSERT-ONLY source, then pass and return true
   2. else the source is a CDC source
       2.1 if the filter pushed down into the source is not applied on one set 
of upsert keys, return false
       2.2 else pass and return true
   
   As for why the filter must be applied on one set of upsert keys when 
consuming CDC data, we discussed this before in
   https://github.com/apache/flink/pull/27111#discussion_r2454406713, and I 
also raised a [jira](https://issues.apache.org/jira/browse/FLINK-38579) for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to