This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c23210049e73e89369919b9b239e1568eefe586 Author: Chen YZ <[email protected]> AuthorDate: Tue May 20 10:21:23 2025 +0900 Fix TVF return unexpected result when passing ORIGIN argument (cherry picked from commit 880c4335c9c89178839ce28d092cb97a15ee31d0) --- .../relational/it/db/it/IoTDBWindowTVFIT.java | 57 +++++++++++++++++++++- .../relational/tvf/CumulateTableFunction.java | 20 ++++---- .../builtin/relational/tvf/HOPTableFunction.java | 18 ++++--- .../relational/tvf/SessionTableFunction.java | 1 + .../relational/tvf/TumbleTableFunction.java | 14 +++--- 5 files changed, 86 insertions(+), 24 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java index c947722f26d..824d93c5fbe 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java @@ -130,6 +130,29 @@ public class IoTDBWindowTVFIT { expectedHeader, retArray, DATABASE_NAME); + expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"}; + retArray = + new String[] { + "2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,AAPL,103.0,", + "2021-01-01T09:09:00.000Z,2021-01-01T09:10:00.000Z,AAPL,102.0,", + "2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,TESL,202.0,", + "2021-01-01T09:15:00.000Z,2021-01-01T09:16:00.000Z,TESL,195.0,", + }; + tableResultSetEqualTest( + "SELECT window_start, window_end, stock_id, sum(price) as sum FROM HOP(DATA => bid, TIMECOL => 'time', SLIDE => 1m, SIZE => 1m, ORIGIN => 2021-01-01T09:07:00) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start", + expectedHeader, + retArray, + DATABASE_NAME); + retArray = + new String[] { + "2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,AAPL,103.0,", + "2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,TESL,202.0,", + }; + tableResultSetEqualTest( + "SELECT window_start, window_end, stock_id, sum(price) as sum FROM HOP(DATA => bid, TIMECOL => 'time', SLIDE => 1h, SIZE => 1m, ORIGIN => 2021-01-01T09:07:00) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start", + expectedHeader, + retArray, + DATABASE_NAME); tableAssertTestFail( "SELECT * FROM HOP(DATA => bid, TIMECOL => 'time', SLIDE => -300000, SIZE => 600000) ORDER BY stock_id, time", "Invalid scalar argument SLIDE, should be a positive value", @@ -166,7 +189,7 @@ public class IoTDBWindowTVFIT { "2021-01-01T09:15:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,", }; tableResultSetEqualTest( - "SELECT window_start, window_end, stock_id, sum(price) as sum FROM SESSION(DATA => bid PARTITION BY stock_id ORDER BY time, TIMECOL => 'time', GAP => 2m) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start", + "SELECT window_start, window_end, stock_id, sum(price) as sum FROM SESSION(DATA => bid PARTITION BY stock_id ORDER BY time, GAP => 2m) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start", expectedHeader, retArray, DATABASE_NAME); @@ -256,6 +279,19 @@ public class IoTDBWindowTVFIT { retArray, DATABASE_NAME); + // TUMBLE (10m) + origin + expectedHeader = new String[] {"window_start", "window_end", "time", "stock_id", "price", "s1"}; + retArray = + new String[] { + "2021-01-01T09:08:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "2021-01-01T09:08:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,", + }; + tableResultSetEqualTest( + "SELECT * FROM TUMBLE(DATA => bid, TIMECOL => 'time', SIZE => 10m, ORIGIN => 2021-01-01T09:08:00) ORDER BY stock_id, time", + expectedHeader, + retArray, + DATABASE_NAME); + // TUMBLE (10m) + GROUP BY expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"}; retArray = @@ -309,6 +345,25 @@ public class IoTDBWindowTVFIT { retArray, DATABASE_NAME); + expectedHeader = new String[] {"window_start", "window_end", "time", "stock_id", "price", "s1"}; + retArray = + new String[] { + "2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,", + "2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,", + }; + tableResultSetEqualTest( + "SELECT * FROM CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m, ORIGIN => 2021-01-01T09:06:00) ORDER BY stock_id, time", + expectedHeader, + retArray, + DATABASE_NAME); + expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"}; retArray = new String[] { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java index b3290bf6cba..e6627491f2b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java @@ -146,13 +146,13 @@ public class CumulateTableFunction implements TableFunction { private final long step; private final long size; - private final long start; + private final long origin; private long curIndex = 0; public CumulateDataProcessor(long startTime, long step, long size) { this.step = step; this.size = size; - this.start = startTime; + this.origin = startTime; } @Override @@ -162,13 +162,15 @@ public class CumulateTableFunction implements TableFunction { ColumnBuilder passThroughIndexBuilder) { // find the first windows long timeValue = input.getLong(0); - long window_start = (timeValue - start) / size * size; - for (long steps = (timeValue - window_start + step) / step * step; - steps <= size; - steps += step) { - properColumnBuilders.get(0).writeLong(window_start); - properColumnBuilders.get(1).writeLong(window_start + steps); - passThroughIndexBuilder.writeLong(curIndex); + if (timeValue >= origin) { + long windowStart = origin + (timeValue - origin) / size * size; + for (long steps = (timeValue - windowStart + step) / step * step; + steps <= size; + steps += step) { + properColumnBuilders.get(0).writeLong(windowStart); + properColumnBuilders.get(1).writeLong(windowStart + steps); + passThroughIndexBuilder.writeLong(curIndex); + } } curIndex++; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java index d0b2852a51d..a411bda4a7c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java @@ -140,13 +140,13 @@ public class HOPTableFunction implements TableFunction { private final long slide; private final long size; - private final long start; + private final long origin; private long curIndex = 0; public HOPDataProcessor(long startTime, long slide, long size) { this.slide = slide; this.size = size; - this.start = startTime; + this.origin = startTime; } @Override @@ -157,12 +157,14 @@ public class HOPTableFunction implements TableFunction { // find the first windows that satisfy the condition: start + n*slide <= time < start + // n*slide + size long timeValue = input.getLong(0); - long window_start = (timeValue - start - size + slide) / slide * slide; - while (window_start <= timeValue && window_start + size > timeValue) { - properColumnBuilders.get(0).writeLong(window_start); - properColumnBuilders.get(1).writeLong(window_start + size); - passThroughIndexBuilder.writeLong(curIndex); - window_start += slide; + if (timeValue >= origin) { + long window_start = origin + (timeValue - origin - size + slide) / slide * slide; + while (window_start <= timeValue && window_start + size > timeValue) { + properColumnBuilders.get(0).writeLong(window_start); + properColumnBuilders.get(1).writeLong(window_start + size); + passThroughIndexBuilder.writeLong(curIndex); + window_start += slide; + } } curIndex++; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java index d4e4c6ad15f..19252a67df3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java @@ -60,6 +60,7 @@ public class SessionTableFunction implements TableFunction { ScalarParameterSpecification.builder() .name(TIMECOL_PARAMETER_NAME) .type(Type.STRING) + .defaultValue("time") .build(), ScalarParameterSpecification.builder().name(GAP_PARAMETER_NAME).type(Type.INT64).build()); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java index 9aa8e2167ab..b94520c8c5c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java @@ -129,12 +129,12 @@ public class TumbleTableFunction implements TableFunction { private static class TumbleDataProcessor implements TableFunctionDataProcessor { private final long size; - private final long start; + private final long origin; private long curIndex = 0; public TumbleDataProcessor(long startTime, long size) { this.size = size; - this.start = startTime; + this.origin = startTime; } @Override @@ -144,10 +144,12 @@ public class TumbleTableFunction implements TableFunction { ColumnBuilder passThroughIndexBuilder) { // find the proper window long timeValue = input.getLong(0); - long window_start = (timeValue - start) / size * size; - properColumnBuilders.get(0).writeLong(window_start); - properColumnBuilders.get(1).writeLong(window_start + size); - passThroughIndexBuilder.writeLong(curIndex); + if (timeValue >= origin) { + long windowStart = origin + (timeValue - origin) / size * size; + properColumnBuilders.get(0).writeLong(windowStart); + properColumnBuilders.get(1).writeLong(windowStart + size); + passThroughIndexBuilder.writeLong(curIndex); + } curIndex++; } }
