This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d7ed1c10e5 Add new sampling algorithm largest triangle to sample UDF 
(#7462)
d7ed1c10e5 is described below

commit d7ed1c10e52deb720b0c207a19cda3f3ac29e46d
Author: Zhizhou Li <[email protected]>
AuthorDate: Wed Oct 12 14:46:06 2022 +0800

    Add new sampling algorithm largest triangle to sample UDF (#7462)
---
 docs/UserGuide/UDF-Library/Data-Profiling.md       | 10 ++--
 docs/zh/UserGuide/UDF-Library/Data-Profiling.md    |  6 +-
 library-udf/pom.xml                                | 12 ++++
 .../apache/iotdb/library/dprofile/UDTFSample.java  | 70 +++++++++++++++++++---
 .../iotdb/library/dprofile/DProfileTests.java      | 13 ++++
 5 files changed, 97 insertions(+), 14 deletions(-)

diff --git a/docs/UserGuide/UDF-Library/Data-Profiling.md 
b/docs/UserGuide/UDF-Library/Data-Profiling.md
index e2b2625ae4..7df66d245f 100644
--- a/docs/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/UserGuide/UDF-Library/Data-Profiling.md
@@ -1162,11 +1162,13 @@ Output series:
 
 This function is used to sample the input series,
 that is, select a specified number of data points from the input series and 
output them.
-Currently, two sampling methods are supported:
+Currently, three sampling methods are supported:
 **Reservoir sampling** randomly selects data points.
 All of the points have the same probability of being sampled.
 **Isometric sampling** selects data points at equal index intervals.
-
+**Triangle sampling** divides the data points into buckets according to the sampling number `k`.
+Within each bucket, it computes the area of the triangle that each point forms with points from the adjacent buckets, and selects the point that forms the largest triangle.
+For more details, please read the [paper](http://skemman.is/stream/get/1946/15343/37285/3/SS_MSthesis.pdf).
 
 **Name:** SAMPLE
 
@@ -1174,7 +1176,7 @@ All of the points have the same probability of being 
sampled.
 
 **Parameters:**
 
-+ `method`: The method of sampling, which is 'reservoir' or 'isometric'. By 
default, reservoir sampling is used.
++ `method`: The method of sampling, which is 'reservoir', 'isometric' or 
'triangle'. By default, reservoir sampling is used.
 + `k`: The number of sampling, which is a positive integer. By default, it's 1.
 
 **Output Series:** Output a single series. The type is the same as the input. 
The length of the output series is `k`. Each data point in the output series 
comes from the input series.
@@ -1809,4 +1811,4 @@ Output series:
 |1970-01-01T08:00:01.900+08:00|  -1.033622788243404|
 |1970-01-01T08:00:02.000+08:00|-0.20672455764868078|
 +-----------------------------+--------------------+
-```
\ No newline at end of file
+```
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md 
b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
index 859be2fb02..b5657ad6cf 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
@@ -1154,7 +1154,7 @@ select resample(s1,'every'='30m','start'='2021-03-06 
15:00:00') from root.test.d
 
 ### 函数简介
 
-本函数对输入序列进行采样,即从输入序列中选取指定数量的数据点并输出。目前,本函数支持两种采样方法:**蓄水池采样法 (reservoir 
sampling)** 对数据进行随机采样,所有数据点被采样的概率相同;**等距采样法 (isometric sampling)** 
按照相等的索引间隔对数据进行采样。
+本函数对输入序列进行采样,即从输入序列中选取指定数量的数据点并输出。目前,本函数支持三种采样方法:**蓄水池采样法 (reservoir sampling)** 对数据进行随机采样,所有数据点被采样的概率相同;**等距采样法 (isometric sampling)** 按照相等的索引间隔对数据进行采样;**最大三角采样法 (triangle sampling)** 按采样数将所有数据分桶,在每个桶内计算数据点与相邻桶中的点构成的三角形面积,并保留面积最大的点。该算法通常用于数据的可视化展示,采样过程可以保证一些关键的突变点得到保留。更多算法细节可以阅读[论文](http://skemman.is/stream/get/1946/15343/37285/3/SS_MSthesis.pdf)。
 
 **函数名:** SAMPLE
 
@@ -1162,7 +1162,7 @@ select resample(s1,'every'='30m','start'='2021-03-06 
15:00:00') from root.test.d
 
 **参数:**
 
-+ `method`:采样方法,取值为 'reservoir' 或' isometric'。在缺省情况下,采用蓄水池采样法。
++ `method`:采样方法,取值为 'reservoir'、'isometric' 或 'triangle'。在缺省情况下,采用蓄水池采样法。
 + `k`:采样数,它是一个正整数,在缺省情况下为 1。
 
 **输出序列:** 输出单个序列,类型与输入序列相同。该序列的长度为采样数,序列中的每一个数据点都来自于输入序列。
@@ -1802,4 +1802,4 @@ select zscore(s1) from root.test
 |1970-01-01T08:00:01.900+08:00|  -1.033622788243404|
 |1970-01-01T08:00:02.000+08:00|-0.20672455764868078|
 +-----------------------------+--------------------+
-```
\ No newline at end of file
+```
diff --git a/library-udf/pom.xml b/library-udf/pom.xml
index 0d36053eb8..0842413876 100644
--- a/library-udf/pom.xml
+++ b/library-udf/pom.xml
@@ -27,6 +27,12 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>library-udf</artifactId>
+    <repositories>
+        <repository>
+            <id>jitpack.io</id>
+            <url>https://jitpack.io</url>
+        </repository>
+    </repositories>
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
@@ -78,6 +84,12 @@
             <groupId>com.github.wendykierp</groupId>
             <artifactId>JTransforms</artifactId>
         </dependency>
+        <!-- Sampling  -->
+        <dependency>
+            <groupId>com.github.ggalmazor</groupId>
+            <artifactId>lt_downsampling_java8</artifactId>
+            <version>0.0.6</version>
+        </dependency>
         <dependency>
             <groupId>org.jetbrains</groupId>
             <artifactId>annotations</artifactId>
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java 
b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
index 017a3458f0..b59932a98e 100644
--- 
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.library.dprofile;
 
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.library.util.NoNumberException;
 import org.apache.iotdb.library.util.Util;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.udf.api.UDTF;
@@ -33,16 +34,27 @@ import 
org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
 import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 
+import com.github.ggalmazor.ltdownsampling.LTThreeBuckets;
+import com.github.ggalmazor.ltdownsampling.Point;
 import org.apache.commons.lang3.tuple.Pair;
 
+import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 
 /** This function samples data by pool sampling. */
 public class UDTFSample implements UDTF {
 
-  private int k; // sample numbers
+  enum Method {
+    ISOMETRIC,
+    RESERVOIR,
+    TRIANGLE
+  }
 
+  private int k; // sample numbers
+  private Method method;
   // These variables occurs in pool sampling
   private Pair<Long, Object>[] samples; // sampled data
   private int num = 0; // number of points already sampled
@@ -60,7 +72,8 @@ public class UDTFSample implements UDTF {
         .validate(
             method ->
                 "isometric".equalsIgnoreCase((String) method)
-                    || "reservoir".equalsIgnoreCase((String) method),
+                    || "reservoir".equalsIgnoreCase((String) method)
+                    || "triangle".equalsIgnoreCase((String) method),
             "Illegal sampling method.",
             validator.getParameters().getStringOrDefault("method", 
"reservoir"));
   }
@@ -71,7 +84,10 @@ public class UDTFSample implements UDTF {
     this.k = parameters.getIntOrDefault("k", 1);
     this.dataType = 
UDFDataTypeTransformer.transformToTsDataType(parameters.getDataType(0));
     String method = parameters.getStringOrDefault("method", "reservoir");
-    if ("isometric".equalsIgnoreCase(method)) {
+    if ("triangle".equalsIgnoreCase(method)) this.method = Method.TRIANGLE;
+    else if ("isometric".equalsIgnoreCase(method)) this.method = 
Method.ISOMETRIC;
+    else this.method = Method.RESERVOIR;
+    if (this.method == Method.ISOMETRIC || this.method == Method.TRIANGLE) {
       configurations
           .setAccessStrategy(new 
SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
           .setOutputDataType(parameters.getDataType(0));
@@ -105,11 +121,51 @@ public class UDTFSample implements UDTF {
   public void transform(RowWindow rowWindow, PointCollector collector) throws 
Exception {
     // equal-distance sampling
     int n = rowWindow.windowSize();
+
     if (this.k < n) {
-      for (long i = 0; i < this.k; i++) {
-        long j = Math.floorDiv(i * (long) n, (long) k); // avoid intermediate 
result overflows
-        Row row = rowWindow.getRow((int) j);
-        Util.putValue(collector, dataType, row.getTime(), 
Util.getValueAsObject(row));
+      if (this.method == Method.TRIANGLE) {
+        List<Point> input = new LinkedList<>();
+        for (int i = 0; i < n; i++) {
+          Row row = rowWindow.getRow(i);
+          BigDecimal time = BigDecimal.valueOf(row.getTime());
+          BigDecimal data = BigDecimal.valueOf(Util.getValueAsDouble(row));
+          input.add(new Point(time, data));
+        }
+        if (k > 2) {
+          // The first and last element will always be sampled so the buckets 
is k - 2
+          List<Point> output = LTThreeBuckets.sorted(input, k - 2);
+          for (Point p : output) {
+            switch (dataType) {
+              case INT32:
+                Util.putValue(collector, dataType, p.getX().longValue(), 
p.getY().intValue());
+                break;
+              case INT64:
+                Util.putValue(collector, dataType, p.getX().longValue(), 
p.getY().longValue());
+                break;
+              case FLOAT:
+                Util.putValue(collector, dataType, p.getX().longValue(), 
p.getY().floatValue());
+                break;
+              case DOUBLE:
+                Util.putValue(collector, dataType, p.getX().longValue(), 
p.getY().doubleValue());
+                break;
+              default:
+                throw new NoNumberException();
+            }
+          }
+        } else { // For corner case of k == 1 and k == 2
+          Row row = rowWindow.getRow(0); // Put first element
+          Util.putValue(collector, dataType, row.getTime(), 
Util.getValueAsObject(row));
+          if (k == 2) {
+            row = rowWindow.getRow(n - 1); // Put last element
+            Util.putValue(collector, dataType, row.getTime(), 
Util.getValueAsObject(row));
+          }
+        }
+      } else { // Method.ISOMETRIC
+        for (long i = 0; i < this.k; i++) {
+          long j = Math.floorDiv(i * (long) n, (long) k); // avoid 
intermediate result overflows
+          Row row = rowWindow.getRow((int) j);
+          Util.putValue(collector, dataType, row.getTime(), 
Util.getValueAsObject(row));
+        }
       }
     } else { // when k is larger than series length, output all points
       RowIterator iterator = rowWindow.getRowIterator();
diff --git 
a/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
 
b/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
index 20603caa4f..1618a184f4 100644
--- 
a/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
+++ 
b/library-udf/src/test/java/org/apache/iotdb/library/dprofile/DProfileTests.java
@@ -575,6 +575,19 @@ public class DProfileTests {
     }
   }
 
+  @Test
+  public void testsample3() {
+    String sqlStr = "select sample(d1.s2, 'method'='triangle','k'='5') from 
root.vehicle";
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sqlStr);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
   @Test
   public void testSegment1() {
     String sqlStr = "select segment(d2.s2,'error'='10') from root.vehicle";

Reply via email to