This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a437cd469b7 Add SLIDE parameter support to CAPACITY table-valued function (#17456)
a437cd469b7 is described below
commit a437cd469b7c6da87cc11003ca78371c5fa2e539
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 9 15:50:45 2026 +0800
Add SLIDE parameter support to CAPACITY table-valued function (#17456)
---
.../relational/it/db/it/IoTDBWindowTVFIT.java | 101 ++++++++++
.../relational/tvf/CapacityTableFunction.java | 69 ++++---
.../relational/tvf/CapacityTableFunctionTest.java | 204 +++++++++++++++++++++
3 files changed, 348 insertions(+), 26 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
index 20339d4bf1f..d47800704ae 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
@@ -434,6 +434,107 @@ public class IoTDBWindowTVFIT {
expectedHeader,
retArray,
DATABASE_NAME);
+
+ // CAPACITY with SLIDE=2 (equal to SIZE=2, so it should behave identically to omitting SLIDE)
+ expectedHeader = new String[] {"window_index", "time", "stock_id",
"price", "s1"};
+ retArray =
+ new String[] {
+ "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+ "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+ "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+ "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+ "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+ "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY
time, SIZE => 2, SLIDE => 2) ORDER BY stock_id, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // CAPACITY with SIZE=2, SLIDE=1 (overlapping windows)
+ expectedHeader = new String[] {"window_index", "time", "stock_id",
"price", "s1"};
+ retArray =
+ new String[] {
+ "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+ "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+ "1,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+ "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+ "2,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+ "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+ "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+ "1,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+ "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+ "2,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY
time, SIZE => 2, SLIDE => 1) ORDER BY stock_id, window_index, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // CAPACITY with SIZE=3, SLIDE=2 (overlapping windows, different params)
+ expectedHeader = new String[] {"window_index", "time", "stock_id",
"price", "s1"};
+ retArray =
+ new String[] {
+ "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+ "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+ "0,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+ "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+ "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+ "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+ "0,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+ "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY
time, SIZE => 3, SLIDE => 2) ORDER BY stock_id, window_index, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // CAPACITY with SIZE=2, SLIDE=3 (gap windows, some rows discarded)
+ expectedHeader = new String[] {"window_index", "time", "stock_id",
"price", "s1"};
+ retArray =
+ new String[] {
+ "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+ "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+ "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+ "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY
time, SIZE => 2, SLIDE => 3) ORDER BY stock_id, window_index, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // CAPACITY with SIZE=2, SLIDE=1 + GROUP BY (verify aggregation with overlapping windows)
+ expectedHeader = new String[] {"stock_id", "window_index", "avg"};
+ retArray =
+ new String[] {
+ "AAPL,0,101.5,",
+ "AAPL,1,102.5,",
+ "AAPL,2,102.0,",
+ "TESL,0,201.0,",
+ "TESL,1,198.5,",
+ "TESL,2,195.0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT stock_id, window_index, avg(price) as avg FROM CAPACITY(DATA
=> bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 1) GROUP BY
window_index, stock_id ORDER BY stock_id, window_index",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // CAPACITY with negative SLIDE (error case)
+ tableAssertTestFail(
+ "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY
time, SIZE => 2, SLIDE => -1) ORDER BY stock_id, time",
+ "Invalid scalar argument SLIDE, should be a positive value",
+ DATABASE_NAME);
+
+ // CAPACITY with SLIDE=0 (error case)
+ tableAssertTestFail(
+ "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY
time, SIZE => 2, SLIDE => 0) ORDER BY stock_id, time",
+ "Invalid scalar argument SLIDE, should be a positive value",
+ DATABASE_NAME);
}
@Test
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
index 7ea8d751413..a037500c006 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
@@ -43,9 +43,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import static
org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
public class CapacityTableFunction implements TableFunction {
private static final String DATA_PARAMETER_NAME = "DATA";
private static final String SIZE_PARAMETER_NAME = "SIZE";
+ private static final String SLIDE_PARAMETER_NAME = "SLIDE";
@Override
public List<ParameterSpecification> getArgumentsSpecifications() {
@@ -54,7 +57,17 @@ public class CapacityTableFunction implements TableFunction {
.name(DATA_PARAMETER_NAME)
.passThroughColumns()
.build(),
-
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build());
+ ScalarParameterSpecification.builder()
+ .name(SIZE_PARAMETER_NAME)
+ .addChecker(POSITIVE_LONG_CHECKER)
+ .type(Type.INT64)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(SLIDE_PARAMETER_NAME)
+ .addChecker(POSITIVE_LONG_CHECKER)
+ .type(Type.INT64)
+ .defaultValue(-1L)
+ .build());
}
@Override
@@ -63,8 +76,16 @@ public class CapacityTableFunction implements TableFunction {
if (size <= 0) {
throw new UDFException(CommonMessages.SIZE_MUST_BE_POSITIVE);
}
+ long slide = (long) ((ScalarArgument)
arguments.get(SLIDE_PARAMETER_NAME)).getValue();
+ // default SLIDE to SIZE when not specified (sentinel value -1)
+ if (slide == -1L) {
+ slide = size;
+ }
MapTableFunctionHandle handle =
- new MapTableFunctionHandle.Builder().addProperty(SIZE_PARAMETER_NAME,
size).build();
+ new MapTableFunctionHandle.Builder()
+ .addProperty(SIZE_PARAMETER_NAME, size)
+ .addProperty(SLIDE_PARAMETER_NAME, slide)
+ .build();
return TableFunctionAnalysis.builder()
.properColumnSchema(
new DescribedSchema.Builder().addField("window_index",
Type.INT64).build())
@@ -82,12 +103,13 @@ public class CapacityTableFunction implements
TableFunction {
@Override
public TableFunctionProcessorProvider getProcessorProvider(
TableFunctionHandle tableFunctionHandle) {
- long sz =
- (long) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME);
+ MapTableFunctionHandle handle = (MapTableFunctionHandle)
tableFunctionHandle;
+ long size = (long) handle.getProperty(SIZE_PARAMETER_NAME);
+ long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME);
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
- return new CapacityDataProcessor(sz);
+ return new CapacityDataProcessor(size, slide);
}
};
}
@@ -95,12 +117,12 @@ public class CapacityTableFunction implements
TableFunction {
private static class CapacityDataProcessor implements
TableFunctionDataProcessor {
private final long size;
- private long currentStartIndex = 0;
+ private final long slide;
private long curIndex = 0;
- private long windowIndex = 0;
- public CapacityDataProcessor(long size) {
+ public CapacityDataProcessor(long size, long slide) {
this.size = size;
+ this.slide = slide;
}
@Override
@@ -108,26 +130,21 @@ public class CapacityTableFunction implements
TableFunction {
Record input,
List<ColumnBuilder> properColumnBuilders,
ColumnBuilder passThroughIndexBuilder) {
- if (curIndex - currentStartIndex == size) {
- outputWindow(properColumnBuilders, passThroughIndexBuilder);
- currentStartIndex = curIndex;
+ // For each row at curIndex, find all windows k such that:
+ // k * slide <= curIndex < k * slide + size, and k >= 0
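+ // The first valid k: max(0, ceil((curIndex - size + 1) / slide)); in integer arithmetic this is (curIndex - size + slide) / slide, with negative results clamped to 0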
+ // The last valid k: floor(curIndex / slide)
+ long firstWindow = Math.max(0, (curIndex - size + slide) / slide);
+ long lastWindow = curIndex / slide;
+ for (long k = firstWindow; k <= lastWindow; k++) {
+ // Verify: k * slide <= curIndex < k * slide + size
+ long windowStart = k * slide;
+ if (windowStart <= curIndex && curIndex < windowStart + size) {
+ properColumnBuilders.get(0).writeLong(k);
+ passThroughIndexBuilder.writeLong(curIndex);
+ }
}
curIndex++;
}
-
- @Override
- public void finish(
- List<ColumnBuilder> properColumnBuilders, ColumnBuilder
passThroughIndexBuilder) {
- outputWindow(properColumnBuilders, passThroughIndexBuilder);
- }
-
- private void outputWindow(
- List<ColumnBuilder> properColumnBuilders, ColumnBuilder
passThroughIndexBuilder) {
- for (long i = currentStartIndex; i < curIndex; i++) {
- properColumnBuilders.get(0).writeLong(windowIndex);
- passThroughIndexBuilder.writeLong(i);
- }
- windowIndex++;
- }
}
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java
new file mode 100644
index 00000000000..4f40e737586
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CapacityTableFunctionTest {
+
+ private final CapacityTableFunction function = new CapacityTableFunction();
+
+ // ======================== analyze() tests ========================
+
+ @Test
+ public void testAnalyzeWithSlideDefault() throws UDFException {
+ Map<String, Argument> args = new HashMap<>();
+ args.put("SIZE", new ScalarArgument(Type.INT64, 5L));
+ args.put("SLIDE", new ScalarArgument(Type.INT64, -1L));
+
+ TableFunctionAnalysis analysis = function.analyze(args);
+ assertNotNull(analysis);
+ }
+
+ @Test
+ public void testAnalyzeWithExplicitSlide() throws UDFException {
+ Map<String, Argument> args = new HashMap<>();
+ args.put("SIZE", new ScalarArgument(Type.INT64, 4L));
+ args.put("SLIDE", new ScalarArgument(Type.INT64, 2L));
+
+ TableFunctionAnalysis analysis = function.analyze(args);
+ assertNotNull(analysis);
+ }
+
+ @Test(expected = UDFException.class)
+ public void testAnalyzeSizeZero() throws UDFException {
+ Map<String, Argument> args = new HashMap<>();
+ args.put("SIZE", new ScalarArgument(Type.INT64, 0L));
+ args.put("SLIDE", new ScalarArgument(Type.INT64, -1L));
+
+ function.analyze(args);
+ }
+
+ @Test(expected = UDFException.class)
+ public void testAnalyzeSizeNegative() throws UDFException {
+ Map<String, Argument> args = new HashMap<>();
+ args.put("SIZE", new ScalarArgument(Type.INT64, -3L));
+ args.put("SLIDE", new ScalarArgument(Type.INT64, -1L));
+
+ function.analyze(args);
+ }
+
+ // ======================== processor tests ========================
+
+ /**
+ * Helper: builds the processor via the analyze() -> getProcessorProvider() chain, then feeds
+ * rowCount rows through process(). Returns the captured (windowIndex, passThroughIndex) pairs
+ * in the order they were written.
+ */
+ private List<long[]> runProcessor(long size, long slide, int rowCount)
throws UDFException {
+ Map<String, Argument> args = new HashMap<>();
+ args.put("SIZE", new ScalarArgument(Type.INT64, size));
+ args.put("SLIDE", new ScalarArgument(Type.INT64, slide == -1 ? -1L :
slide));
+
+ TableFunctionAnalysis analysis = function.analyze(args);
+ TableFunctionHandle handle = analysis.getTableFunctionHandle();
+
+ TableFunctionProcessorProvider provider =
function.getProcessorProvider(handle);
+ TableFunctionDataProcessor processor = provider.getDataProcessor();
+
+ Record mockRecord = Mockito.mock(Record.class);
+ List<long[]> results = new ArrayList<>();
+
+ for (int i = 0; i < rowCount; i++) {
+ ArgumentCaptor<Long> windowCaptor = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Long> indexCaptor = ArgumentCaptor.forClass(Long.class);
+
+ ColumnBuilder properBuilder = Mockito.mock(ColumnBuilder.class);
+ ColumnBuilder passThroughBuilder = Mockito.mock(ColumnBuilder.class);
+
+ processor.process(mockRecord, Collections.singletonList(properBuilder),
passThroughBuilder);
+
+ Mockito.verify(properBuilder,
Mockito.atLeast(0)).writeLong(windowCaptor.capture());
+ Mockito.verify(passThroughBuilder,
Mockito.atLeast(0)).writeLong(indexCaptor.capture());
+
+ List<Long> windows = windowCaptor.getAllValues();
+ List<Long> indices = indexCaptor.getAllValues();
+ for (int j = 0; j < windows.size(); j++) {
+ results.add(new long[] {windows.get(j), indices.get(j)});
+ }
+ }
+ return results;
+ }
+
+ @Test
+ public void testSlideEqualsSize() throws UDFException {
+ // SIZE=2, SLIDE=2 (non-overlapping), 5 rows
+ // window0: rows 0,1; window1: rows 2,3; window2: row 4
+ List<long[]> results = runProcessor(2, 2, 5);
+ long[][] expected = {{0, 0}, {0, 1}, {1, 2}, {1, 3}, {2, 4}};
+ assertResultsEqual(expected, results);
+ }
+
+ @Test
+ public void testSlideDefault() throws UDFException {
+ // SIZE=2, SLIDE=-1 (sentinel for "not specified", so SLIDE falls back to SIZE=2), 5 rows — same as above
+ List<long[]> results = runProcessor(2, -1, 5);
+ long[][] expected = {{0, 0}, {0, 1}, {1, 2}, {1, 3}, {2, 4}};
+ assertResultsEqual(expected, results);
+ }
+
+ @Test
+ public void testSlideLessThanSize() throws UDFException {
+ // SIZE=2, SLIDE=1 (overlapping), 3 rows
+ // row0: window 0
+ // row1: window 0, 1
+ // row2: window 1, 2
+ List<long[]> results = runProcessor(2, 1, 3);
+ long[][] expected = {{0, 0}, {0, 1}, {1, 1}, {1, 2}, {2, 2}};
+ assertResultsEqual(expected, results);
+ }
+
+ @Test
+ public void testSlideGreaterThanSize() throws UDFException {
+ // SIZE=2, SLIDE=3 (gap), 6 rows
+ // window0: rows 0,1; row2: gap; window1: rows 3,4; row5: gap
+ List<long[]> results = runProcessor(2, 3, 6);
+ long[][] expected = {{0, 0}, {0, 1}, {1, 3}, {1, 4}};
+ assertResultsEqual(expected, results);
+ }
+
+ @Test
+ public void testOverlappingLargeSize() throws UDFException {
+ // SIZE=3, SLIDE=2 (overlapping), 3 rows
+ // row0: window 0
+ // row1: window 0
+ // row2: window 0, 1
+ List<long[]> results = runProcessor(3, 2, 3);
+ long[][] expected = {{0, 0}, {0, 1}, {0, 2}, {1, 2}};
+ assertResultsEqual(expected, results);
+ }
+
+ @Test
+ public void testSingleRow() throws UDFException {
+ // SIZE=3, SLIDE=1, 1 row — row0 belongs to window 0 only
+ List<long[]> results = runProcessor(3, 1, 1);
+ long[][] expected = {{0, 0}};
+ assertResultsEqual(expected, results);
+ }
+
+ @Test
+ public void testGetArgumentsSpecifications() {
+ assertEquals(3, function.getArgumentsSpecifications().size());
+ }
+
+ @Test
+ public void testCreateTableFunctionHandle() {
+ assertNotNull(function.createTableFunctionHandle());
+ }
+
+ private void assertResultsEqual(long[][] expected, List<long[]> actual) {
+ assertEquals("Result count mismatch", expected.length, actual.size());
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals("Window index mismatch at position " + i, expected[i][0],
actual.get(i)[0]);
+ assertEquals("PassThrough index mismatch at position " + i,
expected[i][1], actual.get(i)[1]);
+ }
+ }
+}