This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1b63eefbc92e570572b5e7d92ff3d9130a5dec63
Author: Zhiting Guo <35057824+fre...@users.noreply.github.com>
AuthorDate: Fri Aug 11 11:29:39 2023 +0800

    KYLIN-5782 Duplicated join keys result empty query result
    
    ---------
    Co-authored-by: Zhiting Guo <zhiting....@kyligence.io>
---
 .../org/apache/kylin/query/relnode/KapJoinRel.java | 17 ++++----
 .../apache/kylin/query/relnode/OLAPJoinRel.java    | 48 +++++++++++++---------
 2 files changed, 37 insertions(+), 28 deletions(-)

diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapJoinRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapJoinRel.java
index a98ea03827..4a5145d64b 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapJoinRel.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapJoinRel.java
@@ -44,15 +44,14 @@ import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.util.ICutContextStrategy;
 import org.apache.kylin.query.util.RexUtils;
 
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import lombok.val;
 
 public class KapJoinRel extends OLAPJoinRel implements KapRel {
@@ -232,9 +231,10 @@ public class KapJoinRel extends OLAPJoinRel implements KapRel {
         if (context != null) {
             collectCtxOlapInfoIfExist();
         } else {
-            Map<TblColRef, TblColRef> joinColumns = translateJoinColumn(this.getCondition());
+            Map<TblColRef, Set<TblColRef>> joinColumns = translateJoinColumn(this.getCondition());
             pushDownJoinColsToSubContexts(joinColumns.entrySet().stream()
-                    .flatMap(e -> Stream.of(e.getKey(), e.getValue())).collect(Collectors.toSet()));
+                    .flatMap(e -> Stream.concat(Stream.of(e.getKey()), e.getValue().stream()))
+                    .collect(Collectors.toSet()));
         }
     }
 
@@ -251,9 +251,10 @@ public class KapJoinRel extends OLAPJoinRel implements KapRel {
             this.context.joins.add(join);
 
         } else {
-            Map<TblColRef, TblColRef> joinColumnsMap = translateJoinColumn(this.getCondition());
+            Map<TblColRef, Set<TblColRef>> joinColumnsMap = translateJoinColumn(this.getCondition());
             Collection<TblColRef> joinCols = joinColumnsMap.entrySet().stream()
-                    .flatMap(e -> Stream.of(e.getKey(), e.getValue())).collect(Collectors.toSet());
+                    .flatMap(e -> Stream.concat(Stream.of(e.getKey()), e.getValue().stream()))
+                    .collect(Collectors.toSet());
             joinCols.stream().flatMap(e -> e.getSourceColumns().stream()).filter(context::belongToContextTables)
                     .forEach(colRef -> {
                         context.getSubqueryJoinParticipants().add(colRef);
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
index c62ba3e4ab..1c88f275c9 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java
@@ -20,6 +20,7 @@ package org.apache.kylin.query.relnode;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -55,12 +56,11 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.schema.OLAPTable;
 
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-
 /**
  *
  */
@@ -192,11 +192,14 @@ public class OLAPJoinRel extends EnumerableJoin implements OLAPRel {
             this.context.joins.add(join);
         } else {
             //When join contains subquery, the join-condition fields of fact_table will add into context.
-            Map<TblColRef, TblColRef> joinCol = translateJoinColumn(this.getCondition());
+            Map<TblColRef, Set<TblColRef>> joinCol = translateJoinColumn(this.getCondition());
 
-            for (Map.Entry<TblColRef, TblColRef> columnPair : joinCol.entrySet()) {
-                TblColRef fromCol = (rightHasSubquery ? columnPair.getKey() : columnPair.getValue());
-                this.context.getSubqueryJoinParticipants().add(fromCol);
+            for (Map.Entry<TblColRef, Set<TblColRef>> columnPair : joinCol.entrySet()) {
+                if (rightHasSubquery) {
+                    this.context.getSubqueryJoinParticipants().add(columnPair.getKey());
+                } else {
+                    this.context.getSubqueryJoinParticipants().addAll(columnPair.getValue());
+                }
             }
             joinCol.clear();
         }
@@ -227,19 +230,21 @@ public class OLAPJoinRel extends EnumerableJoin implements OLAPRel {
     }
 
     protected JoinDesc buildJoin(RexCall condition) {
-        Map<TblColRef, TblColRef> joinColumns = translateJoinColumn(condition);
+        Map<TblColRef, Set<TblColRef>> joinColumns = translateJoinColumn(condition);
 
         List<String> pks = new ArrayList<>();
         List<TblColRef> pkCols = new ArrayList<>();
         List<String> fks = new ArrayList<>();
         List<TblColRef> fkCols = new ArrayList<>();
-        for (Map.Entry<TblColRef, TblColRef> columnPair : joinColumns.entrySet()) {
+        for (Map.Entry<TblColRef, Set<TblColRef>> columnPair : joinColumns.entrySet()) {
             TblColRef fromCol = columnPair.getKey();
-            TblColRef toCol = columnPair.getValue();
-            fks.add(fromCol.getName());
-            fkCols.add(fromCol);
-            pks.add(toCol.getName());
-            pkCols.add(toCol);
+            Set<TblColRef> toCols = columnPair.getValue();
+            for (TblColRef toCol : toCols) {
+                fks.add(fromCol.getName());
+                fkCols.add(fromCol);
+                pks.add(toCol.getName());
+                pkCols.add(toCol);
+            }
         }
 
         JoinDesc join = new JoinDesc();
@@ -251,15 +256,15 @@ public class OLAPJoinRel extends EnumerableJoin implements OLAPRel {
         return join;
     }
 
-    protected Map<TblColRef, TblColRef> translateJoinColumn(RexNode condition) {
-        Map<TblColRef, TblColRef> joinColumns = new HashMap<>();
+    protected Map<TblColRef, Set<TblColRef>> translateJoinColumn(RexNode condition) {
+        Map<TblColRef, Set<TblColRef>> joinColumns = new HashMap<>();
         if (condition instanceof RexCall) {
             translateJoinColumn((RexCall) condition, joinColumns);
         }
         return joinColumns;
     }
 
-    void translateJoinColumn(RexCall condition, Map<TblColRef, TblColRef> joinColumns) {
+    void translateJoinColumn(RexCall condition, Map<TblColRef, Set<TblColRef>> joinColumns) {
         SqlKind kind = condition.getOperator().getKind();
         if (kind == SqlKind.AND) {
             for (RexNode operand : condition.getOperands()) {
@@ -273,10 +278,13 @@ public class OLAPJoinRel extends EnumerableJoin implements OLAPRel {
             RexInputRef op1 = (RexInputRef) operands.get(1);
             TblColRef col1 = columnRowType.getColumnByIndex(op1.getIndex());
             // map left => right
-            if (op0.getIndex() < columnRowTypeLeftRightCut)
-                joinColumns.put(col0, col1);
-            else
-                joinColumns.put(col1, col0);
+            if (op0.getIndex() < columnRowTypeLeftRightCut) {
+                Set<TblColRef> rights = joinColumns.computeIfAbsent(col0, key -> new HashSet<>());
+                rights.add(col1);
+            } else {
+                Set<TblColRef> rights = joinColumns.computeIfAbsent(col1, key -> new HashSet<>());
+                rights.add(col0);
+            }
         }
     }
 

Reply via email to