Au-Miner commented on code in PR #27159:
URL: https://github.com/apache/flink/pull/27159#discussion_r2465578643
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -299,47 +398,175 @@ private static boolean isTableScanSupported(TableScan
tableScan, int[] lookupKey
return false;
}
- int[][] idxsOfAllIndexes =
getColumnIndicesOfAllTableIndexes(tableSourceTable);
- if (idxsOfAllIndexes.length == 0) {
- return false;
- }
- // the source must have at least one index, and the join key contains
one index
- Set<Integer> lookupKeysSet =
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
-
- boolean lookupKeyContainsOneIndex =
- Arrays.stream(idxsOfAllIndexes)
- .peek(idxsOfIndex ->
Preconditions.checkState(idxsOfIndex.length > 0))
- .anyMatch(
- idxsOfIndex ->
- Arrays.stream(idxsOfIndex)
-
.allMatch(lookupKeysSet::contains));
- if (!lookupKeyContainsOneIndex) {
+ Set<Integer> lookupKeySet =
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
+
+ if (!isLookupKeysContainsIndex(tableSourceTable, lookupKeySet)) {
return false;
}
// the lookup source must support async lookup
return LookupJoinUtil.isAsyncLookup(
tableSourceTable,
- lookupKeysSet,
+ lookupKeySet,
null, // hint
false, // upsertMaterialize
false // preferCustomShuffle
);
}
+ private static boolean isLookupKeysContainsIndex(
+ TableSourceTable tableSourceTable, Set<Integer> lookupKeySet) {
+ // the source must have at least one index, and the join key contains
one index
+ int[][] idxsOfAllIndexes =
+ getAllIndexesColumnsFromTableSchema(
+
tableSourceTable.contextResolvedTable().getResolvedSchema());
+ if (idxsOfAllIndexes.length == 0) {
+ return false;
+ }
+
+ final Set<Integer> lookupKeySetPassThroughProjectPushDownSpec;
+ Optional<ProjectPushDownSpec> projectPushDownSpec =
+ Arrays.stream(tableSourceTable.abilitySpecs())
+ .filter(spec -> spec instanceof ProjectPushDownSpec)
+ .map(spec -> (ProjectPushDownSpec) spec)
+ .findFirst();
+
+ if (projectPushDownSpec.isEmpty()) {
+ lookupKeySetPassThroughProjectPushDownSpec = lookupKeySet;
+ } else {
+ Map<Integer, Integer> mapOut2InPos = new HashMap<>();
+ int[][] projectedFields =
projectPushDownSpec.get().getProjectedFields();
+ for (int i = 0; i < projectedFields.length; i++) {
+ int[] projectedField = projectedFields[i];
+ // skip nested projection push-down spec
+ if (projectedField.length > 1) {
+ continue;
+ }
+ int input = projectedField[0];
+ mapOut2InPos.put(i, input);
+ }
+
+ lookupKeySetPassThroughProjectPushDownSpec =
+ lookupKeySet.stream()
+ .flatMap(out ->
Stream.ofNullable(mapOut2InPos.get(out)))
+ .collect(Collectors.toSet());
+ }
+
+ return Arrays.stream(idxsOfAllIndexes)
+ .peek(idxsOfIndex ->
Preconditions.checkState(idxsOfIndex.length > 0))
+ .anyMatch(
+ idxsOfIndex ->
+ Arrays.stream(idxsOfIndex)
+ .allMatch(
+
lookupKeySetPassThroughProjectPushDownSpec
+ ::contains));
+ }
+
+ private static boolean areAllSourceAbilitySpecsSupported(
+ TableScan tableScan, SourceAbilitySpec[] sourceAbilitySpecs) {
+ if (!Arrays.stream(sourceAbilitySpecs)
+ .allMatch(spec ->
ALL_SUPPORTED_ABILITY_SPEC_IN_SOURCE.contains(spec.getClass()))) {
+ return false;
+ }
+
+ Optional<ReadingMetadataSpec> metadataSpec =
+ Arrays.stream(sourceAbilitySpecs)
+ .filter(spec -> spec instanceof ReadingMetadataSpec)
+ .map(spec -> (ReadingMetadataSpec) spec)
+ .findFirst();
+ if (metadataSpec.isPresent() &&
!metadataSpec.get().getMetadataKeys().isEmpty()) {
+ return false;
+ }
+
+ // source with non-deterministic filter pushed down is not supported
+ Optional<FilterPushDownSpec> filterPushDownSpec =
+ Arrays.stream(sourceAbilitySpecs)
+ .filter(spec -> spec instanceof FilterPushDownSpec)
+ .map(spec -> (FilterPushDownSpec) spec)
+ .findFirst();
+ if (filterPushDownSpec.isEmpty()) {
+ return true;
+ }
+
+ List<RexNode> filtersOnSource =
filterPushDownSpec.get().getPredicates();
+ if (!areAllRexNodeDeterministic(filtersOnSource)) {
+ return false;
+ }
+
+ ChangelogMode changelogMode = getChangelogMode((StreamPhysicalRel)
tableScan);
+ if (changelogMode.containsOnly(RowKind.INSERT)) {
+ return true;
+ }
+
+ FlinkRelMetadataQuery fmq =
+
FlinkRelMetadataQuery.reuseOrCreate(tableScan.getCluster().getMetadataQuery());
+ Set<ImmutableBitSet> upsertKeys = fmq.getUpsertKeys(tableScan);
+ return filtersOnSource.stream()
+ .allMatch(filter -> isFilterOnOneSetOfUpsertKeys(filter,
upsertKeys));
+ }
+
private static TableScan getTableScan(RelNode node) {
node = unwrapNode(node, true);
// support to get table across more nodes if we support more nodes in
// `ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES`
if (node instanceof StreamPhysicalExchange
- || node instanceof StreamPhysicalDropUpdateBefore) {
+ || node instanceof StreamPhysicalDropUpdateBefore
+ || node instanceof StreamPhysicalCalc) {
return getTableScan(node.getInput(0));
}
Preconditions.checkState(node instanceof TableScan);
return (TableScan) node;
}
+ private static boolean allUpstreamCalcSupported(StreamPhysicalJoin join) {
+ List<Calc> calcListFromLeftInput =
collectCalcBetweenJoinAndTableScan(join.getLeft());
Review Comment:
Why do we only need to consider the left input here? My understanding is that
the left input is an exchange by default, so a calc on the left is allowed,
while a calc on the right input will be merged into the join. However, if the
relevant merge rules are removed, could errors occur?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -227,56 +292,90 @@ private static List<IntPair>
reverseIntPairs(List<IntPair> intPairs) {
.collect(Collectors.toList());
}
- private static int[][] getColumnIndicesOfAllTableIndexes(TableSourceTable
tableSourceTable) {
- List<List<String>> columnsOfIndexes =
getAllIndexesColumnsOfTable(tableSourceTable);
+ private static int[][] getAllIndexesColumnsFromTableSchema(ResolvedSchema
schema) {
+ List<Index> indexes = schema.getIndexes();
+ List<List<String>> columnsOfIndexes =
+
indexes.stream().map(Index::getColumns).collect(Collectors.toList());
int[][] results = new int[columnsOfIndexes.size()][];
for (int i = 0; i < columnsOfIndexes.size(); i++) {
- List<String> fieldNames =
tableSourceTable.getRowType().getFieldNames();
+ List<String> fieldNames = schema.getColumnNames();
results[i] =
columnsOfIndexes.get(i).stream().mapToInt(fieldNames::indexOf).toArray();
}
return results;
}
- private static List<List<String>> getAllIndexesColumnsOfTable(
- TableSourceTable tableSourceTable) {
- ResolvedSchema schema =
tableSourceTable.contextResolvedTable().getResolvedSchema();
- List<Index> indexes = schema.getIndexes();
- return
indexes.stream().map(Index::getColumns).collect(Collectors.toList());
- }
-
private static boolean areJoinConditionsSupported(StreamPhysicalJoin join)
{
JoinInfo joinInfo = join.analyzeCondition();
// there must be one pair of join key
if (joinInfo.pairs().isEmpty()) {
return false;
}
+ JoinSpec joinSpec = join.joinSpec();
+ Optional<RexNode> nonEquiCond = joinSpec.getNonEquiCondition();
+ if (nonEquiCond.isPresent() &&
!RexUtil.isDeterministic(nonEquiCond.get())) {
Review Comment:
What are the reasons why joinCond and calcCond do not support
non-deterministic expressions, and can some comments be added to explain this?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -83,7 +99,22 @@ public class DeltaJoinUtil {
Sets.newHashSet(
StreamPhysicalTableSourceScan.class,
StreamPhysicalExchange.class,
- StreamPhysicalDropUpdateBefore.class);
+ StreamPhysicalDropUpdateBefore.class,
+ StreamPhysicalCalc.class);
+
+ /**
+ * All supported {@link SourceAbilitySpec}s in sources. Only the sources
with the following
+ * {@link SourceAbilitySpec} can be used as delta join sources. Otherwise,
the regular join will
+ * not be optimized into the delta join.
+ */
+ private static final Set<Class<?>> ALL_SUPPORTED_ABILITY_SPEC_IN_SOURCE =
+ Sets.newHashSet(
+ FilterPushDownSpec.class,
Review Comment:
It would be better to place FilterPushDownSpec and ProjectPushDownSpec next to each other.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]