This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/area-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 30c63392fcff1659ac8979a7858472911598f3e4 Author: Lei Rui <[email protected]> AuthorDate: Tue Jan 28 04:25:35 2025 +0800 minmax --- server/pom.xml | 12 +- server/sample_minmax-jar-with-dependencies.jar | Bin 0 -> 40761381 bytes .../org/apache/iotdb/db/query/eBUG/MinMax.java | 241 +++++++++++++++++++++ .../org/apache/iotdb/db/query/eBUG/sample_FSW.java | 4 - ...ample_bottomUpYdiff.java => sample_MinMax.java} | 33 +-- .../iotdb/db/query/eBUG/sample_bottomUpYdiff.java | 1 - .../apache/iotdb/db/query/eBUG/sample_eBUG.java | 3 +- 7 files changed, 270 insertions(+), 24 deletions(-) diff --git a/server/pom.xml b/server/pom.xml index f918a874eb2..1808c5f6791 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -206,6 +206,12 @@ <artifactId>caffeine</artifactId> <version>2.9.1</version> </dependency> + <dependency> + <groupId>org.datanucleus</groupId> + <artifactId>datanucleus-core</artifactId> + <version>4.1.17</version> + <scope>compile</scope> + </dependency> </dependencies> <build> <plugins> @@ -281,12 +287,14 @@ <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> - <finalName>sample_fsw</finalName> + <finalName>sample_minmax</finalName> + <!-- <finalName>sample_fsw</finalName>--> <!-- <finalName>sample_eBUG</finalName>--> <!-- <finalName>sample_BUYdiff</finalName>--> <archive> <manifest> - <mainClass>org.apache.iotdb.db.query.eBUG.sample_FSW</mainClass> + <mainClass>org.apache.iotdb.db.query.eBUG.sample_MinMax</mainClass> + <!-- <mainClass>org.apache.iotdb.db.query.eBUG.sample_FSW</mainClass>--> <!-- <mainClass>org.apache.iotdb.db.query.eBUG.sample_eBUG</mainClass>--> <!-- <mainClass>org.apache.iotdb.db.query.eBUG.sample_bottomUpYdiff</mainClass>--> </manifest> diff --git a/server/sample_minmax-jar-with-dependencies.jar b/server/sample_minmax-jar-with-dependencies.jar new file mode 100644 index 00000000000..cd09c6c3bbf Binary files /dev/null and b/server/sample_minmax-jar-with-dependencies.jar differ diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/MinMax.java new file mode 100644 index 00000000000..5ee516f0f9f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/MinMax.java @@ -0,0 +1,241 @@ +package org.apache.iotdb.db.query.eBUG; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class MinMax { + public enum fixedBUCKETtype { + width, + frequency, + } + + public static List<Point> reducePoints_equalFrequencyBucket( + List<Point> points, int m, boolean debug) { + // 不算首尾点,还要(m-2)/2个分桶,每个分桶里采集MinMax两个点 + int numOfBuckets = (int) ((m - 2) / 2.0); + + // 等宽分桶 + int frequency = (int) Math.ceil(points.size() * 1.0 / numOfBuckets); + + if (debug) { + System.out.println("numOfBuckets=" + numOfBuckets); + System.out.println("frequency=" + frequency); + } + + List<Point> res = new ArrayList<>(); + res.add(points.get(0)); + + int currentBucketIdx = 0; + double currentBucketStartIdx = currentBucketIdx * frequency; + double currentBucketEndIdx = currentBucketStartIdx + frequency; + double minValue = Double.MAX_VALUE; + int minIdx = -1; + double maxValue = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! + int maxIdx = -1; + for (int i = 0; i < points.size() - 1; i++) { // 注意到倒数第二个点为止,因为如果到全局尾点,由于精度出入会有bug + Point p = points.get(i); + while (i >= currentBucketEndIdx) { + // record the results of the last bucket + if (minIdx < maxIdx) { + res.add(points.get(minIdx)); + res.add(points.get(maxIdx)); + } else { + res.add(points.get(maxIdx)); + res.add(points.get(minIdx)); + } + if (debug) { + System.out.println( + currentBucketStartIdx + + "," + + currentBucketEndIdx + + "," + + currentBucketIdx + + "," + + res.size()); + } + // find the bucket that holds p, and reset for this new bucket + currentBucketIdx++; + currentBucketStartIdx = currentBucketIdx * frequency; + currentBucketEndIdx = currentBucketStartIdx + frequency; + minValue = Double.MAX_VALUE; + maxValue = -Double.MAX_VALUE; + minIdx = -1; + maxIdx = -1; + } + if (p.y < minValue) { + minValue = p.y; + minIdx = i; + } + if (p.y > maxValue) { + maxValue = p.y; + maxIdx = i; + } + } + + // record the results of the last bucket + if (minIdx < maxIdx) { + res.add(points.get(minIdx)); + res.add(points.get(maxIdx)); + } else { + res.add(points.get(maxIdx)); + res.add(points.get(minIdx)); + } + if (debug) { + System.out.println( + currentBucketStartIdx + + "," + + currentBucketEndIdx + + "," + + currentBucketIdx + + "," + + res.size()); + } + + // record the last point + res.add(points.get(points.size() - 1)); + + return res; + } + + public static List<Point> reducePoints_equalWidthBucket( + List<Point> points, int m, boolean debug) { + double startTime = points.get(0).x; + double endTime = points.get(points.size() - 1).x; + + // 不算首尾点,还要(m-2)/2个分桶,每个分桶里采集MinMax两个点 + int numOfBuckets = (int) ((m - 2) / 2.0); + + // 等宽分桶 + double interval = (endTime - startTime) / numOfBuckets; + + if (debug) { + System.out.println("numOfBuckets=" + numOfBuckets); + System.out.println("width=" + interval); + } + + List<Point> res = new ArrayList<>(); + res.add(points.get(0)); + + int currentBucketIdx = 0; + double currentBucketStartTime = currentBucketIdx * interval; + double currentBucketEndTime = currentBucketStartTime + interval; + double minValue = Double.MAX_VALUE; + int minIdx = -1; + double maxValue = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! + int maxIdx = -1; + for (int i = 0; i < points.size() - 1; i++) { // 注意到倒数第二个点为止,因为如果到全局尾点,由于精度出入会有bug + Point p = points.get(i); + while (p.x >= currentBucketEndTime) { + // record the results of the last bucket + if (minIdx < maxIdx) { + res.add(points.get(minIdx)); + res.add(points.get(maxIdx)); + } else { + res.add(points.get(maxIdx)); + res.add(points.get(minIdx)); + } + if (debug) { + System.out.println( + currentBucketStartTime + + "," + + currentBucketEndTime + + "," + + currentBucketIdx + + "," + + res.size()); + } + // find the bucket that holds p, and reset for this new bucket + currentBucketIdx++; + currentBucketStartTime = currentBucketIdx * interval; + currentBucketEndTime = currentBucketStartTime + interval; + minValue = Double.MAX_VALUE; + maxValue = -Double.MAX_VALUE; + minIdx = -1; + maxIdx = -1; + } + if (p.y < minValue) { + minValue = p.y; + minIdx = i; + } + if (p.y > maxValue) { + maxValue = p.y; + maxIdx = i; + } + } + + // record the results of the last bucket + if (minIdx < maxIdx) { + res.add(points.get(minIdx)); + res.add(points.get(maxIdx)); + } else { + res.add(points.get(maxIdx)); + res.add(points.get(minIdx)); + } + if (debug) { + System.out.println( + currentBucketStartTime + + "," + + currentBucketEndTime + + "," + + currentBucketIdx + + "," + + res.size()); + } + + // record the last point + res.add(points.get(points.size() - 1)); + + return res; + } + + public static void main(String[] args) { + Random rand = new Random(10); + String input = "D:\\datasets\\regular\\tmp2.csv"; + boolean hasHeader = true; + int timeIdx = 0; + int valueIdx = 1; + int N = -1; + List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, valueIdx, N); + // Polyline polyline = new Polyline(); + // for (int i = 0; i < N; i += 1) { + // double v = rand.nextInt(1000); + // polyline.addVertex(new Point(i, v)); + // } + // List<Point> points = polyline.getVertices(); + try (FileWriter writer = new FileWriter("raw.csv")) { + // 写入CSV头部 + writer.append("x,y,z\n"); + + // 写入每个点的数据 + for (Point point : points) { + writer.append(point.x + "," + point.y + "," + point.z + "\n"); + } + System.out.println(points.size() + " Data has been written"); + } catch (IOException e) { + System.out.println("Error writing to CSV file: " + e.getMessage()); + } + int m = 10; + + long startTime = System.currentTimeMillis(); + List<Point> sampled = reducePoints_equalFrequencyBucket(points, m, false); + long endTime = System.currentTimeMillis(); + System.out.println("Time taken: " + (endTime - startTime) + "ms"); + + // for (Point p : sampled) { + // System.out.println(p); + // } + System.out.println(sampled.size()); + + try (PrintWriter writer = new PrintWriter(new File("output.csv"))) { + // 写入字符串 + for (int i = 0; i < sampled.size(); i++) { + writer.println(sampled.get(i).x + "," + sampled.get(i).y); + } + + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_FSW.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_FSW.java index 426c99f4638..098cf1e36ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_FSW.java +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_FSW.java @@ -9,9 +9,6 @@ import static org.apache.iotdb.db.query.eBUG.Tool.generateOutputFileName; import static org.apache.iotdb.db.query.eBUG.Tool.getParam; public class sample_FSW { - // 输入一条时间序列 t,v - // 输出按照bottom-up淘汰顺序排列的dominated significance,t,v。 - // 用于后期在线采样时选取倒数m个点(也就是DS最大的m个点,或者最晚淘汰的m个点)作为采样结果(选出之后要自行把这m个点重新按照时间戳x递增排列) public static void main(String[] args) throws IOException { if (args.length < 7) { System.out.println( @@ -65,7 +62,6 @@ public class sample_FSW { + (endTime - startTime) + "ms"); - // 输出结果到csv,按照z,x,y三列,因为results结果已经按照z(即DS)递增排序,对应bottom-up的淘汰顺序,越小代表越早被淘汰 try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) { writer.write("x,y"); writer.newLine(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_bottomUpYdiff.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_MinMax.java similarity index 71% copy from server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_bottomUpYdiff.java copy to server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_MinMax.java index ec96d1d45e8..767b7f7151e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_bottomUpYdiff.java +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_MinMax.java @@ -7,11 +7,11 @@ import java.util.List; import static org.apache.iotdb.db.query.eBUG.Tool.generateOutputFileName; -public class sample_bottomUpYdiff { +public class sample_MinMax { public static void main(String[] args) throws IOException { if (args.length < 7) { System.out.println( - "Usage: Please provide arguments: inputFilePath,hasHeader,timeIdx,valueIdx,N,m,errorType,(outDir)"); + "Usage: Please provide arguments: inputFilePath,hasHeader,timeIdx,valueIdx,N,m,bucketType,(outDir)"); } String input = args[0]; boolean hasHeader = Boolean.parseBoolean(args[1]); @@ -19,16 +19,14 @@ public class sample_bottomUpYdiff { int valueIdx = Integer.parseInt(args[3]); int N = Integer.parseInt(args[4]); // N<=0表示读全部行,N>0表示读最多N行 int m = Integer.parseInt(args[5]); - String errorTypeStr = args[6]; - DP.ERRORtype errorType; - if (errorTypeStr.equals("L1")) { - errorType = DP.ERRORtype.L1; - } else if (errorTypeStr.equals("L2")) { - errorType = DP.ERRORtype.L2; - } else if (errorTypeStr.equals("L_infy")) { - errorType = DP.ERRORtype.L_infy; + String bucketTypeStr = args[6]; + MinMax.fixedBUCKETtype bucketType; + if (bucketTypeStr.equals("width")) { + bucketType = MinMax.fixedBUCKETtype.width; + } else if (bucketTypeStr.equals("frequency")) { + bucketType = MinMax.fixedBUCKETtype.frequency; } else { - throw new IOException("please input errorType as L1/L2/L_infy"); + throw new IOException("please input fixed bucket type as width/frequency"); } String outDir; if (args.length > 7) { @@ -44,20 +42,24 @@ public class sample_bottomUpYdiff { System.out.println("Value index: " + valueIdx); System.out.println("N: " + N); System.out.println("m: " + m); - System.out.println("errorType: " + errorType); + System.out.println("bucketType: " + bucketType); System.out.println("outDir: " + outDir); // 读取原始序列 List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, valueIdx, N); String outputFile = - generateOutputFileName(input, outDir, "-BU" + errorType + "-n" + points.size() + "-m" + m); + generateOutputFileName(input, outDir, "-" + bucketType + "-n" + points.size() + "-m" + m); System.out.println("Output file: " + outputFile); // do not modify this hint string log // 采样 + List<Point> results; long startTime = System.currentTimeMillis(); - // List<Point> results = SWAB.seg_bottomUp_m_withTimestamps(points, m, null, false); - List<Point> results = BottomUpYdiff.reducePoints(points, m, errorType, false); + if (bucketType == MinMax.fixedBUCKETtype.width) { + results = MinMax.reducePoints_equalWidthBucket(points, m, false); + } else { + results = MinMax.reducePoints_equalFrequencyBucket(points, m, false); + } long endTime = System.currentTimeMillis(); System.out.println("result point number: " + results.size()); @@ -70,7 +72,6 @@ public class sample_bottomUpYdiff { + (endTime - startTime) + "ms"); - // 输出结果到csv,按照z,x,y三列,因为results结果已经按照z(即DS)递增排序,对应bottom-up的淘汰顺序,越小代表越早被淘汰 try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) { writer.write("x,y"); writer.newLine(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_bottomUpYdiff.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_bottomUpYdiff.java index ec96d1d45e8..47c4f1a21e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_bottomUpYdiff.java +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_bottomUpYdiff.java @@ -70,7 +70,6 @@ public class sample_bottomUpYdiff { + (endTime - startTime) + "ms"); - // 输出结果到csv,按照z,x,y三列,因为results结果已经按照z(即DS)递增排序,对应bottom-up的淘汰顺序,越小代表越早被淘汰 try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) { writer.write("x,y"); writer.newLine(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_eBUG.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_eBUG.java index ca44730456c..d8a4b5ea198 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_eBUG.java +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/sample_eBUG.java @@ -65,7 +65,7 @@ public class sample_eBUG { + (endTime - startTime) + "ms"); - // 输出结果到csv,按照z,x,y三列,因为results结果已经按照z(即DS)递增排序,对应bottom-up的淘汰顺序,越小代表越早被淘汰 + // 输出结果到csv try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) { // 写入表头 if (m <= 2) { @@ -75,6 +75,7 @@ public class sample_eBUG { writer.newLine(); // 写入数据行,按顺序 z, x, y + // 按照z,x,y三列,因为results结果已经按照z(即DS)递增排序,对应bottom-up的淘汰顺序,越小代表越早被淘汰 for (Point point : results) { writer.write(point.z + "," + point.x + "," + point.y); writer.newLine();
