This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d7ed1c10e5 Add new sampling algorithm largest triangle to sample UDF
(#7462)
d7ed1c10e5 is described below
commit d7ed1c10e52deb720b0c207a19cda3f3ac29e46d
Author: Zhizhou Li <[email protected]>
AuthorDate: Wed Oct 12 14:46:06 2022 +0800
Add new sampling algorithm largest triangle to sample UDF (#7462)
---
docs/UserGuide/UDF-Library/Data-Profiling.md | 10 ++--
docs/zh/UserGuide/UDF-Library/Data-Profiling.md | 6 +-
library-udf/pom.xml | 12 ++++
.../apache/iotdb/library/dprofile/UDTFSample.java | 70 +++++++++++++++++++---
.../iotdb/library/dprofile/DProfileTests.java | 13 ++++
5 files changed, 97 insertions(+), 14 deletions(-)
diff --git a/docs/UserGuide/UDF-Library/Data-Profiling.md
b/docs/UserGuide/UDF-Library/Data-Profiling.md
index e2b2625ae4..7df66d245f 100644
--- a/docs/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/UserGuide/UDF-Library/Data-Profiling.md
@@ -1162,11 +1162,13 @@ Output series:
This function is used to sample the input series,
that is, select a specified number of data points from the input series and
output them.
-Currently, two sampling methods are supported:
+Currently, three sampling methods are supported:
**Reservoir sampling** randomly selects data points.
All of the points have the same probability of being sampled.
**Isometric sampling** selects data points at equal index intervals.
-
+**Triangle sampling** divides the data points into buckets according to the
number of samples `k`; the first and last points are always kept.
+For each bucket, it selects the point that forms the largest triangle with the
point chosen from the previous bucket and the average of the next bucket.
+For more details, please read the
[paper](http://skemman.is/stream/get/1946/15343/37285/3/SS_MSthesis.pdf).
**Name:** SAMPLE
@@ -1174,7 +1176,7 @@ All of the points have the same probability of being
sampled.
**Parameters:**
-+ `method`: The method of sampling, which is 'reservoir' or 'isometric'. By
default, reservoir sampling is used.
++ `method`: The method of sampling, which is 'reservoir', 'isometric' or
'triangle'. By default, reservoir sampling is used.
+ `k`: The number of sampling, which is a positive integer. By default, it's 1.
**Output Series:** Output a single series. The type is the same as the input.
The length of the output series is `k`. Each data point in the output series
comes from the input series.
@@ -1809,4 +1811,4 @@ Output series:
|1970-01-01T08:00:01.900+08:00| -1.033622788243404|
|1970-01-01T08:00:02.000+08:00|-0.20672455764868078|
+-----------------------------+--------------------+
-```
\ No newline at end of file
+```
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
index 859be2fb02..b5657ad6cf 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
@@ -1154,7 +1154,7 @@ select resample(s1,'every'='30m','start'='2021-03-06
15:00:00') from root.test.d
### 函数简介
-本函数对输入序列进行采样,即从输入序列中选取指定数量的数据点并输出。目前,本函数支持两种采样方法:**蓄水池采样法 (reservoir
sampling)** 对数据进行随机采样,所有数据点被采样的概率相同;**等距采样法 (isometric sampling)**
按照相等的索引间隔对数据进行采样。
+本函数对输入序列进行采样,即从输入序列中选取指定数量的数据点并输出。目前,本函数支持三种采样方法:**蓄水池采样法 (reservoir
sampling)** 对数据进行随机采样,所有数据点被采样的概率相同;**等距采样法 (isometric sampling)**
按照相等的索引间隔对数据进行采样;**最大三角采样法 (triangle sampling)**
按采样数将所有数据分桶,在每个桶内计算数据点构成的三角形面积,并保留面积最大的点(首尾两个数据点总会被保留)。该算法通常用于数据的可视化展示,采样过程可以保证一些关键的突变点得到保留。更多算法细节可以阅读
[论文](http://skemman.is/stream/get/1946/15343/37285/3/SS_MSthesis.pdf)。
**函数名:** SAMPLE
@@ -1162,7 +1162,7 @@ select resample(s1,'every'='30m','start'='2021-03-06
15:00:00') from root.test.d
**参数:**
-+ `method`:采样方法,取值为 'reservoir' 或' isometric'。在缺省情况下,采用蓄水池采样法。
++ `method`:采样方法,取值为 'reservoir'、'isometric' 或 'triangle'。在缺省情况下,采用蓄水池采样法。
+ `k`:采样数,它是一个正整数,在缺省情况下为 1。
**输出序列:** 输出单个序列,类型与输入序列相同。该序列的长度为采样数,序列中的每一个数据点都来自于输入序列。
@@ -1802,4 +1802,4 @@ select zscore(s1) from root.test
|1970-01-01T08:00:01.900+08:00| -1.033622788243404|
|1970-01-01T08:00:02.000+08:00|-0.20672455764868078|
+-----------------------------+--------------------+
-```
\ No newline at end of file
+```
diff --git a/library-udf/pom.xml b/library-udf/pom.xml
index 0d36053eb8..0842413876 100644
--- a/library-udf/pom.xml
+++ b/library-udf/pom.xml
@@ -27,6 +27,12 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>library-udf</artifactId>
+ <repositories>
+ <repository>
+ <id>jitpack.io</id>
+ <url>https://jitpack.io</url>
+ </repository>
+ </repositories>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
@@ -78,6 +84,12 @@
<groupId>com.github.wendykierp</groupId>
<artifactId>JTransforms</artifactId>
</dependency>
+ <!-- Sampling -->
+ <dependency>
+ <groupId>com.github.ggalmazor</groupId>
+ <artifactId>lt_downsampling_java8</artifactId>
+ <version>0.0.6</version>
+ </dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
index 017a3458f0..b59932a98e 100644
---
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
+++
b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.library.dprofile;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.library.util.NoNumberException;
import org.apache.iotdb.library.util.Util;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.udf.api.UDTF;
@@ -33,16 +34,27 @@ import
org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import com.github.ggalmazor.ltdownsampling.LTThreeBuckets;
+import com.github.ggalmazor.ltdownsampling.Point;
import org.apache.commons.lang3.tuple.Pair;
+import java.math.BigDecimal;
import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Random;
/** This function samples data by pool sampling. */
public class UDTFSample implements UDTF {
- private int k; // sample numbers
+ enum Method { // Sampling strategy, parsed case-insensitively from the 'method' parameter
+ ISOMETRIC, // pick rows at equal index intervals
+ RESERVOIR, // random reservoir sampling; used when 'method' is absent
+ TRIANGLE // largest-triangle downsampling delegated to LTThreeBuckets
+ }
+ private int k; // sample numbers
+ private Method method;
// These variables occurs in pool sampling
private Pair<Long, Object>[] samples; // sampled data
private int num = 0; // number of points already sampled
@@ -60,7 +72,8 @@ public class UDTFSample implements UDTF {
.validate(
method ->
"isometric".equalsIgnoreCase((String) method)
- || "reservoir".equalsIgnoreCase((String) method),
+ || "reservoir".equalsIgnoreCase((String) method)
+ || "triangle".equalsIgnoreCase((String) method),
"Illegal sampling method.",
validator.getParameters().getStringOrDefault("method",
"reservoir"));
}
@@ -71,7 +84,10 @@ public class UDTFSample implements UDTF {
this.k = parameters.getIntOrDefault("k", 1);
this.dataType =
UDFDataTypeTransformer.transformToTsDataType(parameters.getDataType(0));
String method = parameters.getStringOrDefault("method", "reservoir");
- if ("isometric".equalsIgnoreCase(method)) {
+ if ("triangle".equalsIgnoreCase(method)) this.method = Method.TRIANGLE;
+ else if ("isometric".equalsIgnoreCase(method)) this.method =
Method.ISOMETRIC;
+ else this.method = Method.RESERVOIR;
+ if (this.method == Method.ISOMETRIC || this.method == Method.TRIANGLE) {
configurations
.setAccessStrategy(new
SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
.setOutputDataType(parameters.getDataType(0));
@@ -105,11 +121,51 @@ public class UDTFSample implements UDTF {
public void transform(RowWindow rowWindow, PointCollector collector) throws
Exception {
// equal-distance sampling
int n = rowWindow.windowSize();
+
if (this.k < n) {
- for (long i = 0; i < this.k; i++) {
- long j = Math.floorDiv(i * (long) n, (long) k); // avoid intermediate
result overflows
- Row row = rowWindow.getRow((int) j);
- Util.putValue(collector, dataType, row.getTime(),
Util.getValueAsObject(row));
+ if (this.method == Method.TRIANGLE) {
+ List<Point> input = new LinkedList<>();
+ for (int i = 0; i < n; i++) {
+ Row row = rowWindow.getRow(i);
+ BigDecimal time = BigDecimal.valueOf(row.getTime());
+ BigDecimal data = BigDecimal.valueOf(Util.getValueAsDouble(row));
+ input.add(new Point(time, data));
+ }
+ if (k > 2) {
+ // The first and last element will always be sampled so the buckets
is k - 2
+ List<Point> output = LTThreeBuckets.sorted(input, k - 2);
+ for (Point p : output) {
+ switch (dataType) {
+ case INT32:
+ Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().intValue());
+ break;
+ case INT64:
+ Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().longValue());
+ break;
+ case FLOAT:
+ Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().floatValue());
+ break;
+ case DOUBLE:
+ Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().doubleValue());
+ break;
+ default:
+ throw new NoNumberException();
+ }
+ }
+ } else { // For corner case of k == 1 and k == 2
+ Row row = rowWindow.getRow(0); // Put first element
+ Util.putValue(collector, dataType, row.getTime(),
Util.getValueAsObject(row));
+ if (k == 2) {
+ row = rowWindow.getRow(n - 1); // Put last element
+ Util.putValue(collector, dataType, row.getTime(),
Util.getValueAsObject(row));
+ }
+ }
+ } else { // Method.ISOMETRIC
+ for (long i = 0; i < this.k; i++) {
+ long j = Math.floorDiv(i * (long) n, (long) k); // avoid
intermediate result overflows
+ Row row = rowWindow.getRow((int) j);
+ Util.putValue(collector, dataType, row.getTime(),
Util.getValueAsObject(row));
+ }
}
} else { // when k is larger than series length, output all points
RowIterator iterator = rowWindow.getRowIterator();
diff --git
a/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
b/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
index 20603caa4f..1618a184f4 100644
---
a/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
+++
b/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
@@ -575,6 +575,19 @@ public class DProfileTests {
}
}
+ @Test
+ public void testsample3() { // Triangle sampling (k=5) on root.vehicle.d1.s2
+ String sqlStr = "select sample(d1.s2, 'method'='triangle','k'='5') from
root.vehicle";
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(sqlStr); // NOTE(review): result is never inspected; this only verifies the query does not throw
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
@Test
public void testSegment1() {
String sqlStr = "select segment(d2.s2,'error'='10') from root.vehicle";