This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5486cdd0a4a Implement ASOF LEFT Join
5486cdd0a4a is described below

commit 5486cdd0a4a8788edeb844492cb0565aa5f72239
Author: Weihao Li <[email protected]>
AuthorDate: Mon May 19 19:39:54 2025 +0800

    Implement ASOF LEFT Join
---
 .../relational/it/db/it/IoTDBAsofJoinTableIT.java  |  79 ++++++++++
 .../db/it/IoTDBMultiTAGsWithAttributesTableIT.java | 131 +++++++++++++++-
 ...java => AbstractAsofMergeSortJoinOperator.java} |  55 +------
 .../relational/AsofMergeSortInnerJoinOperator.java | 168 ++++-----------------
 .../relational/AsofMergeSortLeftJoinOperator.java  | 140 +++++++++++++++++
 .../plan/planner/TableOperatorGenerator.java       |  23 +++
 .../optimizations/PushPredicateIntoTableScan.java  |   3 +
 .../plan/relational/sql/parser/AstBuilder.java     |   7 +-
 .../plan/relational/analyzer/AsofJoinTest.java     |   3 -
 9 files changed, 415 insertions(+), 194 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
index 10f510c5001..eb48343a107 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
@@ -143,4 +143,83 @@ public class IoTDBAsofJoinTableIT {
         retArray,
         DATABASE_NAME);
   }
+
+  @Test
+  public void leftJoinTest() {
+    retArray =
+        new String[] {
+          "2020-01-01T00:00:01.000Z,d1,1,null,null,null,",
+          "2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
+          "2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:05.000Z,d2,50,",
+          "2020-01-01T00:00:08.000Z,d2,8,2020-01-01T00:00:05.000Z,d2,50,"
+        };
+    tableResultSetEqualTest(
+        "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+            + "       t2.time as time2, t2.device as device2, t2.value as 
value2 \n"
+            + "FROM \n"
+            + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+            + "ON\n"
+            + "t1.time>=t2.time\n"
+            + "order by time1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    retArray =
+        new String[] {
+          "2020-01-01T00:00:01.000Z,d1,1,null,null,null,",
+          "2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
+          "2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:03.000Z,d1,30,",
+          "2020-01-01T00:00:08.000Z,d2,8,2020-01-01T00:00:05.000Z,d2,50,"
+        };
+    tableResultSetEqualTest(
+        "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+            + "       t2.time as time2, t2.device as device2, t2.value as 
value2 \n"
+            + "FROM \n"
+            + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+            + "ON\n"
+            + "t1.device=t2.device AND t1.time>=t2.time\n"
+            + "order by time1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    retArray =
+        new String[] {
+          "2020-01-01T00:00:01.000Z,d1,1,2020-01-01T00:00:02.000Z,d1,20,",
+          "2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:04.000Z,d2,40,",
+          "2020-01-01T00:00:05.000Z,d1,5,null,null,null,",
+          "2020-01-01T00:00:08.000Z,d2,8,null,null,null,"
+        };
+    tableResultSetEqualTest(
+        "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+            + "       t2.time as time2, t2.device as device2, t2.value as 
value2 \n"
+            + "FROM \n"
+            + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+            + "ON\n"
+            + "t1.time<t2.time\n"
+            + "order by time1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    retArray =
+        new String[] {
+          "2020-01-01T00:00:01.000Z,d1,1,2020-01-01T00:00:02.000Z,d1,20,",
+          "2020-01-01T00:00:03.000Z,d1,3,null,null,null,",
+          "2020-01-01T00:00:05.000Z,d1,5,null,null,null,",
+          "2020-01-01T00:00:08.000Z,d2,8,null,null,null,"
+        };
+    tableResultSetEqualTest(
+        "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+            + "       t2.time as time2, t2.device as device2, t2.value as 
value2 \n"
+            + "FROM \n"
+            + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+            + "ON\n"
+            + "t1.device=t2.device AND t1.time<t2.time\n"
+            + "order by time1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
index ea0628dd696..b0b70d5f0df 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
@@ -2511,7 +2511,7 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
   }
 
   @Test
-  public void asofJoinTest() {
+  public void asofInnerJoinTest() {
     expectedHeader = new String[] {"time", "device", "level", "time", 
"device", "level"};
     retArray =
         new String[] {
@@ -2625,6 +2625,130 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
         DATABASE_NAME);
   }
 
+  @Test
+  public void asofLeftJoinTest() {
+    expectedHeader = new String[] {"time", "device", "level", "time", 
"device", "level"};
+    retArray =
+        new String[] {
+          "1970-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+          "1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.100Z,d1,l5,",
+          "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
+          "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d999,null,",
+          "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,null,l999,",
+          "1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.010Z,d11,l11,",
+          "1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,",
+          "1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d999,null,",
+          "1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,null,l999,",
+          "1971-04-26T17:46:40.000Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,"
+        };
+    // test single join condition
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.time>table1.time "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    // test expr and '>=' in ASOF condition
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.time>=table1.time+1 "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    retArray =
+        new String[] {
+          "1970-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+          "1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.000Z,d1,l1,",
+          "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
+          "1970-01-01T00:00:00.020Z,d1,l2,null,null,null,",
+          "1971-01-01T00:00:00.100Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
+          "1971-04-26T17:46:40.000Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
+          "1970-01-01T00:00:00.040Z,d1,l3,null,null,null,",
+          "1971-01-01T00:00:00.500Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
+          "1971-04-26T17:46:40.020Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
+          "1970-01-01T00:00:00.080Z,d1,l4,null,null,null,"
+        };
+    // test multi join conditions
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.device=table1.device and table1.level=table0.level and 
table0.time>table1.time "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    // test expr and '>=' in ASOF condition
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.device=table1.device and table1.level=table0.level and 
table0.time>=table1.time+1 "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    retArray =
+        new String[] {
+          "1970-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.010Z,d11,l11,",
+          "1971-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+          "1971-01-01T00:01:40.000Z,d1,l1,null,null,null,",
+          "1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.030Z,d11,l11,",
+          "1971-01-01T00:00:00.100Z,d1,l2,null,null,null,",
+          "1971-04-26T17:46:40.000Z,d1,l2,null,null,null,",
+          "1970-01-01T00:00:00.040Z,d1,l3,1970-01-01T00:00:00.080Z,d1,l4,",
+          "1971-01-01T00:00:00.500Z,d1,l3,null,null,null,",
+          "1971-04-26T17:46:40.020Z,d1,l3,null,null,null,",
+          "1970-01-01T00:00:00.080Z,d1,l4,1970-01-01T00:00:00.100Z,d1,l5,"
+        };
+    // test single join condition
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.time<table1.time "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    // test expr and '<=' in ASOF condition
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.time<=table1.time-1 "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    retArray =
+        new String[] {
+          "1970-01-01T00:00:00.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
+          "1971-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+          "1971-01-01T00:01:40.000Z,d1,l1,null,null,null,",
+          "1970-01-01T00:00:00.020Z,d1,l2,null,null,null,",
+          "1971-01-01T00:00:00.100Z,d1,l2,null,null,null,",
+          "1971-04-26T17:46:40.000Z,d1,l2,null,null,null,",
+          "1970-01-01T00:00:00.040Z,d1,l3,null,null,null,",
+          "1971-01-01T00:00:00.500Z,d1,l3,null,null,null,",
+          "1971-04-26T17:46:40.020Z,d1,l3,null,null,null,",
+          "1970-01-01T00:00:00.080Z,d1,l4,null,null,null,"
+        };
+    // test multi join conditions
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.device=table1.device and table1.level=table0.level and 
table0.time<table1.time "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    // test expr and '>=' in ASOF condition
+    tableResultSetEqualTest(
+        "select 
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level 
from table0 asof left join table1 on "
+            + "table0.device=table1.device and table1.level=table0.level and 
table0.time<=table1.time-1 "
+            + "order by 
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
   @Test
   public void exceptionTest() {
     String errMsg = TSStatusCode.SEMANTIC_ERROR.getStatusCode() + ": " + 
ONLY_SUPPORT_EQUI_JOIN;
@@ -2648,6 +2772,11 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
         "select * from table0 t0 full join table1 t1 on t0.device=t1.device OR 
t0.time=t1.time",
         errMsg,
         DATABASE_NAME);
+
+    tableAssertTestFail(
+        "select * from table0 asof (tolerance 1s) left join table1 on 
table0.time<=table1.time",
+        "Tolerance in ASOF JOIN is only support INNER type now",
+        DATABASE_NAME);
   }
 
   public static void repeatTest(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAsofMergeSortJoinOperator.java
similarity index 81%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAsofMergeSortJoinOperator.java
index 59762176934..0f880ba2f88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAsofMergeSortJoinOperator.java
@@ -28,12 +28,12 @@ import org.apache.tsfile.read.common.block.TsBlock;
 
 import java.util.List;
 
-public class AsofMergeSortInnerJoinOperator extends MergeSortInnerJoinOperator 
{
+public abstract class AbstractAsofMergeSortJoinOperator extends 
AbstractMergeSortJoinOperator {
   private final JoinKeyComparator asofComparator;
   private final int leftAsofJoinKeyIndex;
   private final int rightAsofJoinKeyIndex;
 
-  public AsofMergeSortInnerJoinOperator(
+  public AbstractAsofMergeSortJoinOperator(
       OperatorContext operatorContext,
       Operator leftChild,
       int[] leftJoinKeyPositions,
@@ -58,50 +58,6 @@ public class AsofMergeSortInnerJoinOperator extends 
MergeSortInnerJoinOperator {
     this.rightAsofJoinKeyIndex = 
rightJoinKeyPositions[rightJoinKeyPositions.length - 1];
   }
 
-  @Override
-  protected boolean processFinished() {
-    // all the join keys in rightTsBlock are less or equal than leftTsBlock, 
just skip right
-    if (allRightLessOrEqualThanLeft()) {
-      resetRightBlockList();
-      return true;
-    }
-
-    // skip all NULL values in left, because NULL value can not appear in the 
inner join result
-    while (currentLeftHasNullValue()) {
-      if (leftFinishedWithIncIndex()) {
-        return true;
-      }
-    }
-
-    // skip all NULL values in right, because NULL value can not appear in the 
inner join result
-    while (currentRightHasNullValue()) {
-      if (rightFinishedWithIncIndex()) {
-        return true;
-      }
-    }
-
-    // find first candidate of right meets the conditions
-    while (lessThanOrEqual(
-        rightBlockList.get(rightBlockListIdx),
-        rightJoinKeyPositions,
-        rightIndex,
-        leftBlock,
-        leftJoinKeyPositions,
-        leftIndex)) {
-      if (rightFinishedWithIncIndex()) {
-        return true;
-      }
-    }
-    if (currentRoundNeedStop()) {
-      return true;
-    }
-
-    // has right values meet condition, append to join result
-    hasMatchedRightValueToProbeLeft();
-    // always inc leftIndex after current left result appended
-    return leftFinishedWithIncIndex();
-  }
-
   // check if the last value of the right is less or equal than left
   protected boolean allRightLessOrEqualThanLeft() {
     return lessThanOrEqual(
@@ -113,7 +69,7 @@ public class AsofMergeSortInnerJoinOperator extends 
MergeSortInnerJoinOperator {
         leftIndex);
   }
 
-  private boolean lessThanOrEqual(
+  protected boolean lessThanOrEqual(
       TsBlock leftBlock,
       int[] leftPositions,
       int lIndex,
@@ -234,4 +190,9 @@ public class AsofMergeSortInnerJoinOperator extends 
MergeSortInnerJoinOperator {
     }
     return true;
   }
+
+  @Override
+  protected void recordsWhenDataMatches() {
+    // do nothing
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
index 59762176934..f8a57dd3c40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
@@ -19,19 +19,19 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator;
 
 import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.util.List;
 
-public class AsofMergeSortInnerJoinOperator extends MergeSortInnerJoinOperator 
{
-  private final JoinKeyComparator asofComparator;
-  private final int leftAsofJoinKeyIndex;
-  private final int rightAsofJoinKeyIndex;
+public class AsofMergeSortInnerJoinOperator extends 
AbstractAsofMergeSortJoinOperator {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(AsofMergeSortInnerJoinOperator.class);
 
   public AsofMergeSortInnerJoinOperator(
       OperatorContext operatorContext,
@@ -53,9 +53,21 @@ public class AsofMergeSortInnerJoinOperator extends 
MergeSortInnerJoinOperator {
         rightOutputSymbolIdx,
         joinKeyComparators,
         dataTypes);
-    this.asofComparator = joinKeyComparators.get(joinKeyComparators.size() - 
1);
-    this.leftAsofJoinKeyIndex = 
leftJoinKeyPositions[leftJoinKeyPositions.length - 1];
-    this.rightAsofJoinKeyIndex = 
rightJoinKeyPositions[rightJoinKeyPositions.length - 1];
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (retainedTsBlock != null) {
+      return true;
+    }
+
+    return !leftFinished && !rightFinished;
+  }
+
+  @Override
+  protected boolean prepareInput() throws Exception {
+    gotCandidateBlocks();
+    return leftBlockNotEmpty() && rightBlockNotEmpty() && gotNextRightBlock();
   }
 
   @Override
@@ -102,136 +114,14 @@ public class AsofMergeSortInnerJoinOperator extends 
MergeSortInnerJoinOperator {
     return leftFinishedWithIncIndex();
   }
 
-  // check if the last value of the right is less or equal than left
-  protected boolean allRightLessOrEqualThanLeft() {
-    return lessThanOrEqual(
-        rightBlockList.get(rightBlockList.size() - 1),
-        rightJoinKeyPositions,
-        rightBlockList.get(rightBlockList.size() - 1).getPositionCount() - 1,
-        leftBlock,
-        leftJoinKeyPositions,
-        leftIndex);
-  }
-
-  private boolean lessThanOrEqual(
-      TsBlock leftBlock,
-      int[] leftPositions,
-      int lIndex,
-      TsBlock rightBlock,
-      int[] rightPositions,
-      int rIndex) {
-    // if join key size equals to 1, can return true in inner join
-    if (rightPositions.length == 1 && 
rightBlock.getColumn(rightPositions[0]).isNull(rIndex)) {
-      return true;
-    }
-
-    int lastIndex = comparators.size() - 1;
-    for (int i = 0; i < lastIndex; i++) {
-      if (comparators
-          .get(i)
-          .lessThan(leftBlock, leftPositions[i], lIndex, rightBlock, 
rightPositions[i], rIndex)
-          .orElse(false)) {
-        return true;
-      } else if (!comparators
-          .get(i)
-          .equalsTo(leftBlock, leftPositions[i], lIndex, rightBlock, 
rightPositions[i], rIndex)
-          .orElse(false)) {
-        return false;
-      }
-    }
-
-    return comparators
-        .get(lastIndex)
-        .lessThanOrEqual(
-            leftBlock,
-            leftPositions[lastIndex],
-            lIndex,
-            rightBlock,
-            rightPositions[lastIndex],
-            rIndex)
-        .orElse(false);
-  }
-
-  /**
-   * Examine if stop this round and rebuild rightBlockLists.
-   *
-   * @return true if rightBlockListIdx more than zero.
-   */
-  protected boolean currentRoundNeedStop() {
-    if (rightBlockListIdx > 0) {
-      for (int i = 0; i < rightBlockListIdx; i++) {
-        long size = rightBlockList.get(i).getRetainedSizeInBytes();
-        usedMemory -= size;
-        memoryReservationManager.releaseMemoryCumulatively(size);
-      }
-      rightBlockList = rightBlockList.subList(rightBlockListIdx, 
rightBlockList.size());
-      rightBlockListIdx = 0;
-      return true;
-    }
-
-    return false;
-  }
-
-  public boolean hasMatchedRightValueToProbeLeft() {
-    int tmpBlockIdx = rightBlockListIdx;
-    int tmpIdx = rightIndex;
-    boolean hasMatched = false;
-    long matchedTime = Long.MIN_VALUE;
-    while (equalsIgnoreAsof(
-            leftBlock,
-            leftJoinKeyPositions,
-            leftIndex,
-            rightBlockList.get(tmpBlockIdx),
-            rightJoinKeyPositions,
-            tmpIdx)
-        && asofComparator
-            .lessThan(
-                leftBlock,
-                leftAsofJoinKeyIndex,
-                leftIndex,
-                rightBlockList.get(rightBlockListIdx),
-                rightAsofJoinKeyIndex,
-                rightIndex)
-            .orElse(false)) {
-      long currentTime =
-          
rightBlockList.get(tmpBlockIdx).getColumn(rightAsofJoinKeyIndex).getLong(tmpIdx);
-      if (matchedTime == Long.MIN_VALUE) {
-        matchedTime = currentTime;
-      } else if (currentTime != matchedTime) {
-        break;
-      }
-
-      hasMatched = true;
-      appendValueToResultWhenMatches(tmpBlockIdx, tmpIdx);
-
-      tmpIdx++;
-      if (tmpIdx >= rightBlockList.get(tmpBlockIdx).getPositionCount()) {
-        tmpIdx = 0;
-        tmpBlockIdx++;
-      }
-
-      if (tmpBlockIdx >= rightBlockList.size()) {
-        break;
-      }
-    }
-    return hasMatched;
-  }
-
-  protected boolean equalsIgnoreAsof(
-      TsBlock leftBlock,
-      int[] leftPositions,
-      int lIndex,
-      TsBlock rightBlock,
-      int[] rightPositions,
-      int rIndex) {
-    for (int i = 0; i < comparators.size() - 1; i++) {
-      if (!comparators
-          .get(i)
-          .equalsTo(leftBlock, leftPositions[i], lIndex, rightBlock, 
rightPositions[i], rIndex)
-          .orElse(false)) {
-        return false;
-      }
-    }
-    return true;
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftChild)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightChild)
+        + RamUsageEstimator.sizeOf(leftOutputSymbolIdx)
+        + RamUsageEstimator.sizeOf(rightOutputSymbolIdx)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + resultBuilder.getRetainedSizeInBytes();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortLeftJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortLeftJoinOperator.java
new file mode 100644
index 00000000000..8307c242163
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortLeftJoinOperator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+
+public class AsofMergeSortLeftJoinOperator extends 
AbstractAsofMergeSortJoinOperator {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(AsofMergeSortLeftJoinOperator.class);
+
+  public AsofMergeSortLeftJoinOperator(
+      OperatorContext operatorContext,
+      Operator leftChild,
+      int[] leftJoinKeyPositions,
+      int[] leftOutputSymbolIdx,
+      Operator rightChild,
+      int[] rightJoinKeyPositions,
+      int[] rightOutputSymbolIdx,
+      List<JoinKeyComparator> joinKeyComparators,
+      List<TSDataType> dataTypes) {
+    super(
+        operatorContext,
+        leftChild,
+        leftJoinKeyPositions,
+        leftOutputSymbolIdx,
+        rightChild,
+        rightJoinKeyPositions,
+        rightOutputSymbolIdx,
+        joinKeyComparators,
+        dataTypes);
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (retainedTsBlock != null) {
+      return true;
+    }
+
+    return !leftFinished;
+  }
+
+  @Override
+  protected boolean prepareInput() throws Exception {
+    gotCandidateBlocks();
+    if (rightFinished) {
+      return leftBlockNotEmpty();
+    }
+    return leftBlockNotEmpty() && rightBlockNotEmpty() && gotNextRightBlock();
+  }
+
+  @Override
+  protected boolean processFinished() {
+    if (rightFinished) {
+      appendLeftWithEmptyRight();
+      return true;
+    }
+
+    // all the join keys in rightTsBlock are less or equal than leftTsBlock, 
just skip right
+    if (allRightLessOrEqualThanLeft()) {
+      resetRightBlockList();
+      return true;
+    }
+
+    // skip all NULL values in left, because NULL value can not appear in the 
inner join result
+    while (currentLeftHasNullValue()) {
+      appendOneLeftRowWithEmptyRight();
+      if (leftFinishedWithIncIndex()) {
+        return true;
+      }
+    }
+
+    // skip all NULL values in right, because NULL value can not appear in the 
inner join result
+    while (currentRightHasNullValue()) {
+      if (rightFinishedWithIncIndex()) {
+        return true;
+      }
+    }
+
+    // find first candidate of right meets the conditions
+    while (lessThanOrEqual(
+        rightBlockList.get(rightBlockListIdx),
+        rightJoinKeyPositions,
+        rightIndex,
+        leftBlock,
+        leftJoinKeyPositions,
+        leftIndex)) {
+      if (rightFinishedWithIncIndex()) {
+        return true;
+      }
+    }
+    if (currentRoundNeedStop()) {
+      return true;
+    }
+
+    // has right values meet condition, append to join result
+    // else append left
+    if (!hasMatchedRightValueToProbeLeft()) {
+      appendLeftWithEmptyRight();
+    }
+
+    // always inc leftIndex after current left result appended
+    return leftFinishedWithIncIndex();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftChild)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightChild)
+        + RamUsageEstimator.sizeOf(leftOutputSymbolIdx)
+        + RamUsageEstimator.sizeOf(rightOutputSymbolIdx)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + resultBuilder.getRetainedSizeInBytes();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index f5efc969cd3..3ad31ab62a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -93,6 +93,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOpera
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractAggTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AsofMergeSortInnerJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AsofMergeSortLeftJoinOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator;
@@ -2008,6 +2009,28 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                     || asofOperator == 
ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL,
                 !asofJoinClause.isOperatorContainsGreater()),
             dataTypes);
+      } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.LEFT) 
{
+        OperatorContext operatorContext =
+            context
+                .getDriverContext()
+                .addOperatorContext(
+                    context.getNextOperatorId(),
+                    node.getPlanNodeId(),
+                    AsofMergeSortLeftJoinOperator.class.getSimpleName());
+        return new AsofMergeSortLeftJoinOperator(
+            operatorContext,
+            leftChild,
+            leftJoinKeyPositions,
+            leftOutputSymbolIdx,
+            rightChild,
+            rightJoinKeyPositions,
+            rightOutputSymbolIdx,
+            JoinKeyComparatorFactory.getAsofComparators(
+                joinKeyTypes,
+                asofOperator == 
ComparisonExpression.Operator.LESS_THAN_OR_EQUAL
+                    || asofOperator == 
ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL,
+                !asofJoinClause.isOperatorContainsGreater()),
+            dataTypes);
       } else {
         throw new IllegalStateException("Unsupported ASOF join type: " + 
node.getJoinType());
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index f96061b8b6e..5cda1274ab8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -753,6 +753,9 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
 
           equiJoinClauses.add(new JoinNode.EquiJoinClause(leftSymbol, 
rightSymbol));
         } else {
+          if (conjunct.equals(TRUE_LITERAL)) {
+            continue;
+          }
           if (node.getJoinType() != INNER) {
             throw new SemanticException(ONLY_SUPPORT_EQUI_JOIN);
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 51b1b8afafb..db4ff12d74b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -2345,7 +2345,7 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
     }
 
     JoinCriteria criteria;
-
+    TimeDuration timeDuration = null;
     if (ctx.NATURAL() != null) {
       right = (Relation) visit(ctx.right);
       criteria = new NaturalJoin();
@@ -2353,7 +2353,6 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
       right = (Relation) visit(ctx.rightRelation);
       if (ctx.joinCriteria().ON() != null) {
         if (ctx.ASOF() != null) {
-          TimeDuration timeDuration = null;
           if (ctx.timeDuration() != null) {
             timeDuration = 
DateTimeUtils.constructTimeDuration(ctx.timeDuration().getText());
 
@@ -2386,8 +2385,8 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
       joinType = Join.Type.INNER;
     }
 
-    if (criteria instanceof AsofJoinOn && joinType != Join.Type.INNER) {
-      throw new SemanticException("ASOF JOIN is only support INNER type now");
+    if (criteria instanceof AsofJoinOn && joinType != Join.Type.INNER && 
timeDuration != null) {
+      throw new SemanticException("Tolerance in ASOF JOIN is only support 
INNER type now");
     }
 
     return new Join(getLocation(ctx), joinType, left, right, criteria);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
index 6c92220780a..62ec85d2774 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
@@ -275,8 +275,5 @@ public class AsofJoinTest {
     assertAnalyzeSemanticException(
         "select * from table1 asof (tolerance 1ms) join table2 on table1.tag1 
= table2.tag1 and table1.time > table2.s1",
         "right child type of ASOF main JOIN expression must be TIMESTAMP: 
actual type INT64");
-    assertAnalyzeSemanticException(
-        "select * from table1 asof (tolerance 1ms) left join table2 on 
table1.time > table2.time",
-        "ASOF JOIN is only support INNER type now");
   }
 }


Reply via email to