This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c131da2 [IOTDB-1254] Trigger module: windowing utility (#2891)
c131da2 is described below
commit c131da2acd85a19cbdd42b8cf848aee567d63ed9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Apr 19 19:20:08 2021 +0800
[IOTDB-1254] Trigger module: windowing utility (#2891)
Trigger module: windowing utility
---
.../{UDF => Advanced-Features}/Triggers.md | 181 ++++++++++++++++-
.../UDF-User-Defined-Function.md | 0
.../DML-Data-Manipulation-Language.md | 2 +-
.../{UDF => Advanced-Features}/Triggers.md | 182 ++++++++++++++++-
.../UDF-User-Defined-Function.md | 0
.../DML-Data-Manipulation-Language.md | 2 +-
server/pom.xml | 6 +
.../resources/conf/iotdb-engine.properties | 7 +
.../org/apache/iotdb/db/concurrent/ThreadName.java | 5 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 25 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 18 ++
.../iotdb/db/utils/windowing/api/Evaluator.java | 32 +++
.../iotdb/db/utils/windowing/api/Window.java | 70 +++++++
.../windowing/configuration/Configuration.java | 38 ++++
.../SlidingSizeWindowConfiguration.java | 61 ++++++
.../SlidingTimeWindowConfiguration.java | 61 ++++++
.../windowing/exception/WindowingException.java | 31 +++
.../SlidingSizeWindowEvaluationHandler.java | 58 ++++++
.../SlidingTimeWindowEvaluationHandler.java | 78 +++++++
.../handler/SlidingWindowEvaluationHandler.java | 124 +++++++++++
.../windowing/runtime/WindowEvaluationTask.java | 44 ++++
.../runtime/WindowEvaluationTaskPoolManager.java | 99 +++++++++
.../utils/windowing/window/EvictableBatchList.java | 173 ++++++++++++++++
.../db/utils/windowing/window/WindowImpl.java | 204 +++++++++++++++++++
.../SlidingSizeWindowEvaluationHandlerTest.java | 184 +++++++++++++++++
.../SlidingTimeWindowEvaluationHandlerTest.java | 226 +++++++++++++++++++++
site/src/main/.vuepress/config.js | 46 +----
27 files changed, 1915 insertions(+), 42 deletions(-)
diff --git a/docs/UserGuide/UDF/Triggers.md
b/docs/UserGuide/Advanced-Features/Triggers.md
similarity index 66%
rename from docs/UserGuide/UDF/Triggers.md
rename to docs/UserGuide/Advanced-Features/Triggers.md
index d084d0c..a4f4df8 100644
--- a/docs/UserGuide/UDF/Triggers.md
+++ b/docs/UserGuide/Advanced-Features/Triggers.md
@@ -310,7 +310,186 @@ For more information, refer to [Authority Management
Statement](../Operation%20M
-### Important Notes
+## Utilities
+
+Utility classes provide programming paradigms and execution frameworks for common requirements, which can simplify part of the work of implementing triggers.
+
+
+
+### Windowing Utility
+
+The windowing utility helps you define sliding windows and the data processing logic on them. It can construct two types of sliding windows: one has a fixed time interval (`SlidingTimeWindowEvaluationHandler`), and the other has a fixed number of data points (`SlidingSizeWindowEvaluationHandler`).
+
+The windowing utility allows you to define a hook (`Evaluator`) on the window (`Window`). Whenever a new window is formed, the hook you defined is called once. You can define any data processing logic in the hook. The hook is invoked asynchronously, so the current thread is not blocked while the window processing logic executes.
+
+Note that both `SlidingTimeWindowEvaluationHandler` and `SlidingSizeWindowEvaluationHandler` **can only handle sequences with strictly monotonically increasing timestamps**. Incoming data points that do not meet this requirement are discarded.
+
+For the definition of `Window` and `Evaluator`, please refer to the
`org.apache.iotdb.db.utils.windowing.api` package.
+
+
+
+#### SlidingSizeWindowEvaluationHandler
+
+##### Window Construction
+
+There are two construction methods.
+
+The first method requires you to provide the type of data points that the
window collects, the window size, the sliding step, and a hook (`Evaluator`).
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final int windowSize = 10;
+final int slidingStep = 5;
+
+SlidingSizeWindowEvaluationHandler handler =
+ new SlidingSizeWindowEvaluationHandler(
+ new SlidingSizeWindowConfiguration(dataType, windowSize, slidingStep),
+ window -> {
+ // do something
+ });
+```
+
+The second method requires you to provide the type of data points that the
window collects, the window size, and a hook (`Evaluator`). The sliding step is
equal to the window size by default.
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final int windowSize = 10;
+
+SlidingSizeWindowEvaluationHandler handler =
+ new SlidingSizeWindowEvaluationHandler(
+ new SlidingSizeWindowConfiguration(dataType, windowSize),
+ window -> {
+ // do something
+ });
+```
+
+The window size and the sliding step must be positive.
+
+
+
+##### Datapoint Collection
+
+``` java
+final long timestamp = 0;
+final int value = 0;
+handler.collect(timestamp, value);
+```
+
+Note that the type of the second parameter accepted by the `collect` method
needs to be consistent with the `dataType` parameter provided during
construction.
+
+In addition, the `collect` method only accepts data points whose timestamps are strictly increasing. If the timestamp of a data point passed to `collect` is less than or equal to the timestamp of the data point passed in the previous `collect` call, the new data point is discarded.
+
+Also note that the `collect` method is not thread-safe.
+
+
+
+#### SlidingTimeWindowEvaluationHandler
+
+##### Window Construction
+
+There are two construction methods.
+
+The first method requires you to provide the type of data points that the
window collects, the time interval, the sliding step, and a hook (`Evaluator`).
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final long timeInterval = 1000;
+final long slidingStep = 500;
+
+SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(dataType, timeInterval, slidingStep),
+ window -> {
+ // do something
+ });
+```
+
+The second method requires you to provide the type of data points that the
window collects, the time interval, and a hook (`Evaluator`). The sliding step
is equal to the time interval by default.
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final long timeInterval = 1000;
+
+SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(dataType, timeInterval),
+ window -> {
+ // do something
+ });
+```
+
+The time interval and the sliding step must be positive.
+
+
+
+##### Datapoint Collection
+
+``` java
+final long timestamp = 0;
+final int value = 0;
+handler.collect(timestamp, value);
+```
+
+Note that the type of the second parameter accepted by the `collect` method
needs to be consistent with the `dataType` parameter provided during
construction.
+
+In addition, the `collect` method only accepts data points whose timestamps are strictly increasing. If the timestamp of a data point passed to `collect` is less than or equal to the timestamp of the data point passed in the previous `collect` call, the new data point is discarded.
+
+Also note that the `collect` method is not thread-safe.
+
+
+
+#### Rejection Policy
+
+The execution of window evaluation tasks is asynchronous.
+
+When asynchronous tasks cannot be consumed by the execution thread pool in time, tasks accumulate. In extreme cases, the accumulated tasks can cause the system to run out of memory (OOM). Therefore, the number of tasks that the window evaluation thread pool allows to accumulate is set to a finite value.
+
+When the number of accumulated tasks exceeds the limit, newly submitted tasks cannot enter the thread pool for execution. The system then calls the rejection policy hook `onRejection` that you implemented in the hook (`Evaluator`).
+
+The default behavior of `onRejection` is as follows.
+
+````java
+default void onRejection(Window window) {
+ throw new RejectedExecutionException();
+}
+````
+
+A rejection policy hook can be implemented as follows.
+
+```java
+SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(TSDataType.INT32, 1, 1),
+ new Evaluator() {
+ @Override
+ public void evaluate(Window window) {
+ // do something
+ }
+
+ @Override
+ public void onRejection(Window window) {
+ // do something
+ }
+ });
+```
+
+
+
+#### Configurable Properties
+
+##### concurrent_window_evaluation_thread
+
+The number of threads that can be used for evaluating sliding windows. The value defaults to the number of CPU cores.
+
+
+
+##### max_pending_window_evaluation_tasks
+
+The maximum number of window evaluation tasks that can be pending for
execution. The value is 64 by default.
+
+
+
+## Important Notes
* The trigger is implemented based on the reflection mechanism. Triggers can
be dynamically registered and dropped without restarting the server.
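A note on the documentation hunk above: the window formation rule and the discard rule for non-increasing timestamps can be modelled in a few lines. The sketch below is a hypothetical stand-in that does not use any IoTDB class. The class name `SlidingSizeWindowSketch` and its methods are assumptions made for illustration, it tracks timestamps only, and it evaluates windows synchronously instead of submitting them to a thread pool.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a size-based sliding window; not the IoTDB implementation. */
final class SlidingSizeWindowSketch {

  private final int windowSize;
  private final int slidingStep;
  private final List<Long> accepted = new ArrayList<>();
  private final List<long[]> windows = new ArrayList<>();
  private int nextTriggerPoint;

  SlidingSizeWindowSketch(int windowSize, int slidingStep) {
    if (windowSize <= 0 || slidingStep <= 0) {
      throw new IllegalArgumentException("windowSize and slidingStep must be positive");
    }
    this.windowSize = windowSize;
    this.slidingStep = slidingStep;
    this.nextTriggerPoint = windowSize;
  }

  /** Returns false when the point is discarded because its timestamp did not strictly increase. */
  boolean collect(long timestamp) {
    if (!accepted.isEmpty() && timestamp <= accepted.get(accepted.size() - 1)) {
      return false;
    }
    accepted.add(timestamp);
    if (accepted.size() == nextTriggerPoint) {
      // A window covers the last windowSize accepted points.
      long[] window = new long[windowSize];
      for (int i = 0; i < windowSize; i++) {
        window[i] = accepted.get(nextTriggerPoint - windowSize + i);
      }
      windows.add(window);
      nextTriggerPoint += slidingStep;
    }
    return true;
  }

  /** Windows formed so far, each as an array of timestamps. */
  List<long[]> windows() {
    return windows;
  }

  public static void main(String[] args) {
    SlidingSizeWindowSketch sketch = new SlidingSizeWindowSketch(10, 5);
    for (long t = 0; t < 20; t++) {
      sketch.collect(t);
    }
    for (long[] w : sketch.windows()) {
      System.out.println("window [" + w[0] + ", " + w[w.length - 1] + "]");
    }
  }
}
```

Under this model, a window size of 10 with a sliding step of 5 and timestamps 0 through 19 forms three windows covering timestamps 0 to 9, 5 to 14, and 10 to 19. A real handler would also evict points that no later window can reach, which this sketch omits.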
diff --git a/docs/UserGuide/UDF/UDF-User-Defined-Function.md
b/docs/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
similarity index 100%
rename from docs/UserGuide/UDF/UDF-User-Defined-Function.md
rename to docs/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
diff --git
a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index c7540d2..8128b42 100644
--- a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -372,7 +372,7 @@ It costs 0.014s
#### User Defined Timeseries Generating Functions
-Please refer to [UDF (User Defined
Function)](../UDF/UDF-User-Defined-Function.md).
+Please refer to [UDF (User Defined
Function)](../Advanced-Features/UDF-User-Defined-Function.md).
### Aggregate Query
diff --git a/docs/zh/UserGuide/UDF/Triggers.md
b/docs/zh/UserGuide/Advanced-Features/Triggers.md
similarity index 68%
rename from docs/zh/UserGuide/UDF/Triggers.md
rename to docs/zh/UserGuide/Advanced-Features/Triggers.md
index 4668f5a..3c4ac28 100644
--- a/docs/zh/UserGuide/UDF/Triggers.md
+++ b/docs/zh/UserGuide/Advanced-Features/Triggers.md
@@ -326,7 +326,187 @@ SHOW TRIGGERS
-### 重要注意事项
+
+## 实用工具类
+
+实用工具类为常见的需求提供了编程范式和执行框架,它能够简化您编写触发器的一部分工作。
+
+
+
+### 窗口工具类
+
+窗口工具类能够辅助您定义滑动窗口以及窗口上的数据处理逻辑。它能够构造两类滑动窗口:一种滑动窗口是固定窗口内时间长度的(`SlidingTimeWindowEvaluationHandler`),另一种滑动窗口是固定窗口内数据点数的(`SlidingSizeWindowEvaluationHandler`)。
+
+窗口工具类允许您在窗口(`Window`)上定义侦听钩子(`Evaluator`)。每当一个新的窗口形成,您定义的侦听钩子就会被调用一次。您可以在这个侦听钩子内定义任何数据处理相关的逻辑。侦听钩子的调用是异步的,因此,在执行钩子内窗口处理逻辑的时候,是不会阻塞当前线程的。
+
+值得注意的是,不论是`SlidingTimeWindowEvaluationHandler`还是`SlidingSizeWindowEvaluationHandler`,他们都**只能够处理时间戳严格单调递增的序列**,传入的不符合要求的数据点会被工具类抛弃。
+
+`Window`与`Evaluator`接口的定义见`org.apache.iotdb.db.utils.windowing.api`包。
+
+
+
+#### 固定窗口内数据点数的滑动窗口
+
+##### 窗口构造
+
+共两种构造方法。
+
+第一种方法需要您提供窗口接受数据点的类型、窗口大小、滑动步长和一个侦听钩子(`Evaluator`)。
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final int windowSize = 10;
+final int slidingStep = 5;
+
+SlidingSizeWindowEvaluationHandler handler =
+ new SlidingSizeWindowEvaluationHandler(
+ new SlidingSizeWindowConfiguration(dataType, windowSize, slidingStep),
+ window -> {
+ // do something
+ });
+```
+
+第二种方法需要您提供窗口接受数据点的类型、窗口大小和一个侦听钩子(`Evaluator`)。这种构造方法下的窗口滑动步长等于窗口大小。
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final int windowSize = 10;
+
+SlidingSizeWindowEvaluationHandler handler =
+ new SlidingSizeWindowEvaluationHandler(
+ new SlidingSizeWindowConfiguration(dataType, windowSize),
+ window -> {
+ // do something
+ });
+```
+
+窗口大小、滑动步长必须为正数。
+
+
+
+##### 数据接收
+
+``` java
+final long timestamp = 0;
+final int value = 0;
+handler.collect(timestamp, value);
+```
+
+注意,`collect`方法接受的第二个参数类型需要与构造时传入的`dataType`声明一致。
+
+此外,`collect`方法只会对时间戳是单调递增的数据点产生响应。如果某一次`collect`方法采集到的数据点的时间戳小于等于上一次`collect`方法采集到的数据点时间戳,那么这一次采集的数据点将会被抛弃。
+
+还需要注意的是,`collect`方法不是线程安全的。
+
+
+
+#### 固定窗口内时间长度的滑动窗口
+
+##### 窗口构造
+
+共两种构造方法。
+
+第一种方法需要您提供窗口接受数据点的类型、窗口内时间长度、滑动步长和一个侦听钩子(`Evaluator`)。
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final long timeInterval = 1000;
+final long slidingStep = 500;
+
+SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(dataType, timeInterval, slidingStep),
+ window -> {
+ // do something
+ });
+```
+
+第二种方法需要您提供窗口接受数据点的类型、窗口内时间长度和一个侦听钩子(`Evaluator`)。这种构造方法下的窗口滑动步长等于窗口内时间长度。
+
+``` java
+final TSDataType dataType = TSDataType.INT32;
+final long timeInterval = 1000;
+
+SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(dataType, timeInterval),
+ window -> {
+ // do something
+ });
+```
+
+窗口内时间长度、滑动步长必须为正数。
+
+
+
+##### 数据接收
+
+``` java
+final long timestamp = 0;
+final int value = 0;
+handler.collect(timestamp, value);
+```
+
+注意,`collect`方法接受的第二个参数类型需要与构造时传入的`dataType`声明一致。
+
+此外,`collect`方法只会对时间戳是单调递增的数据点产生响应。如果某一次`collect`方法采集到的数据点的时间戳小于等于上一次`collect`方法采集到的数据点时间戳,那么这一次采集的数据点将会被抛弃。
+
+还需要注意的是,`collect`方法不是线程安全的。
+
+
+
+#### 拒绝策略
+
+窗口计算的任务执行是异步的。
+
+当异步任务无法被执行线程池及时消费时,会产生任务堆积。在极端情况下,异步任务的堆积会导致系统OOM。因此,窗口计算线程池允许堆积的任务数量被设定为有限值。
+
+当堆积的任务数量超出限值时,新提交的任务将无法进入线程池执行,此时,系统会调用您在侦听钩子(`Evaluator`)中制定的拒绝策略钩子`onRejection`进行处理。
+
+`onRejection`的默认行为如下。
+
+````java
+default void onRejection(Window window) {
+ throw new RejectedExecutionException();
+}
+````
+
+制定拒绝策略钩子的方式如下。
+
+```java
+SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(TSDataType.INT32, 1, 1),
+ new Evaluator() {
+ @Override
+ public void evaluate(Window window) {
+ // do something
+ }
+
+ @Override
+ public void onRejection(Window window) {
+ // do something
+ }
+ });
+```
+
+
+
+#### 配置参数
+
+##### concurrent_window_evaluation_thread
+
+窗口计算线程池的默认线程数。默认为CPU核数。
+
+
+
+##### max_pending_window_evaluation_tasks
+
+最多允许堆积的窗口计算任务。默认为64个。
+
+
+
+## 重要注意事项
* 触发器是通过反射技术动态装载的,因此您在装载过程中无需启停服务器。
diff --git a/docs/zh/UserGuide/UDF/UDF-User-Defined-Function.md
b/docs/zh/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
similarity index 100%
rename from docs/zh/UserGuide/UDF/UDF-User-Defined-Function.md
rename to docs/zh/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
diff --git
a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index d4a4e74..adfe72e 100644
--- a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -1355,7 +1355,7 @@ It costs 0.014s
* 自定义序列生成函数
-请参考 [UDF (用户定义函数)](../UDF/UDF-User-Defined-Function.md)。
+请参考 [UDF (用户定义函数)](../Advanced-Features/UDF-User-Defined-Function.md)。
#### 错误处理
diff --git a/server/pom.xml b/server/pom.xml
index dee952f..fecd1d9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -157,6 +157,12 @@
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>4.0.3</version>
+ <scope>test</scope>
+ </dependency>
<!-- compile group: 'io.jsonwebtoken', name: 'jjwt', version:
'0.9.1'-->
<dependency>
<groupId>io.jsonwebtoken</groupId>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 25ee617..e9c3e50 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -654,6 +654,13 @@ tlog_buffer_size=1048576
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# trigger_root_dir=ext/trigger
+# How many threads can be used for evaluating sliding windows. When <= 0, use CPU core number.
+concurrent_window_evaluation_thread=0
+
+# Max number of window evaluation tasks that can be pending for execution. When <= 0, the value is
+# 64 by default.
+max_pending_window_evaluation_tasks = 64
+
####################
### Index Configuration
####################
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 5f83c12..5f5c093 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -43,9 +43,10 @@ public enum ThreadName {
SYNC_MONITOR("Sync-Monitor"),
LOAD_TSFILE("Load-TsFile"),
TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
- QUERY_SERVICE("Query");
+ QUERY_SERVICE("Query"),
+ WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager");
- private String name;
+ private final String name;
ThreadName(String name) {
this.name = name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a089b2d..92d1cf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -247,6 +247,15 @@ public class IoTDBConfig {
/** How many threads can concurrently query. When <= 0, use CPU core number.
*/
private int concurrentQueryThread =
Runtime.getRuntime().availableProcessors();
+ /** How many threads can concurrently evaluate windows. When <= 0, use CPU core number. */
+ private int concurrentWindowEvaluationThread = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * Max number of window evaluation tasks that can be pending for execution. When <= 0, the value
+ * is 64 by default.
+ */
+ private int maxPendingWindowEvaluationTasks = 64;
+
/** Is the write mem control for writing enable. */
private boolean enableMemControl = true;
@@ -1047,6 +1056,22 @@ public class IoTDBConfig {
this.concurrentQueryThread = concurrentQueryThread;
}
+ public int getConcurrentWindowEvaluationThread() {
+ return concurrentWindowEvaluationThread;
+ }
+
+ public void setConcurrentWindowEvaluationThread(int concurrentWindowEvaluationThread) {
+ this.concurrentWindowEvaluationThread = concurrentWindowEvaluationThread;
+ }
+
+ public int getMaxPendingWindowEvaluationTasks() {
+ return maxPendingWindowEvaluationTasks;
+ }
+
+ public void setMaxPendingWindowEvaluationTasks(int maxPendingWindowEvaluationTasks) {
+ this.maxPendingWindowEvaluationTasks = maxPendingWindowEvaluationTasks;
+ }
+
public long getTsFileSizeThreshold() {
return tsFileSizeThreshold;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 237d56b..988f865 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -646,6 +646,24 @@ public class IoTDBDescriptor {
properties.getProperty(
"virtual_storage_group_num",
String.valueOf(conf.getVirtualStorageGroupNum()))));
+ conf.setConcurrentWindowEvaluationThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_window_evaluation_thread",
+ Integer.toString(conf.getConcurrentWindowEvaluationThread()))));
+ if (conf.getConcurrentWindowEvaluationThread() <= 0) {
+ conf.setConcurrentWindowEvaluationThread(Runtime.getRuntime().availableProcessors());
+ }
+
+ conf.setMaxPendingWindowEvaluationTasks(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_pending_window_evaluation_tasks",
+ Integer.toString(conf.getMaxPendingWindowEvaluationTasks()))));
+ if (conf.getMaxPendingWindowEvaluationTasks() <= 0) {
+ conf.setMaxPendingWindowEvaluationTasks(64);
+ }
+
// mqtt
if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME));
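For readers of the hunk above, the fallback rule for the two new properties can be tried in isolation. The following is a minimal sketch using only `java.util.Properties`. The class `WindowEvaluationConfigSketch` and its method names are hypothetical and are not part of `IoTDBDescriptor`. Only the property keys, the `<= 0` fallback, and the default of 64 are taken from the diff.

```java
import java.util.Properties;

/** Hypothetical helper mirroring the "<= 0 falls back to a default" rule from the hunk above. */
final class WindowEvaluationConfigSketch {

  static final int DEFAULT_MAX_PENDING_TASKS = 64;

  /** Non-positive or missing values fall back to the number of available processors. */
  static int resolveThreadCount(Properties properties) {
    int configured =
        Integer.parseInt(properties.getProperty("concurrent_window_evaluation_thread", "0"));
    return configured <= 0 ? Runtime.getRuntime().availableProcessors() : configured;
  }

  /** Non-positive or missing values fall back to 64. */
  static int resolveMaxPendingTasks(Properties properties) {
    int configured =
        Integer.parseInt(
            properties.getProperty(
                "max_pending_window_evaluation_tasks",
                Integer.toString(DEFAULT_MAX_PENDING_TASKS)));
    return configured <= 0 ? DEFAULT_MAX_PENDING_TASKS : configured;
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("concurrent_window_evaluation_thread", "0");
    properties.setProperty("max_pending_window_evaluation_tasks", "-1");
    System.out.println("threads = " + resolveThreadCount(properties));
    System.out.println("max pending = " + resolveMaxPendingTasks(properties));
  }
}
```

Like the code in the hunk, the sketch lets `Integer.parseInt` throw a `NumberFormatException` on a malformed value instead of silently substituting a default.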
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/windowing/api/Evaluator.java
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/api/Evaluator.java
new file mode 100644
index 0000000..7e2047a
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/api/Evaluator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.api;
+
+import java.util.concurrent.RejectedExecutionException;
+
+@FunctionalInterface
+public interface Evaluator {
+
+ void evaluate(Window window);
+
+ default void onRejection(Window window) {
+ throw new RejectedExecutionException();
+ }
+}
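The `onRejection` default above only makes sense together with a bounded task queue. How `WindowEvaluationTaskPoolManager` wires this up is not visible in this part of the diff, so the sketch below is an assumption: it uses a plain `ThreadPoolExecutor` with an `ArrayBlockingQueue`, a hypothetical `SketchEvaluator` interface that mirrors `Evaluator`, and a `String` in place of `Window`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a rejection hook on a bounded pool; not IoTDB code. */
final class RejectionSketch {

  /** Mirrors the shape of Evaluator, with a String standing in for Window. */
  interface SketchEvaluator {
    void evaluate(String window);

    default void onRejection(String window) {
      throw new RejectedExecutionException();
    }
  }

  /** Submits an evaluation; when the queue is full, the hook runs on the calling thread. */
  static void submit(ThreadPoolExecutor pool, SketchEvaluator evaluator, String window) {
    try {
      pool.execute(() -> evaluator.evaluate(window));
    } catch (RejectedExecutionException e) {
      evaluator.onRejection(window);
    }
  }

  /** Submits tasks that block until released and returns how many were rejected. */
  static int demo(int threads, int maxPending, int submissions) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            threads, threads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxPending));
    CountDownLatch release = new CountDownLatch(1);
    AtomicInteger rejected = new AtomicInteger();
    SketchEvaluator evaluator =
        new SketchEvaluator() {
          @Override
          public void evaluate(String window) {
            try {
              release.await(); // keep the worker busy so later tasks pile up
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }

          @Override
          public void onRejection(String window) {
            rejected.incrementAndGet(); // count instead of throwing
          }
        };
    for (int i = 0; i < submissions; i++) {
      submit(pool, evaluator, "window-" + i);
    }
    release.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return rejected.get();
  }

  public static void main(String[] args) {
    System.out.println("rejected = " + demo(1, 2, 5));
  }
}
```

With one worker thread and room for two pending tasks, five submissions leave one task running, two queued, and two handed to `onRejection`. Overriding the hook, as the anonymous class does here, is what prevents the default `RejectedExecutionException` from reaching the caller.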
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/windowing/api/Window.java
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/api/Window.java
new file mode 100644
index 0000000..e1860cf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/api/Window.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.api;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public interface Window {
+
+ int size();
+
+ TSDataType getDataType();
+
+ long getTime(int index);
+
+ int getInt(int index);
+
+ long getLong(int index);
+
+ float getFloat(int index);
+
+ double getDouble(int index);
+
+ boolean getBoolean(int index);
+
+ Binary getBinary(int index);
+
+ long[] getTimeArray();
+
+ int[] getIntArray();
+
+ long[] getLongArray();
+
+ float[] getFloatArray();
+
+ double[] getDoubleArray();
+
+ boolean[] getBooleanArray();
+
+ Binary[] getBinaryArray();
+
+ void setInt(int index, int value);
+
+ void setLong(int index, long value);
+
+ void setFloat(int index, float value);
+
+ void setDouble(int index, double value);
+
+ void setBoolean(int index, boolean value);
+
+ void setBinary(int index, Binary value);
+}
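As an illustration of what the `// do something` placeholder in an `Evaluator` might contain, the sketch below averages the values of an INT32 window through the indexed accessors. To stay self-contained it declares a hypothetical `IntWindow` interface holding only the three methods it needs (`size`, `getTime`, `getInt`). The real `Window` interface above declares many more, and `of` is an illustrative factory, not an IoTDB API.

```java
/** Hypothetical example of per-window logic; uses a reduced stand-in for Window. */
final class WindowAverageSketch {

  /** Subset of the accessors declared by Window, redeclared here for self-containment. */
  interface IntWindow {
    int size();

    long getTime(int index);

    int getInt(int index);
  }

  /** Averages all values in the window; rejects empty windows explicitly. */
  static double average(IntWindow window) {
    if (window.size() == 0) {
      throw new IllegalArgumentException("empty window");
    }
    long sum = 0;
    for (int i = 0; i < window.size(); i++) {
      sum += window.getInt(i);
    }
    return (double) sum / window.size();
  }

  /** Builds an array-backed window for demonstration purposes. */
  static IntWindow of(long[] times, int[] values) {
    return new IntWindow() {
      @Override
      public int size() {
        return values.length;
      }

      @Override
      public long getTime(int index) {
        return times[index];
      }

      @Override
      public int getInt(int index) {
        return values[index];
      }
    };
  }

  public static void main(String[] args) {
    IntWindow window = of(new long[] {1, 2, 3}, new int[] {2, 4, 9});
    System.out.println("average = " + average(window));
  }
}
```

Accumulating into a `long` avoids overflow when a large window holds values near `Integer.MAX_VALUE`.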
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/Configuration.java
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/Configuration.java
new file mode 100644
index 0000000..9a11886
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/Configuration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.configuration;
+
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public abstract class Configuration {
+
+ private final TSDataType dataType;
+
+ protected Configuration(TSDataType dataType) {
+ this.dataType = dataType;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ public abstract void check() throws WindowingException;
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/SlidingSizeWindowConfiguration.java
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/SlidingSizeWindowConfiguration.java
new file mode 100644
index 0000000..2a0dc81
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/SlidingSizeWindowConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.configuration;
+
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class SlidingSizeWindowConfiguration extends Configuration {
+
+ private final int windowSize;
+ private final int slidingStep;
+
+ public SlidingSizeWindowConfiguration(TSDataType dataType, int windowSize, int slidingStep) {
+ super(dataType);
+ this.windowSize = windowSize;
+ this.slidingStep = slidingStep;
+ }
+
+ public SlidingSizeWindowConfiguration(TSDataType dataType, int windowSize) {
+ super(dataType);
+ this.windowSize = windowSize;
+ this.slidingStep = windowSize;
+ }
+
+ @Override
+ public void check() throws WindowingException {
+ if (windowSize <= 0) {
+ throw new WindowingException(
+ String.format("Parameter windowSize(%d) should be positive.", windowSize));
+ }
+ if (slidingStep <= 0) {
+ throw new WindowingException(
+ String.format("Parameter slidingStep(%d) should be positive.", slidingStep));
+ }
+ }
+
+ public int getWindowSize() {
+ return windowSize;
+ }
+
+ public int getSlidingStep() {
+ return slidingStep;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/SlidingTimeWindowConfiguration.java
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/SlidingTimeWindowConfiguration.java
new file mode 100644
index 0000000..9b8b01a
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/configuration/SlidingTimeWindowConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.configuration;
+
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class SlidingTimeWindowConfiguration extends Configuration {
+
+ private final long timeInterval;
+ private final long slidingStep;
+
+ public SlidingTimeWindowConfiguration(TSDataType dataType, long timeInterval, long slidingStep) {
+ super(dataType);
+ this.timeInterval = timeInterval;
+ this.slidingStep = slidingStep;
+ }
+
+ public SlidingTimeWindowConfiguration(TSDataType dataType, long timeInterval) {
+ super(dataType);
+ this.timeInterval = timeInterval;
+ this.slidingStep = timeInterval;
+ }
+
+ @Override
+ public void check() throws WindowingException {
+ if (timeInterval <= 0) {
+ throw new WindowingException(
+ String.format("Parameter timeInterval(%d) should be positive.", timeInterval));
+ }
+ if (slidingStep <= 0) {
+ throw new WindowingException(
+ String.format("Parameter slidingStep(%d) should be positive.", slidingStep));
+ }
+ }
+
+ public long getTimeInterval() {
+ return timeInterval;
+ }
+
+ public long getSlidingStep() {
+ return slidingStep;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/windowing/exception/WindowingException.java
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/exception/WindowingException.java
new file mode 100644
index 0000000..1dcfefd
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/exception/WindowingException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.exception;
+
+public class WindowingException extends Exception {
+
+ public WindowingException(String message) {
+ super(message);
+ }
+
+ public WindowingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingSizeWindowEvaluationHandler.java
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingSizeWindowEvaluationHandler.java
new file mode 100644
index 0000000..7cca0e6
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingSizeWindowEvaluationHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.handler;
+
+import org.apache.iotdb.db.utils.windowing.api.Evaluator;
+import org.apache.iotdb.db.utils.windowing.configuration.SlidingSizeWindowConfiguration;
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.db.utils.windowing.runtime.WindowEvaluationTask;
+import org.apache.iotdb.db.utils.windowing.window.WindowImpl;
+
+public class SlidingSizeWindowEvaluationHandler extends SlidingWindowEvaluationHandler {
+
+ private final int windowSize;
+ private final int slidingStep;
+
+ private int nextTriggerPoint;
+
+ public SlidingSizeWindowEvaluationHandler(
+ SlidingSizeWindowConfiguration configuration, Evaluator evaluator) throws WindowingException {
+ super(configuration, evaluator);
+
+ windowSize = configuration.getWindowSize();
+ slidingStep = configuration.getSlidingStep();
+
+ nextTriggerPoint = windowSize;
+ }
+
+ @Override
+ protected void createEvaluationTaskIfNecessary(long timestamp) {
+ if (data.size() != nextTriggerPoint) {
+ return;
+ }
+
+ TASK_POOL_MANAGER.submit(
+ new WindowEvaluationTask(
+ evaluator, new WindowImpl(data, nextTriggerPoint - windowSize, windowSize)));
+ data.setEvictionUpperBound(nextTriggerPoint - windowSize + 1);
+
+ nextTriggerPoint += slidingStep;
+ }
+}
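Note on SlidingSizeWindowEvaluationHandler above: a minimal standalone sketch of the trigger-point arithmetic, not part of the commit. The names `SizeWindowSketch` and `windowBegins` are hypothetical. The loop only mirrors how `nextTriggerPoint` advances while `data.size()` grows by one per accepted point. It assumes every point is accepted, that is, timestamps are strictly increasing.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeWindowSketch {

  /** Returns the begin index of every window fired while totalPoints points arrive one by one. */
  static List<Integer> windowBegins(int windowSize, int slidingStep, int totalPoints) {
    List<Integer> begins = new ArrayList<>();
    int nextTriggerPoint = windowSize; // the first window fires once windowSize points exist
    for (int size = 1; size <= totalPoints; ++size) {
      if (size != nextTriggerPoint) {
        continue; // same early return as createEvaluationTaskIfNecessary
      }
      begins.add(nextTriggerPoint - windowSize); // window covers [begin, begin + windowSize)
      nextTriggerPoint += slidingStep;
    }
    return begins;
  }
}
```

Under these assumptions the number of fired windows is 0 when `totalPoints < windowSize` and `1 + (totalPoints - windowSize) / slidingStep` otherwise, which is the expression the unit test in this commit waits on.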
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingTimeWindowEvaluationHandler.java b/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingTimeWindowEvaluationHandler.java
new file mode 100644
index 0000000..51698ad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingTimeWindowEvaluationHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.handler;
+
+import org.apache.iotdb.db.utils.windowing.api.Evaluator;
+import org.apache.iotdb.db.utils.windowing.configuration.SlidingTimeWindowConfiguration;
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.db.utils.windowing.runtime.WindowEvaluationTask;
+import org.apache.iotdb.db.utils.windowing.window.WindowImpl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+public class SlidingTimeWindowEvaluationHandler extends SlidingWindowEvaluationHandler {
+
+ private final long timeInterval;
+ private final long slidingStep;
+
+ private final Queue<Integer> windowBeginIndexQueue;
+
+ /** window: [begin, end) */
+ private long currentWindowEndTime;
+
+ /** window: [begin, end) */
+ private long nextWindowBeginTime;
+
+ public SlidingTimeWindowEvaluationHandler(
+ SlidingTimeWindowConfiguration configuration, Evaluator evaluator) throws WindowingException {
+ super(configuration, evaluator);
+
+ timeInterval = configuration.getTimeInterval();
+ slidingStep = configuration.getSlidingStep();
+
+ windowBeginIndexQueue = new LinkedList<>();
+ }
+
+ @Override
+ protected void createEvaluationTaskIfNecessary(long timestamp) {
+ if (data.size() == 1) {
+ windowBeginIndexQueue.add(0);
+ currentWindowEndTime = timestamp + timeInterval;
+ nextWindowBeginTime = timestamp + slidingStep;
+ return;
+ }
+
+ while (nextWindowBeginTime <= timestamp) {
+ windowBeginIndexQueue.add(data.size() - 1);
+ nextWindowBeginTime += slidingStep;
+ }
+
+ while (currentWindowEndTime <= timestamp) {
+ int windowBeginIndex = windowBeginIndexQueue.remove();
+ TASK_POOL_MANAGER.submit(
+ new WindowEvaluationTask(
+ evaluator,
+ new WindowImpl(data, windowBeginIndex, data.size() - 1 - windowBeginIndex)));
+ data.setEvictionUpperBound(windowBeginIndex);
+ currentWindowEndTime += slidingStep;
+ }
+ }
+}
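Note on SlidingTimeWindowEvaluationHandler above: a minimal standalone sketch that replays the same bookkeeping on a plain array of timestamps, not part of the commit. `TimeWindowSketch`, `firedWindows` and `consecutive` are hypothetical names, and the input is assumed to be strictly increasing, since `collect` drops any other point. Each fired window is reported as `{beginIndex, length}`. The point that closes a window is not included in it, which matches the `[begin, end)` comments in the handler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TimeWindowSketch {

  /** Returns {beginIndex, length} for each window fired while the timestamps arrive in order. */
  static List<int[]> firedWindows(long timeInterval, long slidingStep, long[] timestamps) {
    List<int[]> fired = new ArrayList<>();
    Queue<Integer> windowBeginIndexQueue = new ArrayDeque<>();
    long currentWindowEndTime = 0;
    long nextWindowBeginTime = 0;
    int size = 0; // stands in for data.size()

    for (long timestamp : timestamps) {
      ++size;
      if (size == 1) {
        // the first point opens the first window
        windowBeginIndexQueue.add(0);
        currentWindowEndTime = timestamp + timeInterval;
        nextWindowBeginTime = timestamp + slidingStep;
        continue;
      }
      // open every window whose begin time has been reached
      while (nextWindowBeginTime <= timestamp) {
        windowBeginIndexQueue.add(size - 1);
        nextWindowBeginTime += slidingStep;
      }
      // close every window whose end time has been reached
      while (currentWindowEndTime <= timestamp) {
        int begin = windowBeginIndexQueue.remove();
        fired.add(new int[] {begin, size - 1 - begin});
        currentWindowEndTime += slidingStep;
      }
    }
    return fired;
  }

  /** Helper for the example: timestamps 0, 1, ..., n - 1. */
  static long[] consecutive(int n) {
    long[] timestamps = new long[n];
    for (int i = 0; i < n; ++i) {
      timestamps[i] = i;
    }
    return timestamps;
  }
}
```

With `timeInterval = 7`, `slidingStep = 3` and timestamps 0 to 23, the sketch fires six windows of seven points each, beginning at indices 0, 3, 6, 9, 12 and 15.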
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingWindowEvaluationHandler.java b/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingWindowEvaluationHandler.java
new file mode 100644
index 0000000..b7a4c43
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/handler/SlidingWindowEvaluationHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.handler;
+
+import org.apache.iotdb.db.utils.windowing.api.Evaluator;
+import org.apache.iotdb.db.utils.windowing.configuration.Configuration;
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.db.utils.windowing.runtime.WindowEvaluationTaskPoolManager;
+import org.apache.iotdb.db.utils.windowing.window.EvictableBatchList;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public abstract class SlidingWindowEvaluationHandler {
+
+ protected static final WindowEvaluationTaskPoolManager TASK_POOL_MANAGER =
+ WindowEvaluationTaskPoolManager.getInstance();
+
+ protected final Configuration configuration;
+ protected final Evaluator evaluator;
+
+ protected final EvictableBatchList data;
+
+ private long maxTime;
+
+ protected SlidingWindowEvaluationHandler(Configuration configuration, Evaluator evaluator)
+ throws WindowingException {
+ this.configuration = configuration;
+ this.evaluator = evaluator;
+
+ configuration.check();
+
+ data = new EvictableBatchList(configuration.getDataType());
+
+ maxTime = Long.MIN_VALUE;
+ }
+
+ protected abstract void createEvaluationTaskIfNecessary(long timestamp);
+
+ public final void collect(long timestamp, int value) {
+ if (timestamp <= maxTime) {
+ return;
+ }
+ maxTime = timestamp;
+
+ data.putInt(timestamp, value);
+ createEvaluationTaskIfNecessary(timestamp);
+ }
+
+ public final void collect(long timestamp, long value) {
+ if (timestamp <= maxTime) {
+ return;
+ }
+ maxTime = timestamp;
+
+ data.putLong(timestamp, value);
+ createEvaluationTaskIfNecessary(timestamp);
+ }
+
+ public final void collect(long timestamp, float value) {
+ if (timestamp <= maxTime) {
+ return;
+ }
+ maxTime = timestamp;
+
+ data.putFloat(timestamp, value);
+ createEvaluationTaskIfNecessary(timestamp);
+ }
+
+ public final void collect(long timestamp, double value) {
+ if (timestamp <= maxTime) {
+ return;
+ }
+ maxTime = timestamp;
+
+ data.putDouble(timestamp, value);
+ createEvaluationTaskIfNecessary(timestamp);
+ }
+
+ public final void collect(long timestamp, boolean value) {
+ if (timestamp <= maxTime) {
+ return;
+ }
+ maxTime = timestamp;
+
+ data.putBoolean(timestamp, value);
+ createEvaluationTaskIfNecessary(timestamp);
+ }
+
+ public final void collect(long timestamp, String value) {
+ if (timestamp <= maxTime) {
+ return;
+ }
+ maxTime = timestamp;
+
+ data.putBinary(timestamp, Binary.valueOf(value));
+ createEvaluationTaskIfNecessary(timestamp);
+ }
+
+ public final void collect(long timestamp, Binary value) {
+ if (timestamp <= maxTime) {
+ return;
+ }
+ maxTime = timestamp;
+
+ data.putBinary(timestamp, value);
+ createEvaluationTaskIfNecessary(timestamp);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTask.java b/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTask.java
new file mode 100644
index 0000000..794b794
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.runtime;
+
+import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.utils.windowing.api.Evaluator;
+import org.apache.iotdb.db.utils.windowing.api.Window;
+
+public class WindowEvaluationTask extends WrappedRunnable {
+
+ private final Evaluator evaluator;
+ private final Window window;
+
+ public WindowEvaluationTask(Evaluator evaluator, Window window) {
+ this.evaluator = evaluator;
+ this.window = window;
+ }
+
+ @Override
+ public void runMayThrow() {
+ evaluator.evaluate(window);
+ }
+
+ public void onRejection() {
+ evaluator.onRejection(window);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTaskPoolManager.java
new file mode 100644
index 0000000..c6d7223
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTaskPoolManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.runtime;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class WindowEvaluationTaskPoolManager extends AbstractPoolManager {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(WindowEvaluationTaskPoolManager.class);
+
+ private WindowEvaluationTaskPoolManager() {
+ final int nThreads =
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentWindowEvaluationThread();
+ LOGGER.info("WindowEvaluationTaskPoolManager is initializing, thread number: {}", nThreads);
+ pool =
+ new ThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(
+ IoTDBDescriptor.getInstance().getConfig().getMaxPendingWindowEvaluationTasks()),
+ new IoTThreadFactory(ThreadName.WINDOW_EVALUATION_SERVICE.getName()));
+ }
+
+ public void submit(WindowEvaluationTask task) {
+ try {
+ super.submit(task);
+ } catch (RejectedExecutionException e) {
+ task.onRejection();
+ }
+ }
+
+ @Override
+ public Logger getLogger() {
+ return LOGGER;
+ }
+
+ @Override
+ public String getName() {
+ return "window evaluation task";
+ }
+
+ @Override
+ public void start() {
+ if (pool != null) {
+ return;
+ }
+
+ pool =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentWindowEvaluationThread(),
+ ThreadName.WINDOW_EVALUATION_SERVICE.getName());
+ }
+
+ public static WindowEvaluationTaskPoolManager getInstance() {
+ return WindowEvaluationTaskPoolManager.InstanceHolder.INSTANCE;
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {
+ // nothing to do
+ }
+
+ private static final WindowEvaluationTaskPoolManager INSTANCE =
+ new WindowEvaluationTaskPoolManager();
+ }
+}
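Note on WindowEvaluationTaskPoolManager above: a minimal standalone sketch of when a `ThreadPoolExecutor` built like the one in the constructor (fixed thread count, bounded `LinkedBlockingQueue`, default rejection policy) throws `RejectedExecutionException`. This is the case `submit` turns into `task.onRejection()`. `RejectionSketch` and `countRejections` are hypothetical names, and the sketch uses only `java.util.concurrent`. It assumes, without the source of `AbstractPoolManager` in view, that `super.submit` forwards to the underlying pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionSketch {

  /** Submits blocking tasks and counts how many the pool refuses. */
  static int countRejections(int nThreads, int maxPendingTasks, int taskCount) {
    CountDownLatch release = new CountDownLatch(1);
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(maxPendingTasks));
    int rejected = 0;
    try {
      for (int i = 0; i < taskCount; ++i) {
        try {
          // each task blocks until released, so workers and queue stay occupied
          pool.submit(
              () -> {
                try {
                  release.await();
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
              });
        } catch (RejectedExecutionException e) {
          ++rejected; // the manager calls task.onRejection() at this point
        }
      }
    } finally {
      release.countDown();
      pool.shutdown();
    }
    return rejected;
  }
}
```

With one thread and one pending slot, the first task occupies the worker, the second waits in the queue, and every further task is rejected until one of them finishes.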
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/windowing/window/EvictableBatchList.java b/server/src/main/java/org/apache/iotdb/db/utils/windowing/window/EvictableBatchList.java
new file mode 100644
index 0000000..7129708
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/window/EvictableBatchList.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.window;
+
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EvictableBatchList {
+
+ private static int internalBatchSize =
+ TSFileConfig.ARRAY_CAPACITY_THRESHOLD * TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
+
+ private final TSDataType dataType;
+
+ private List<BatchData> batchList;
+ private int size;
+
+ private int actualOuterIndexAt0;
+
+ public EvictableBatchList(TSDataType dataType) {
+ this.dataType = dataType;
+ batchList = new ArrayList<>();
+ size = 0;
+ actualOuterIndexAt0 = 0;
+ }
+
+ public void putInt(long t, int v) {
+ if (size % internalBatchSize == 0) {
+ batchList.add(new BatchData(dataType));
+ }
+
+ batchList.get(size / internalBatchSize - actualOuterIndexAt0).putInt(t, v);
+ ++size;
+ }
+
+ public void putLong(long t, long v) {
+ if (size % internalBatchSize == 0) {
+ batchList.add(new BatchData(dataType));
+ }
+
+ batchList.get(size / internalBatchSize - actualOuterIndexAt0).putLong(t, v);
+ ++size;
+ }
+
+ public void putFloat(long t, float v) {
+ if (size % internalBatchSize == 0) {
+ batchList.add(new BatchData(dataType));
+ }
+
+ batchList.get(size / internalBatchSize - actualOuterIndexAt0).putFloat(t, v);
+ ++size;
+ }
+
+ public void putDouble(long t, double v) {
+ if (size % internalBatchSize == 0) {
+ batchList.add(new BatchData(dataType));
+ }
+
+ batchList.get(size / internalBatchSize - actualOuterIndexAt0).putDouble(t, v);
+ ++size;
+ }
+
+ public void putBoolean(long t, boolean v) {
+ if (size % internalBatchSize == 0) {
+ batchList.add(new BatchData(dataType));
+ }
+
+ batchList.get(size / internalBatchSize - actualOuterIndexAt0).putBoolean(t, v);
+ ++size;
+ }
+
+ public void putBinary(long t, Binary v) {
+ if (size % internalBatchSize == 0) {
+ batchList.add(new BatchData(dataType));
+ }
+
+ batchList.get(size / internalBatchSize - actualOuterIndexAt0).putBinary(t, v);
+ ++size;
+ }
+
+ public long getTimeByIndex(int index) {
+ return batchList
+ .get(index / internalBatchSize - actualOuterIndexAt0)
+ .getTimeByIndex(index % internalBatchSize);
+ }
+
+ public int getIntByIndex(int index) {
+ return batchList
+ .get(index / internalBatchSize - actualOuterIndexAt0)
+ .getIntByIndex(index % internalBatchSize);
+ }
+
+ public long getLongByIndex(int index) {
+ return batchList
+ .get(index / internalBatchSize - actualOuterIndexAt0)
+ .getLongByIndex(index % internalBatchSize);
+ }
+
+ public float getFloatByIndex(int index) {
+ return batchList
+ .get(index / internalBatchSize - actualOuterIndexAt0)
+ .getFloatByIndex(index % internalBatchSize);
+ }
+
+ public double getDoubleByIndex(int index) {
+ return batchList
+ .get(index / internalBatchSize - actualOuterIndexAt0)
+ .getDoubleByIndex(index % internalBatchSize);
+ }
+
+ public boolean getBooleanByIndex(int index) {
+ return batchList
+ .get(index / internalBatchSize - actualOuterIndexAt0)
+ .getBooleanByIndex(index % internalBatchSize);
+ }
+
+ public Binary getBinaryByIndex(int index) {
+ return batchList
+ .get(index / internalBatchSize - actualOuterIndexAt0)
+ .getBinaryByIndex(index % internalBatchSize);
+ }
+
+ /** @param evictionUpperBound valid elements [evictionUpperBound, size) */
+ public void setEvictionUpperBound(int evictionUpperBound) {
+ int outerEvictionUpperBound = evictionUpperBound / internalBatchSize;
+ if (actualOuterIndexAt0 < outerEvictionUpperBound) {
+ doEviction(outerEvictionUpperBound);
+ }
+ }
+
+ private void doEviction(int outerEvictionUpperBound) {
+ batchList =
+ new ArrayList<>(
+ batchList.subList(outerEvictionUpperBound - actualOuterIndexAt0, batchList.size()));
+ actualOuterIndexAt0 = outerEvictionUpperBound;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ @TestOnly
+ public static void setInternalBatchSize(int internalBatchSize) {
+ EvictableBatchList.internalBatchSize = internalBatchSize;
+ }
+}
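Note on EvictableBatchList above: a minimal standalone sketch of the index mapping and eviction, not part of the commit. `EvictableListSketch` is a hypothetical class that stores plain `long` values in fixed-size arrays instead of `BatchData`. It illustrates two properties of the original: global indices stay valid after eviction because the outer index is shifted by the number of dropped batches, and only whole batches below `evictionUpperBound / batchSize` are released.

```java
import java.util.ArrayList;
import java.util.List;

public class EvictableListSketch {

  private final int batchSize;
  private List<long[]> batches = new ArrayList<>();
  private int size = 0;
  private int actualOuterIndexAt0 = 0; // global outer index of batches.get(0)

  EvictableListSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  void put(long value) {
    if (size % batchSize == 0) {
      batches.add(new long[batchSize]); // start a new batch when the last one is full
    }
    batches.get(size / batchSize - actualOuterIndexAt0)[size % batchSize] = value;
    ++size;
  }

  long get(int index) {
    return batches.get(index / batchSize - actualOuterIndexAt0)[index % batchSize];
  }

  /** Elements in [evictionUpperBound, size) stay readable; older whole batches are dropped. */
  void setEvictionUpperBound(int evictionUpperBound) {
    int outerBound = evictionUpperBound / batchSize;
    if (actualOuterIndexAt0 < outerBound) {
      batches = new ArrayList<>(batches.subList(outerBound - actualOuterIndexAt0, batches.size()));
      actualOuterIndexAt0 = outerBound;
    }
  }

  int retainedBatches() {
    return batches.size();
  }

  /** Example: batch size 2, seven values 0..6, then evict everything below index 5. */
  static EvictableListSketch demo() {
    EvictableListSketch list = new EvictableListSketch(2);
    for (int i = 0; i < 7; ++i) {
      list.put(i);
    }
    list.setEvictionUpperBound(5);
    return list;
  }
}
```

In `demo()`, index 4 remains readable even though the bound is 5, because it shares a batch with index 5. Eviction works at batch granularity, not element granularity.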
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/windowing/window/WindowImpl.java b/server/src/main/java/org/apache/iotdb/db/utils/windowing/window/WindowImpl.java
new file mode 100644
index 0000000..96c71a7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/window/WindowImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.windowing.window;
+
+import org.apache.iotdb.db.utils.windowing.api.Window;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class WindowImpl implements Window {
+
+ private final int size;
+ private final TSDataType dataType;
+
+ private long[] timestamps = null;
+
+ private int[] intValues = null;
+ private long[] longValues = null;
+ private float[] floatValues = null;
+ private double[] doubleValues = null;
+ private boolean[] booleanValues = null;
+ private Binary[] binaryValues = null;
+
+ public WindowImpl(EvictableBatchList list, int begin, int size) {
+ this.size = size;
+ dataType = list.getDataType();
+ init(list, begin);
+ }
+
+ private void init(EvictableBatchList list, int begin) {
+ timestamps = new long[size];
+ for (int i = 0; i < size; ++i) {
+ timestamps[i] = list.getTimeByIndex(begin + i);
+ }
+
+ switch (dataType) {
+ case INT32:
+ intValues = new int[size];
+ for (int i = 0; i < size; ++i) {
+ intValues[i] = list.getIntByIndex(begin + i);
+ }
+ break;
+ case INT64:
+ longValues = new long[size];
+ for (int i = 0; i < size; ++i) {
+ longValues[i] = list.getLongByIndex(begin + i);
+ }
+ break;
+ case FLOAT:
+ floatValues = new float[size];
+ for (int i = 0; i < size; ++i) {
+ floatValues[i] = list.getFloatByIndex(begin + i);
+ }
+ break;
+ case DOUBLE:
+ doubleValues = new double[size];
+ for (int i = 0; i < size; ++i) {
+ doubleValues[i] = list.getDoubleByIndex(begin + i);
+ }
+ break;
+ case BOOLEAN:
+ booleanValues = new boolean[size];
+ for (int i = 0; i < size; ++i) {
+ booleanValues[i] = list.getBooleanByIndex(begin + i);
+ }
+ break;
+ case TEXT:
+ binaryValues = new Binary[size];
+ for (int i = 0; i < size; ++i) {
+ binaryValues[i] = list.getBinaryByIndex(begin + i);
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(dataType.toString());
+ }
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ @Override
+ public long getTime(int index) {
+ return timestamps[index];
+ }
+
+ @Override
+ public int getInt(int index) {
+ return intValues[index];
+ }
+
+ @Override
+ public long getLong(int index) {
+ return longValues[index];
+ }
+
+ @Override
+ public float getFloat(int index) {
+ return floatValues[index];
+ }
+
+ @Override
+ public double getDouble(int index) {
+ return doubleValues[index];
+ }
+
+ @Override
+ public boolean getBoolean(int index) {
+ return booleanValues[index];
+ }
+
+ @Override
+ public Binary getBinary(int index) {
+ return binaryValues[index];
+ }
+
+ @Override
+ public long[] getTimeArray() {
+ return timestamps;
+ }
+
+ @Override
+ public int[] getIntArray() {
+ return intValues;
+ }
+
+ @Override
+ public long[] getLongArray() {
+ return longValues;
+ }
+
+ @Override
+ public float[] getFloatArray() {
+ return floatValues;
+ }
+
+ @Override
+ public double[] getDoubleArray() {
+ return doubleValues;
+ }
+
+ @Override
+ public boolean[] getBooleanArray() {
+ return booleanValues;
+ }
+
+ @Override
+ public Binary[] getBinaryArray() {
+ return binaryValues;
+ }
+
+ @Override
+ public void setInt(int index, int value) {
+ intValues[index] = value;
+ }
+
+ @Override
+ public void setLong(int index, long value) {
+ longValues[index] = value;
+ }
+
+ @Override
+ public void setFloat(int index, float value) {
+ floatValues[index] = value;
+ }
+
+ @Override
+ public void setDouble(int index, double value) {
+ doubleValues[index] = value;
+ }
+
+ @Override
+ public void setBoolean(int index, boolean value) {
+ booleanValues[index] = value;
+ }
+
+ @Override
+ public void setBinary(int index, Binary value) {
+ binaryValues[index] = value;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/windowing/SlidingSizeWindowEvaluationHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/utils/windowing/SlidingSizeWindowEvaluationHandlerTest.java
new file mode 100644
index 0000000..be88ad7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/windowing/SlidingSizeWindowEvaluationHandlerTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.windowing;
+
+import org.apache.iotdb.db.utils.windowing.configuration.SlidingSizeWindowConfiguration;
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.db.utils.windowing.handler.SlidingSizeWindowEvaluationHandler;
+import org.apache.iotdb.db.utils.windowing.window.EvictableBatchList;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+public class SlidingSizeWindowEvaluationHandlerTest {
+
+ @Before
+ public void setUp() throws Exception {
+ EvictableBatchList.setInternalBatchSize(2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EvictableBatchList.setInternalBatchSize(
+ TSFileConfig.ARRAY_CAPACITY_THRESHOLD * TSFileConfig.ARRAY_CAPACITY_THRESHOLD);
+ }
+
+ @Test
+ public void test00() throws WindowingException {
+ doTest(1, 1, 0);
+ }
+
+ @Test
+ public void test01() throws WindowingException {
+ doTest(1, 1, 1);
+ }
+
+ @Test
+ public void test02() throws WindowingException {
+ doTest(1, 1, 2);
+ }
+
+ @Test
+ public void test03() throws WindowingException {
+ doTest(1, 1, 5);
+ }
+
+ @Test
+ public void test04() throws WindowingException {
+ doTest(1, 2, 0);
+ }
+
+ @Test
+ public void test05() throws WindowingException {
+ doTest(1, 2, 1);
+ }
+
+ @Test
+ public void test06() throws WindowingException {
+ doTest(1, 2, 2);
+ }
+
+ @Test
+ public void test07() throws WindowingException {
+ doTest(1, 2, 5);
+ }
+
+ @Test
+ public void test08() throws WindowingException {
+ doTest(7, 2, 5);
+ }
+
+ @Test
+ public void test09() throws WindowingException {
+ doTest(7, 3, 7);
+ }
+
+ @Test
+ public void test10() throws WindowingException {
+ doTest(7, 3, 24);
+ }
+
+ @Test
+ public void test11() throws WindowingException {
+ doTest(7, 10, 75);
+ }
+
+ @Test
+ public void test12() throws WindowingException {
+ doTest(7, 10, 76);
+ }
+
+ @Test
+ public void test13() throws WindowingException {
+ doTest(7, 10, 77);
+ }
+
+ @Test
+ public void test14() throws WindowingException {
+ doTest(7, 7, 75);
+ }
+
+ @Test
+ public void test15() throws WindowingException {
+ doTest(7, 7, 76);
+ }
+
+ @Test
+ public void test16() throws WindowingException {
+ doTest(7, 7, 77);
+ }
+
+ private void doTest(int windowSize, int slidingStep, int totalPointNumber)
+ throws WindowingException {
+ final AtomicInteger count = new AtomicInteger(0);
+ final ConcurrentHashMap<Integer, Integer> actualTVMap = new ConcurrentHashMap<>();
+
+ SlidingSizeWindowEvaluationHandler handler =
+ new SlidingSizeWindowEvaluationHandler(
+ new SlidingSizeWindowConfiguration(TSDataType.INT32, windowSize, slidingStep),
+ window -> {
+ for (int i = 0; i < window.size(); ++i) {
+ actualTVMap.put((int) window.getTime(i), window.getInt(i));
+ }
+
+ count.incrementAndGet();
+ });
+
+ for (int i = 0; i < totalPointNumber; ++i) {
+ handler.collect(i, i);
+
+ // the following data points will be ignored
+ handler.collect(i, i);
+ handler.collect(i - 1, i);
+ }
+
+ await()
+ .atMost(30, SECONDS)
+ .until(
+ () ->
+ (totalPointNumber < windowSize
+ ? 0
+ : 1 + (totalPointNumber - windowSize) / slidingStep)
+ == count.get());
+
+ final ConcurrentHashMap<Integer, Integer> expectedTVMap = new ConcurrentHashMap<>();
+ final int windowCount = count.get();
+ collection:
+ for (int i = 0; i < windowCount; ++i) {
+ for (int j = 0; j < windowSize; ++j) {
+ final int tv = i * slidingStep + j;
+ if (totalPointNumber <= tv) {
+ break collection;
+ }
+ expectedTVMap.put(tv, tv);
+ }
+ }
+ Assert.assertEquals(expectedTVMap, actualTVMap);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/windowing/SlidingTimeWindowEvaluationHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/utils/windowing/SlidingTimeWindowEvaluationHandlerTest.java
new file mode 100644
index 0000000..d0e3fe5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/windowing/SlidingTimeWindowEvaluationHandlerTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.windowing;
+
+import org.apache.iotdb.db.utils.windowing.configuration.SlidingTimeWindowConfiguration;
+import org.apache.iotdb.db.utils.windowing.exception.WindowingException;
+import org.apache.iotdb.db.utils.windowing.handler.SlidingTimeWindowEvaluationHandler;
+import org.apache.iotdb.db.utils.windowing.window.EvictableBatchList;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+public class SlidingTimeWindowEvaluationHandlerTest {
+
+ @Before
+ public void setUp() throws Exception {
+ EvictableBatchList.setInternalBatchSize(2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EvictableBatchList.setInternalBatchSize(
+ TSFileConfig.ARRAY_CAPACITY_THRESHOLD * TSFileConfig.ARRAY_CAPACITY_THRESHOLD);
+ }
+
+ @Test
+ public void test00() throws WindowingException {
+ doTest(1, 1, 0);
+ }
+
+ @Test
+ public void test01() throws WindowingException {
+ doTest(1, 1, 1);
+ }
+
+ @Test
+ public void test02() throws WindowingException {
+ doTest(1, 1, 2);
+ }
+
+ @Test
+ public void test03() throws WindowingException {
+ doTest(1, 1, 5);
+ }
+
+ @Test
+ public void test04() throws WindowingException {
+ doTest(1, 2, 0);
+ }
+
+ @Test
+ public void test05() throws WindowingException {
+ doTest(1, 2, 1);
+ }
+
+ @Test
+ public void test06() throws WindowingException {
+ doTest(1, 2, 2);
+ }
+
+ @Test
+ public void test07() throws WindowingException {
+ doTest(1, 2, 5);
+ }
+
+ @Test
+ public void test08() throws WindowingException {
+ doTest(7, 2, 5);
+ }
+
+ @Test
+ public void test09() throws WindowingException {
+ doTest(7, 3, 7);
+ }
+
+ @Test
+ public void test10() throws WindowingException {
+ doTest(7, 3, 24);
+ }
+
+ @Test
+ public void test11() throws WindowingException {
+ doTest(7, 10, 75);
+ }
+
+ @Test
+ public void test12() throws WindowingException {
+ doTest(7, 10, 76);
+ }
+
+ @Test
+ public void test13() throws WindowingException {
+ doTest(7, 10, 77);
+ }
+
+ @Test
+ public void test14() throws WindowingException {
+ doTest(7, 7, 75);
+ }
+
+ @Test
+ public void test15() throws WindowingException {
+ doTest(7, 7, 76);
+ }
+
+ @Test
+ public void test16() throws WindowingException {
+ doTest(7, 7, 77);
+ }
+
+ @Test
+ public void test17() throws WindowingException {
+ doTest(7, 33, 77);
+ }
+
+ @Test
+ public void test18() throws WindowingException {
+ doTest(4, 16, 128);
+ }
+
+ @Test
+ public void test19() throws WindowingException {
+ doTest(1, 100, 101);
+ }
+
+ private void doTest(long timeInterval, long slidingStep, long totalTime)
+ throws WindowingException {
+ final AtomicInteger count = new AtomicInteger(0);
+ final ConcurrentHashMap<Integer, Integer> actualTVMap = new ConcurrentHashMap<>();
+
+ SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(TSDataType.INT32, timeInterval, slidingStep),
+ window -> {
+ for (int i = 0; i < window.size(); ++i) {
+ actualTVMap.put((int) window.getTime(i), window.getInt(i));
+ }
+
+ count.incrementAndGet();
+ });
+
+ for (int i = 0; i < totalTime; ++i) {
+ handler.collect(i, i);
+
+ // the following data points will be ignored
+ handler.collect(i, i);
+ handler.collect(i - 1, i);
+ }
+
+ await()
+ .atMost(30, SECONDS)
+ .until(
+ () ->
+ (totalTime < timeInterval
+ ? 0
+ : 1
+ + (totalTime - timeInterval) / slidingStep
+ - ((totalTime - timeInterval) % slidingStep == 0 ? 1 : 0))
+ == count.get());
+
+ final ConcurrentHashMap<Integer, Integer> expectedTVMap = new ConcurrentHashMap<>();
+ final int windowCount = count.get();
+ collection:
+ for (int i = 0; i < windowCount; ++i) {
+ for (int j = 0; j < timeInterval; ++j) {
+ final int tv = (int) (i * slidingStep + j);
+ if (totalTime <= tv) {
+ break collection;
+ }
+ expectedTVMap.put(tv, tv);
+ }
+ }
+ Assert.assertEquals(expectedTVMap, actualTVMap);
+ }
+
+ @Test
+ public void testWithEmptyWindows() throws WindowingException {
+ final AtomicInteger countTotal = new AtomicInteger(0);
+ final AtomicInteger countEmpty = new AtomicInteger(0);
+
+ SlidingTimeWindowEvaluationHandler handler =
+ new SlidingTimeWindowEvaluationHandler(
+ new SlidingTimeWindowConfiguration(TSDataType.INT32, 3, 7),
+ window -> {
+ Assert.assertTrue(window.size() == 0 || window.size() == 1);
+
+ countTotal.incrementAndGet();
+
+ if (window.size() == 0) {
+ countEmpty.incrementAndGet();
+ }
+ });
+
+ for (int i = 0; i < 10; ++i) {
+ handler.collect(21 * i, 21 * i);
+ }
+
+ await().atMost(30, SECONDS).until(() -> countTotal.get() == 27 && countEmpty.get() == 18);
+ }
+}
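
Note on the wait condition in doTest: the expression passed to await().until(...) can be read as a closed-form count of the windows the evaluator is expected to fire. The sketch below restates only that arithmetic so the test parameters are easier to check by hand. The class and method names (WindowCountSketch, expectedWindowCount) are hypothetical and not part of the commit. The subtraction of one when the remainder is zero appears to reflect that a window is only evaluated once a later data point arrives; that reading is an inference from the test, not something the commit states.

```java
// Hypothetical helper, not part of the commit: restates the arithmetic used in
// the await().until(...) condition of SlidingTimeWindowEvaluationHandlerTest.doTest.
public class WindowCountSketch {

  // Number of window evaluations doTest waits for when points with
  // timestamps 0 .. totalTime - 1 are collected.
  static long expectedWindowCount(long timeInterval, long slidingStep, long totalTime) {
    if (totalTime < timeInterval) {
      return 0;
    }
    long remaining = totalTime - timeInterval;
    // Assumption: when the data ends exactly on a window boundary, the last
    // window has not yet been triggered, hence the "- 1".
    return 1 + remaining / slidingStep - (remaining % slidingStep == 0 ? 1 : 0);
  }

  public static void main(String[] args) {
    // Parameter triples taken from test00, test01, test10, test13 and test19.
    long[][] cases = {{1, 1, 0}, {1, 1, 1}, {7, 3, 24}, {7, 10, 77}, {1, 100, 101}};
    for (long[] c : cases) {
      System.out.printf(
          "doTest(%d, %d, %d) waits for %d window(s)%n",
          c[0], c[1], c[2], expectedWindowCount(c[0], c[1], c[2]));
    }
  }
}
```

For example, doTest(7, 3, 24) waits for 6 evaluations, while doTest(1, 1, 1) waits for none because no point past the first window is ever collected.
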
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index d113888..d35d8f4 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -563,14 +563,6 @@ var config = {
['UDF/UDF-User-Defined-Function','UDF (User Defined Function)']
]
},
- // The trigger module has not been implemented yet,
- // so the website should not show users how to use it to avoid misleading.
- // {
- // title: 'Trigger',
- // children: [
- // ['UDF/Triggers','Trigger']
- // ]
- // },
{
title: 'Communication Service Protocol',
children: [
@@ -702,19 +694,14 @@ var config = {
]
},
{
- title: 'UDF',
+ title: 'Advanced Features',
children: [
- ['UDF/UDF-User-Defined-Function','UDF (User Defined Function)']
+ ['Advanced-Features/UDF-User-Defined-Function','UDF (User Defined Function)'],
+ // The trigger module has not been implemented yet,
+ // so the website should not show users how to use it to avoid misleading.
+ // ['Advanced-Features/Triggers','Trigger']
]
},
- // The trigger module has not been implemented yet,
- // so the website should not show users how to use it to avoid misleading.
- // {
- // title: 'Trigger',
- // children: [
- // ['UDF/Triggers','Trigger']
- // ]
- // },
{
title: 'Communication Service Protocol',
children: [
@@ -1367,14 +1354,6 @@ var config = {
['UDF/UDF-User-Defined-Function','用户定义函数(UDF)']
]
},
- // The trigger module has not been implemented yet,
- // so the website should not show users how to use it to avoid misleading.
- // {
- // title: 'Trigger',
- // children: [
- // ['UDF/Triggers','Trigger']
- // ]
- // },
{
title: '通信服务协议',
children: [
@@ -1500,19 +1479,14 @@ var config = {
]
},
{
- title: '用户定义函数(UDF)',
+ title: '高级功能',
children: [
- ['UDF/UDF-User-Defined-Function','用户定义函数(UDF)']
+ ['Advanced-Features/UDF-User-Defined-Function','用户定义函数(UDF)'],
+ // The trigger module has not been implemented yet,
+ // so the website should not show users how to use it to avoid misleading.
+ // ['Advanced-Features/Triggers','触发器']
]
},
- // The trigger module has not been implemented yet,
- // so the website should not show users how to use it to avoid misleading.
- // {
- // title: 'Trigger',
- // children: [
- // ['UDF/Triggers','Trigger']
- // ]
- // },
{
title: '通信服务协议',
children: [