Au-Miner commented on code in PR #26649: URL: https://github.com/apache/flink/pull/26649#discussion_r2149598453
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java: ########## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.utils; + +import org.apache.flink.table.catalog.Index; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec; +import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.IntermediateRelTable; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.runtime.functions.table.lookup.CachingAsyncLookupFunction; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.operators.join.lookup.RetryableAsyncLookupFunctionDelegator; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava33.com.google.common.collect.Sets; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.util.mapping.IntPair; + +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utils for delta joins. */ +public class DeltaJoinUtil { + + /** + * All supported delta join upstream nodes. Only the following nodes are allowed to exist + * between the delta join and the source. Otherwise, the regular join will not be optimized into + * the delta join. + * + * <p>More physical nodes can be added to support more patterns for delta join. + */ + private static final Set<Class<?>> ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES = + Sets.newHashSet(StreamPhysicalTableSourceScan.class, StreamPhysicalExchange.class); + + private static final String NON_ASYNC_LOOKUP_TABLE_ERROR_MSG_TEMPLATE = + "Table [%s] does not support async lookup."; + + private static final String NON_ASYNC_LOOKUP_TABLE_SOLUTION_MSG = + "If the table supports the option of async lookup joins, " + + "add it to the with parameters of the DDL."; + + /** + * All supported join types during delta join optimization. Only the following join types are + * supported during delta join optimization. Otherwise, the regular join will not be optimized + * into the delta join. + * + * <p>More join types can be added to support more patterns for delta join. + */ + private static final Set<FlinkJoinType> ALL_SUPPORTED_JOIN_TYPES = + Sets.newHashSet(FlinkJoinType.INNER); + + private DeltaJoinUtil() {} + + /** Check whether the {@link StreamPhysicalJoin} can be optimized into a delta join. */ + public static boolean canConvertToDeltaJoin(StreamPhysicalJoin join) { + if (!isJoinTypeSupported(join)) { + return false; + } + + if (!areJoinConditionsSupported(join)) { + return false; + } + + // delta join with eventual consistency will send duplicate changes to downstream nodes + if (!canJoinOutputDuplicateChanges(join)) { + return false; + } + + // currently, only join with append-only inputs is supported + if (!areAllInputsInsertOnly(join)) { + return false; + } + + if (!areAllJoinInputsInWhiteList(join)) { + return false; + } + + return areAllJoinTableScansSupported(join); + } + + /** + * Get the {@link RelOptTable} from the {@link TableScan} recursively on the input of this node. + */ + public static RelOptTable getTableScanRelOptTable(RelNode node) { + return getTableScan(node).getTable(); + } + + /** + * Extract the delta join spec used for {@link StreamPhysicalDeltaJoin} from {@link + * StreamPhysicalJoin}. + */ + public static DeltaJoinSpec getDeltaJoinSpec( + StreamPhysicalJoin join, boolean treatRightAsLookupSide) { + JoinInfo joinInfo = join.analyzeCondition(); + RexBuilder rexBuilder = join.getCluster().getRexBuilder(); + + RexNode condition = RexUtil.composeConjunction(rexBuilder, joinInfo.nonEquiConditions); + Optional<RexNode> remainingCondition = + condition.isAlwaysTrue() ? Optional.empty() : Optional.of(condition); + + final RelOptTable lookupRelOptTable; + List<IntPair> streamToLookupJoinKeys = joinInfo.pairs(); + if (treatRightAsLookupSide) { + lookupRelOptTable = DeltaJoinUtil.getTableScanRelOptTable(join.getRight()); + } else { + streamToLookupJoinKeys = reverseIntPairs(streamToLookupJoinKeys); + lookupRelOptTable = DeltaJoinUtil.getTableScanRelOptTable(join.getLeft()); + } + Preconditions.checkState(lookupRelOptTable instanceof TableSourceTable); + final TableSourceTable lookupTable = (TableSourceTable) lookupRelOptTable; + + Map<Integer, FunctionCallUtils.FunctionParam> allLookupKeys = + analyzerDeltaJoinLookupKeys(streamToLookupJoinKeys); + + return new DeltaJoinSpec( + new TemporalTableSourceSpec(lookupTable), + allLookupKeys, + remainingCondition.orElse(null)); + } + + /** Check if the join type is supported during delta join optimization. */ + public static void validateSupportedJoinType(FlinkJoinType joinType) { + if (!ALL_SUPPORTED_JOIN_TYPES.contains(joinType)) { + throw new IllegalStateException( + String.format( + "The current sql doesn't support to do delta join optimization.\n" + + "Unsupported join type [%s] encountered while attempting to convert to delta join.", + joinType)); + } + } + + /** Get the {@link TableScan} recursively on the input of this node. */ + public static RelOptTable getTable(RelNode node) { + return getTableScan(node).getTable(); + } + + /** + * Get the async lookup function to lookup join this temporal table. Furthermore, this method + * also unwraps the cache and retryable lookup function to access the inner {@link + * AsyncTableFunction}. + */ + public static AsyncTableFunction<?> getUnwrappedAsyncLookupFunction( + RelOptTable temporalTable, Collection<Integer> lookupKeys, ClassLoader classLoader) { + UserDefinedFunction lookupFunction = + LookupJoinUtil.getLookupFunction( + temporalTable, + lookupKeys, + classLoader, + true, // async + null, // retryStrategy + false); // applyCustomShuffle + + if (!(lookupFunction instanceof AsyncTableFunction)) { Review Comment: Have moved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
