This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5486cdd0a4a Implement ASOF LEFT Join
5486cdd0a4a is described below
commit 5486cdd0a4a8788edeb844492cb0565aa5f72239
Author: Weihao Li <[email protected]>
AuthorDate: Mon May 19 19:39:54 2025 +0800
Implement ASOF LEFT Join
---
.../relational/it/db/it/IoTDBAsofJoinTableIT.java | 79 ++++++++++
.../db/it/IoTDBMultiTAGsWithAttributesTableIT.java | 131 +++++++++++++++-
...java => AbstractAsofMergeSortJoinOperator.java} | 55 +------
.../relational/AsofMergeSortInnerJoinOperator.java | 168 ++++-----------------
.../relational/AsofMergeSortLeftJoinOperator.java | 140 +++++++++++++++++
.../plan/planner/TableOperatorGenerator.java | 23 +++
.../optimizations/PushPredicateIntoTableScan.java | 3 +
.../plan/relational/sql/parser/AstBuilder.java | 7 +-
.../plan/relational/analyzer/AsofJoinTest.java | 3 -
9 files changed, 415 insertions(+), 194 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
index 10f510c5001..eb48343a107 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBAsofJoinTableIT.java
@@ -143,4 +143,83 @@ public class IoTDBAsofJoinTableIT {
retArray,
DATABASE_NAME);
}
+
+ @Test
+ public void leftJoinTest() {
+ retArray =
+ new String[] {
+ "2020-01-01T00:00:01.000Z,d1,1,null,null,null,",
+ "2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
+ "2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:05.000Z,d2,50,",
+ "2020-01-01T00:00:08.000Z,d2,8,2020-01-01T00:00:05.000Z,d2,50,"
+ };
+ tableResultSetEqualTest(
+ "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+ + " t2.time as time2, t2.device as device2, t2.value as
value2 \n"
+ + "FROM \n"
+ + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+ + "ON\n"
+ + "t1.time>=t2.time\n"
+ + "order by time1",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ retArray =
+ new String[] {
+ "2020-01-01T00:00:01.000Z,d1,1,null,null,null,",
+ "2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
+ "2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:03.000Z,d1,30,",
+ "2020-01-01T00:00:08.000Z,d2,8,2020-01-01T00:00:05.000Z,d2,50,"
+ };
+ tableResultSetEqualTest(
+ "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+ + " t2.time as time2, t2.device as device2, t2.value as
value2 \n"
+ + "FROM \n"
+ + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+ + "ON\n"
+ + "t1.device=t2.device AND t1.time>=t2.time\n"
+ + "order by time1",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ retArray =
+ new String[] {
+ "2020-01-01T00:00:01.000Z,d1,1,2020-01-01T00:00:02.000Z,d1,20,",
+ "2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:04.000Z,d2,40,",
+ "2020-01-01T00:00:05.000Z,d1,5,null,null,null,",
+ "2020-01-01T00:00:08.000Z,d2,8,null,null,null,"
+ };
+ tableResultSetEqualTest(
+ "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+ + " t2.time as time2, t2.device as device2, t2.value as
value2 \n"
+ + "FROM \n"
+ + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+ + "ON\n"
+ + "t1.time<t2.time\n"
+ + "order by time1",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ retArray =
+ new String[] {
+ "2020-01-01T00:00:01.000Z,d1,1,2020-01-01T00:00:02.000Z,d1,20,",
+ "2020-01-01T00:00:03.000Z,d1,3,null,null,null,",
+ "2020-01-01T00:00:05.000Z,d1,5,null,null,null,",
+ "2020-01-01T00:00:08.000Z,d2,8,null,null,null,"
+ };
+ tableResultSetEqualTest(
+ "SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+ + " t2.time as time2, t2.device as device2, t2.value as
value2 \n"
+ + "FROM \n"
+ + "table1 t1 ASOF LEFT JOIN table2 t2\n"
+ + "ON\n"
+ + "t1.device=t2.device AND t1.time<t2.time\n"
+ + "order by time1",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
index ea0628dd696..b0b70d5f0df 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
@@ -2511,7 +2511,7 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
}
@Test
- public void asofJoinTest() {
+ public void asofInnerJoinTest() {
expectedHeader = new String[] {"time", "device", "level", "time",
"device", "level"};
retArray =
new String[] {
@@ -2625,6 +2625,130 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
DATABASE_NAME);
}
+ @Test
+ public void asofLeftJoinTest() {
+ expectedHeader = new String[] {"time", "device", "level", "time",
"device", "level"};
+ retArray =
+ new String[] {
+ "1970-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+ "1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.100Z,d1,l5,",
+ "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
+ "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d999,null,",
+ "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,null,l999,",
+ "1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.010Z,d11,l11,",
+ "1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,",
+ "1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d999,null,",
+ "1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,null,l999,",
+ "1971-04-26T17:46:40.000Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,"
+ };
+ // test single join condition
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.time>table1.time "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ // test expr and '>=' in ASOF condition
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.time>=table1.time+1 "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ retArray =
+ new String[] {
+ "1970-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+ "1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.000Z,d1,l1,",
+ "1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
+ "1970-01-01T00:00:00.020Z,d1,l2,null,null,null,",
+ "1971-01-01T00:00:00.100Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
+ "1971-04-26T17:46:40.000Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
+ "1970-01-01T00:00:00.040Z,d1,l3,null,null,null,",
+ "1971-01-01T00:00:00.500Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
+ "1971-04-26T17:46:40.020Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
+ "1970-01-01T00:00:00.080Z,d1,l4,null,null,null,"
+ };
+ // test multi join conditions
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.device=table1.device and table1.level=table0.level and
table0.time>table1.time "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ // test expr and '>=' in ASOF condition
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.device=table1.device and table1.level=table0.level and
table0.time>=table1.time+1 "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ retArray =
+ new String[] {
+ "1970-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.010Z,d11,l11,",
+ "1971-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+ "1971-01-01T00:01:40.000Z,d1,l1,null,null,null,",
+ "1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.030Z,d11,l11,",
+ "1971-01-01T00:00:00.100Z,d1,l2,null,null,null,",
+ "1971-04-26T17:46:40.000Z,d1,l2,null,null,null,",
+ "1970-01-01T00:00:00.040Z,d1,l3,1970-01-01T00:00:00.080Z,d1,l4,",
+ "1971-01-01T00:00:00.500Z,d1,l3,null,null,null,",
+ "1971-04-26T17:46:40.020Z,d1,l3,null,null,null,",
+ "1970-01-01T00:00:00.080Z,d1,l4,1970-01-01T00:00:00.100Z,d1,l5,"
+ };
+ // test single join condition
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.time<table1.time "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ // test expr and '<=' in ASOF condition
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.time<=table1.time-1 "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ retArray =
+ new String[] {
+ "1970-01-01T00:00:00.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
+ "1971-01-01T00:00:00.000Z,d1,l1,null,null,null,",
+ "1971-01-01T00:01:40.000Z,d1,l1,null,null,null,",
+ "1970-01-01T00:00:00.020Z,d1,l2,null,null,null,",
+ "1971-01-01T00:00:00.100Z,d1,l2,null,null,null,",
+ "1971-04-26T17:46:40.000Z,d1,l2,null,null,null,",
+ "1970-01-01T00:00:00.040Z,d1,l3,null,null,null,",
+ "1971-01-01T00:00:00.500Z,d1,l3,null,null,null,",
+ "1971-04-26T17:46:40.020Z,d1,l3,null,null,null,",
+ "1970-01-01T00:00:00.080Z,d1,l4,null,null,null,"
+ };
+ // test multi join conditions
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.device=table1.device and table1.level=table0.level and
table0.time<table1.time "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ // test expr and '<=' in ASOF condition
+ tableResultSetEqualTest(
+ "select
table0.time,table0.device,table0.level,table1.time,table1.device,table1.level
from table0 asof left join table1 on "
+ + "table0.device=table1.device and table1.level=table0.level and
table0.time<=table1.time-1 "
+ + "order by
table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
@Test
public void exceptionTest() {
String errMsg = TSStatusCode.SEMANTIC_ERROR.getStatusCode() + ": " +
ONLY_SUPPORT_EQUI_JOIN;
@@ -2648,6 +2772,11 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
"select * from table0 t0 full join table1 t1 on t0.device=t1.device OR
t0.time=t1.time",
errMsg,
DATABASE_NAME);
+
+ tableAssertTestFail(
+ "select * from table0 asof (tolerance 1s) left join table1 on
table0.time<=table1.time",
+ "Tolerance in ASOF JOIN is only support INNER type now",
+ DATABASE_NAME);
}
public static void repeatTest(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAsofMergeSortJoinOperator.java
similarity index 81%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAsofMergeSortJoinOperator.java
index 59762176934..0f880ba2f88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAsofMergeSortJoinOperator.java
@@ -28,12 +28,12 @@ import org.apache.tsfile.read.common.block.TsBlock;
import java.util.List;
-public class AsofMergeSortInnerJoinOperator extends MergeSortInnerJoinOperator
{
+public abstract class AbstractAsofMergeSortJoinOperator extends
AbstractMergeSortJoinOperator {
private final JoinKeyComparator asofComparator;
private final int leftAsofJoinKeyIndex;
private final int rightAsofJoinKeyIndex;
- public AsofMergeSortInnerJoinOperator(
+ public AbstractAsofMergeSortJoinOperator(
OperatorContext operatorContext,
Operator leftChild,
int[] leftJoinKeyPositions,
@@ -58,50 +58,6 @@ public class AsofMergeSortInnerJoinOperator extends
MergeSortInnerJoinOperator {
this.rightAsofJoinKeyIndex =
rightJoinKeyPositions[rightJoinKeyPositions.length - 1];
}
- @Override
- protected boolean processFinished() {
- // all the join keys in rightTsBlock are less or equal than leftTsBlock,
just skip right
- if (allRightLessOrEqualThanLeft()) {
- resetRightBlockList();
- return true;
- }
-
- // skip all NULL values in left, because NULL value can not appear in the
inner join result
- while (currentLeftHasNullValue()) {
- if (leftFinishedWithIncIndex()) {
- return true;
- }
- }
-
- // skip all NULL values in right, because NULL value can not appear in the
inner join result
- while (currentRightHasNullValue()) {
- if (rightFinishedWithIncIndex()) {
- return true;
- }
- }
-
- // find first candidate of right meets the conditions
- while (lessThanOrEqual(
- rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPositions,
- rightIndex,
- leftBlock,
- leftJoinKeyPositions,
- leftIndex)) {
- if (rightFinishedWithIncIndex()) {
- return true;
- }
- }
- if (currentRoundNeedStop()) {
- return true;
- }
-
- // has right values meet condition, append to join result
- hasMatchedRightValueToProbeLeft();
- // always inc leftIndex after current left result appended
- return leftFinishedWithIncIndex();
- }
-
// check if the last value of the right is less or equal than left
protected boolean allRightLessOrEqualThanLeft() {
return lessThanOrEqual(
@@ -113,7 +69,7 @@ public class AsofMergeSortInnerJoinOperator extends
MergeSortInnerJoinOperator {
leftIndex);
}
- private boolean lessThanOrEqual(
+ protected boolean lessThanOrEqual(
TsBlock leftBlock,
int[] leftPositions,
int lIndex,
@@ -234,4 +190,9 @@ public class AsofMergeSortInnerJoinOperator extends
MergeSortInnerJoinOperator {
}
return true;
}
+
+ @Override
+ protected void recordsWhenDataMatches() {
+ // do nothing
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
index 59762176934..f8a57dd3c40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortInnerJoinOperator.java
@@ -19,19 +19,19 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.List;
-public class AsofMergeSortInnerJoinOperator extends MergeSortInnerJoinOperator
{
- private final JoinKeyComparator asofComparator;
- private final int leftAsofJoinKeyIndex;
- private final int rightAsofJoinKeyIndex;
+public class AsofMergeSortInnerJoinOperator extends
AbstractAsofMergeSortJoinOperator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(AsofMergeSortInnerJoinOperator.class);
public AsofMergeSortInnerJoinOperator(
OperatorContext operatorContext,
@@ -53,9 +53,21 @@ public class AsofMergeSortInnerJoinOperator extends
MergeSortInnerJoinOperator {
rightOutputSymbolIdx,
joinKeyComparators,
dataTypes);
- this.asofComparator = joinKeyComparators.get(joinKeyComparators.size() -
1);
- this.leftAsofJoinKeyIndex =
leftJoinKeyPositions[leftJoinKeyPositions.length - 1];
- this.rightAsofJoinKeyIndex =
rightJoinKeyPositions[rightJoinKeyPositions.length - 1];
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ if (retainedTsBlock != null) {
+ return true;
+ }
+
+ return !leftFinished && !rightFinished;
+ }
+
+ @Override
+ protected boolean prepareInput() throws Exception {
+ gotCandidateBlocks();
+ return leftBlockNotEmpty() && rightBlockNotEmpty() && gotNextRightBlock();
}
@Override
@@ -102,136 +114,14 @@ public class AsofMergeSortInnerJoinOperator extends
MergeSortInnerJoinOperator {
return leftFinishedWithIncIndex();
}
- // check if the last value of the right is less or equal than left
- protected boolean allRightLessOrEqualThanLeft() {
- return lessThanOrEqual(
- rightBlockList.get(rightBlockList.size() - 1),
- rightJoinKeyPositions,
- rightBlockList.get(rightBlockList.size() - 1).getPositionCount() - 1,
- leftBlock,
- leftJoinKeyPositions,
- leftIndex);
- }
-
- private boolean lessThanOrEqual(
- TsBlock leftBlock,
- int[] leftPositions,
- int lIndex,
- TsBlock rightBlock,
- int[] rightPositions,
- int rIndex) {
- // if join key size equals to 1, can return true in inner join
- if (rightPositions.length == 1 &&
rightBlock.getColumn(rightPositions[0]).isNull(rIndex)) {
- return true;
- }
-
- int lastIndex = comparators.size() - 1;
- for (int i = 0; i < lastIndex; i++) {
- if (comparators
- .get(i)
- .lessThan(leftBlock, leftPositions[i], lIndex, rightBlock,
rightPositions[i], rIndex)
- .orElse(false)) {
- return true;
- } else if (!comparators
- .get(i)
- .equalsTo(leftBlock, leftPositions[i], lIndex, rightBlock,
rightPositions[i], rIndex)
- .orElse(false)) {
- return false;
- }
- }
-
- return comparators
- .get(lastIndex)
- .lessThanOrEqual(
- leftBlock,
- leftPositions[lastIndex],
- lIndex,
- rightBlock,
- rightPositions[lastIndex],
- rIndex)
- .orElse(false);
- }
-
- /**
- * Examine if stop this round and rebuild rightBlockLists.
- *
- * @return true if rightBlockListIdx more than zero.
- */
- protected boolean currentRoundNeedStop() {
- if (rightBlockListIdx > 0) {
- for (int i = 0; i < rightBlockListIdx; i++) {
- long size = rightBlockList.get(i).getRetainedSizeInBytes();
- usedMemory -= size;
- memoryReservationManager.releaseMemoryCumulatively(size);
- }
- rightBlockList = rightBlockList.subList(rightBlockListIdx,
rightBlockList.size());
- rightBlockListIdx = 0;
- return true;
- }
-
- return false;
- }
-
- public boolean hasMatchedRightValueToProbeLeft() {
- int tmpBlockIdx = rightBlockListIdx;
- int tmpIdx = rightIndex;
- boolean hasMatched = false;
- long matchedTime = Long.MIN_VALUE;
- while (equalsIgnoreAsof(
- leftBlock,
- leftJoinKeyPositions,
- leftIndex,
- rightBlockList.get(tmpBlockIdx),
- rightJoinKeyPositions,
- tmpIdx)
- && asofComparator
- .lessThan(
- leftBlock,
- leftAsofJoinKeyIndex,
- leftIndex,
- rightBlockList.get(rightBlockListIdx),
- rightAsofJoinKeyIndex,
- rightIndex)
- .orElse(false)) {
- long currentTime =
-
rightBlockList.get(tmpBlockIdx).getColumn(rightAsofJoinKeyIndex).getLong(tmpIdx);
- if (matchedTime == Long.MIN_VALUE) {
- matchedTime = currentTime;
- } else if (currentTime != matchedTime) {
- break;
- }
-
- hasMatched = true;
- appendValueToResultWhenMatches(tmpBlockIdx, tmpIdx);
-
- tmpIdx++;
- if (tmpIdx >= rightBlockList.get(tmpBlockIdx).getPositionCount()) {
- tmpIdx = 0;
- tmpBlockIdx++;
- }
-
- if (tmpBlockIdx >= rightBlockList.size()) {
- break;
- }
- }
- return hasMatched;
- }
-
- protected boolean equalsIgnoreAsof(
- TsBlock leftBlock,
- int[] leftPositions,
- int lIndex,
- TsBlock rightBlock,
- int[] rightPositions,
- int rIndex) {
- for (int i = 0; i < comparators.size() - 1; i++) {
- if (!comparators
- .get(i)
- .equalsTo(leftBlock, leftPositions[i], lIndex, rightBlock,
rightPositions[i], rIndex)
- .orElse(false)) {
- return false;
- }
- }
- return true;
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftChild)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightChild)
+ + RamUsageEstimator.sizeOf(leftOutputSymbolIdx)
+ + RamUsageEstimator.sizeOf(rightOutputSymbolIdx)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ + resultBuilder.getRetainedSizeInBytes();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortLeftJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortLeftJoinOperator.java
new file mode 100644
index 00000000000..8307c242163
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AsofMergeSortLeftJoinOperator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+
+public class AsofMergeSortLeftJoinOperator extends
AbstractAsofMergeSortJoinOperator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(AsofMergeSortLeftJoinOperator.class);
+
+ public AsofMergeSortLeftJoinOperator(
+ OperatorContext operatorContext,
+ Operator leftChild,
+ int[] leftJoinKeyPositions,
+ int[] leftOutputSymbolIdx,
+ Operator rightChild,
+ int[] rightJoinKeyPositions,
+ int[] rightOutputSymbolIdx,
+ List<JoinKeyComparator> joinKeyComparators,
+ List<TSDataType> dataTypes) {
+ super(
+ operatorContext,
+ leftChild,
+ leftJoinKeyPositions,
+ leftOutputSymbolIdx,
+ rightChild,
+ rightJoinKeyPositions,
+ rightOutputSymbolIdx,
+ joinKeyComparators,
+ dataTypes);
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ if (retainedTsBlock != null) {
+ return true;
+ }
+
+ return !leftFinished;
+ }
+
+ @Override
+ protected boolean prepareInput() throws Exception {
+ gotCandidateBlocks();
+ if (rightFinished) {
+ return leftBlockNotEmpty();
+ }
+ return leftBlockNotEmpty() && rightBlockNotEmpty() && gotNextRightBlock();
+ }
+
+ @Override
+ protected boolean processFinished() {
+ if (rightFinished) {
+ appendLeftWithEmptyRight();
+ return true;
+ }
+
+ // all the join keys in rightTsBlock are less than or equal to leftTsBlock,
just skip right
+ if (allRightLessOrEqualThanLeft()) {
+ resetRightBlockList();
+ return true;
+ }
+
+ // left rows with NULL values are not probed against right, append each of them
with empty right
+ while (currentLeftHasNullValue()) {
+ appendOneLeftRowWithEmptyRight();
+ if (leftFinishedWithIncIndex()) {
+ return true;
+ }
+ }
+
+ // skip all NULL values in right, because a NULL value can not satisfy the
join condition
+ while (currentRightHasNullValue()) {
+ if (rightFinishedWithIncIndex()) {
+ return true;
+ }
+ }
+
+ // find first candidate of right meets the conditions
+ while (lessThanOrEqual(
+ rightBlockList.get(rightBlockListIdx),
+ rightJoinKeyPositions,
+ rightIndex,
+ leftBlock,
+ leftJoinKeyPositions,
+ leftIndex)) {
+ if (rightFinishedWithIncIndex()) {
+ return true;
+ }
+ }
+ if (currentRoundNeedStop()) {
+ return true;
+ }
+
+ // has right values meet condition, append to join result
+ // else append left
+ if (!hasMatchedRightValueToProbeLeft()) {
+ appendLeftWithEmptyRight();
+ }
+
+ // always inc leftIndex after current left result appended
+ return leftFinishedWithIncIndex();
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftChild)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightChild)
+ + RamUsageEstimator.sizeOf(leftOutputSymbolIdx)
+ + RamUsageEstimator.sizeOf(rightOutputSymbolIdx)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ + resultBuilder.getRetainedSizeInBytes();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index f5efc969cd3..3ad31ab62a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -93,6 +93,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOpera
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractAggTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AsofMergeSortInnerJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AsofMergeSortLeftJoinOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator;
@@ -2008,6 +2009,28 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
|| asofOperator ==
ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL,
!asofJoinClause.isOperatorContainsGreater()),
dataTypes);
+ } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.LEFT)
{
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ AsofMergeSortLeftJoinOperator.class.getSimpleName());
+ return new AsofMergeSortLeftJoinOperator(
+ operatorContext,
+ leftChild,
+ leftJoinKeyPositions,
+ leftOutputSymbolIdx,
+ rightChild,
+ rightJoinKeyPositions,
+ rightOutputSymbolIdx,
+ JoinKeyComparatorFactory.getAsofComparators(
+ joinKeyTypes,
+ asofOperator ==
ComparisonExpression.Operator.LESS_THAN_OR_EQUAL
+ || asofOperator ==
ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL,
+ !asofJoinClause.isOperatorContainsGreater()),
+ dataTypes);
} else {
throw new IllegalStateException("Unsupported ASOF join type: " +
node.getJoinType());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index f96061b8b6e..5cda1274ab8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -753,6 +753,9 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
equiJoinClauses.add(new JoinNode.EquiJoinClause(leftSymbol,
rightSymbol));
} else {
+ if (conjunct.equals(TRUE_LITERAL)) {
+ continue;
+ }
if (node.getJoinType() != INNER) {
throw new SemanticException(ONLY_SUPPORT_EQUI_JOIN);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 51b1b8afafb..db4ff12d74b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -2345,7 +2345,7 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
}
JoinCriteria criteria;
-
+ TimeDuration timeDuration = null;
if (ctx.NATURAL() != null) {
right = (Relation) visit(ctx.right);
criteria = new NaturalJoin();
@@ -2353,7 +2353,6 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
right = (Relation) visit(ctx.rightRelation);
if (ctx.joinCriteria().ON() != null) {
if (ctx.ASOF() != null) {
- TimeDuration timeDuration = null;
if (ctx.timeDuration() != null) {
timeDuration =
DateTimeUtils.constructTimeDuration(ctx.timeDuration().getText());
@@ -2386,8 +2385,8 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
joinType = Join.Type.INNER;
}
- if (criteria instanceof AsofJoinOn && joinType != Join.Type.INNER) {
- throw new SemanticException("ASOF JOIN is only support INNER type now");
+ if (criteria instanceof AsofJoinOn && joinType != Join.Type.INNER &&
timeDuration != null) {
+ throw new SemanticException("Tolerance in ASOF JOIN is only support
INNER type now");
}
return new Join(getLocation(ctx), joinType, left, right, criteria);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
index 6c92220780a..62ec85d2774 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AsofJoinTest.java
@@ -275,8 +275,5 @@ public class AsofJoinTest {
assertAnalyzeSemanticException(
"select * from table1 asof (tolerance 1ms) join table2 on table1.tag1
= table2.tag1 and table1.time > table2.s1",
"right child type of ASOF main JOIN expression must be TIMESTAMP:
actual type INT64");
- assertAnalyzeSemanticException(
- "select * from table1 asof (tolerance 1ms) left join table2 on
table1.time > table2.time",
- "ASOF JOIN is only support INNER type now");
}
}