This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch capacity-slide-support
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 463d524d2a131c4bd48e0c52a78e49df4adb4421
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Apr 10 14:36:54 2026 +0800

    Add SLIDE parameter support to CAPACITY table-valued function
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../relational/it/db/it/IoTDBWindowTVFIT.java      | 101 +++++++++++++++++++++
 .../relational/tvf/CapacityTableFunction.java      |  69 ++++++++------
 2 files changed, 144 insertions(+), 26 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
index 9e83c8ac41b..91d7a0f5bc4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
@@ -434,6 +434,107 @@ public class IoTDBWindowTVFIT {
         expectedHeader,
         retArray,
         DATABASE_NAME);
+
+    // CAPACITY with SLIDE=2 (same as SIZE=2, should behave identically to no 
SLIDE)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 2) ORDER BY stock_id, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=2, SLIDE=1 (overlapping windows)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "1,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "2,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "1,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+          "2,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 1) ORDER BY stock_id, window_index, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=3, SLIDE=2 (overlapping windows, different params)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "0,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "0,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+          "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 3, SLIDE => 2) ORDER BY stock_id, window_index, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=2, SLIDE=3 (gap windows, some rows discarded)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 3) ORDER BY stock_id, window_index, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=2, SLIDE=1 + GROUP BY (verify aggregation with 
overlapping windows)
+    expectedHeader = new String[] {"stock_id", "window_index", "avg"};
+    retArray =
+        new String[] {
+          "AAPL,0,101.5,",
+          "AAPL,1,102.5,",
+          "AAPL,2,102.0,",
+          "TESL,0,201.0,",
+          "TESL,1,198.5,",
+          "TESL,2,195.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT stock_id, window_index, avg(price) as avg FROM CAPACITY(DATA 
=> bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 1) GROUP BY 
window_index, stock_id ORDER BY stock_id, window_index",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with negative SLIDE (error case)
+    tableAssertTestFail(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => -1) ORDER BY stock_id, time",
+        "Invalid scalar argument SLIDE, should be a positive value",
+        DATABASE_NAME);
+
+    // CAPACITY with SLIDE=0 (error case)
+    tableAssertTestFail(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 0) ORDER BY stock_id, time",
+        "Invalid scalar argument SLIDE, should be a positive value",
+        DATABASE_NAME);
   }
 
   @Test
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
index d26ba6d8376..5d59d8cfd70 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
@@ -42,9 +42,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
 public class CapacityTableFunction implements TableFunction {
   private static final String DATA_PARAMETER_NAME = "DATA";
   private static final String SIZE_PARAMETER_NAME = "SIZE";
+  private static final String SLIDE_PARAMETER_NAME = "SLIDE";
 
   @Override
   public List<ParameterSpecification> getArgumentsSpecifications() {
@@ -53,7 +56,17 @@ public class CapacityTableFunction implements TableFunction {
             .name(DATA_PARAMETER_NAME)
             .passThroughColumns()
             .build(),
-        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build());
+        ScalarParameterSpecification.builder()
+            .name(SIZE_PARAMETER_NAME)
+            .addChecker(POSITIVE_LONG_CHECKER)
+            .type(Type.INT64)
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(SLIDE_PARAMETER_NAME)
+            .addChecker(POSITIVE_LONG_CHECKER)
+            .type(Type.INT64)
+            .defaultValue(-1L)
+            .build());
   }
 
   @Override
@@ -62,8 +75,16 @@ public class CapacityTableFunction implements TableFunction {
     if (size <= 0) {
       throw new UDFException("Size must be greater than 0");
     }
+    long slide = (long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue();
+    // default SLIDE to SIZE when not specified (sentinel value -1)
+    if (slide == -1L) {
+      slide = size;
+    }
     MapTableFunctionHandle handle =
-        new MapTableFunctionHandle.Builder().addProperty(SIZE_PARAMETER_NAME, 
size).build();
+        new MapTableFunctionHandle.Builder()
+            .addProperty(SIZE_PARAMETER_NAME, size)
+            .addProperty(SLIDE_PARAMETER_NAME, slide)
+            .build();
     return TableFunctionAnalysis.builder()
         .properColumnSchema(
             new DescribedSchema.Builder().addField("window_index", 
Type.INT64).build())
@@ -81,12 +102,13 @@ public class CapacityTableFunction implements 
TableFunction {
   @Override
   public TableFunctionProcessorProvider getProcessorProvider(
       TableFunctionHandle tableFunctionHandle) {
-    long sz =
-        (long) ((MapTableFunctionHandle) 
tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME);
+    MapTableFunctionHandle handle = (MapTableFunctionHandle) 
tableFunctionHandle;
+    long size = (long) handle.getProperty(SIZE_PARAMETER_NAME);
+    long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME);
     return new TableFunctionProcessorProvider() {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
-        return new CapacityDataProcessor(sz);
+        return new CapacityDataProcessor(size, slide);
       }
     };
   }
@@ -94,12 +116,12 @@ public class CapacityTableFunction implements 
TableFunction {
   private static class CapacityDataProcessor implements 
TableFunctionDataProcessor {
 
     private final long size;
-    private long currentStartIndex = 0;
+    private final long slide;
     private long curIndex = 0;
-    private long windowIndex = 0;
 
-    public CapacityDataProcessor(long size) {
+    public CapacityDataProcessor(long size, long slide) {
       this.size = size;
+      this.slide = slide;
     }
 
     @Override
@@ -107,26 +129,21 @@ public class CapacityTableFunction implements 
TableFunction {
         Record input,
         List<ColumnBuilder> properColumnBuilders,
         ColumnBuilder passThroughIndexBuilder) {
-      if (curIndex - currentStartIndex == size) {
-        outputWindow(properColumnBuilders, passThroughIndexBuilder);
-        currentStartIndex = curIndex;
+      // For each row at curIndex, find all windows k such that:
+      //   k * slide <= curIndex < k * slide + size, and k >= 0
+      // The first valid k: max(0, ceil((curIndex - size + 1) / slide))
+      // The last valid k: floor(curIndex / slide)
+      long firstWindow = Math.max(0, (curIndex - size + slide) / slide);
+      long lastWindow = curIndex / slide;
+      for (long k = firstWindow; k <= lastWindow; k++) {
+        // Verify: k * slide <= curIndex < k * slide + size
+        long windowStart = k * slide;
+        if (windowStart <= curIndex && curIndex < windowStart + size) {
+          properColumnBuilders.get(0).writeLong(k);
+          passThroughIndexBuilder.writeLong(curIndex);
+        }
       }
       curIndex++;
     }
-
-    @Override
-    public void finish(
-        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
-      outputWindow(properColumnBuilders, passThroughIndexBuilder);
-    }
-
-    private void outputWindow(
-        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
-      for (long i = currentStartIndex; i < curIndex; i++) {
-        properColumnBuilders.get(0).writeLong(windowIndex);
-        passThroughIndexBuilder.writeLong(i);
-      }
-      windowIndex++;
-    }
   }
 }

Reply via email to