This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/area-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f2f408ba529c373284bcb12798dc0c08605d25ee Author: Lei Rui <[email protected]> AuthorDate: Wed Jan 29 19:38:17 2025 +0800 add --- server/sample_ltd-jar-with-dependencies.jar | Bin 40785602 -> 40794458 bytes .../java/org/apache/iotdb/db/query/eBUG/LTD.java | 282 ++++++++++++++------- .../org/apache/iotdb/db/query/eBUG/LTDBucket.java | 68 +++++ .../eBUG/{LTD.java => LTD_slow_deprecated.java} | 37 ++- 4 files changed, 279 insertions(+), 108 deletions(-) diff --git a/server/sample_ltd-jar-with-dependencies.jar b/server/sample_ltd-jar-with-dependencies.jar index f4b6c621844..db15b02bb3b 100644 Binary files a/server/sample_ltd-jar-with-dependencies.jar and b/server/sample_ltd-jar-with-dependencies.jar differ diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java index 6eb2051411c..bd91813b720 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java @@ -1,10 +1,7 @@ package org.apache.iotdb.db.query.eBUG; import java.io.*; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; +import java.util.*; public class LTD { public static Point calculateAveragePoint(List<Point> points, int startClosed, int endClosed) { @@ -52,108 +49,200 @@ public class LTD { return sse; } - public static List<Integer> getLtdBinIdxs(List<Point> points, int m, int maxIter, boolean debug) { - int numOfIterations = (int) (points.size() * 1.0 / m * 10); + public static List<Integer> getLtdBinIdxs(List<Point> points, int m, int maxIter, boolean debug) + throws IOException { + if (m < 6) { + throw new IOException("m at least 6"); + } + int n = points.size(); + int numOfIterations; if (maxIter >= 0) { - // numOfIterations = Math.min(numOfIterations, maxIter); numOfIterations = maxIter; // overwrite + } else { + numOfIterations = (int) (n * 1.0 / m * 10); } - double blockSize = (points.size() - 3) * 1.0 / (m - 2); + if (debug) { + System.out.println("numOfIterations=" + numOfIterations); + } + + int nbins = m - 2; // 留出全局首尾点 - List<Integer> offset = new LinkedList<>(); - for (double i = 1; i < points.size(); i += blockSize) { + double blockSize = (n - 3) * 1.0 / nbins; + List<Integer> offset = new LinkedList<>(); // nbins个分桶,nbins+1个桶边界 + for (double i = 1; i < n; i += blockSize) { offset.add((int) i); // 1~n-2, 这样最后一个offset+1才不会超出边界 } if (debug) { - System.out.println("numOfIterations=" + numOfIterations); - System.out.println(offset); + System.out.println("init bins: " + offset); } - List<Double> sse = new LinkedList<>(); - - // Initialization - for (int i = 0; i < m - 2; i++) { - // with one extra point overlapping for each adjacent bucket - sse.add(calculateSSEForBucket(points, offset.get(i) - 1, offset.get(i + 1) + 1)); + LTDBucket[] buckets = new LTDBucket[nbins]; + double lastSSE = -1; + for (int i = 0; i < nbins - 1; i++) { + double sse; + if (i == 0) { + sse = calculateSSEForBucket(points, offset.get(i) - 1, offset.get(i + 1) + 1); + } else { + sse = lastSSE; + } + double sse_next = calculateSSEForBucket(points, offset.get(i + 1) - 1, offset.get(i + 2) + 1); + double sum = sse + sse_next; + buckets[i] = new LTDBucket(offset.get(i), offset.get(i + 1), sse, sum); + lastSSE = sse_next; + } + // the last bucket sumOf2SSE set as infinity meaning never merged + buckets[nbins - 1] = + new LTDBucket(offset.get(nbins - 1), offset.get(nbins), lastSSE, Double.MAX_VALUE); + + // 设置前后关系 + LTDBucket starter = new LTDBucket(0, 0, 0, 0); + starter.next = buckets[0]; + for (int i = 0; i < nbins; i++) { + buckets[i].prev = (i == 0 ? starter : buckets[i - 1]); + buckets[i].next = (i == nbins - 1 ? null : buckets[i + 1]); } + // System.out.println("begin creating heap..."); + + // 使用优先队列构建 + PriorityQueue<LTDBucket> splitHeap = + new PriorityQueue<>((p1, p2) -> Double.compare(p2.sse, p1.sse)); + // 越大的排在前面 + Collections.addAll(splitHeap, buckets); + PriorityQueue<LTDBucket> mergeHeap = + new PriorityQueue<>(Comparator.comparingDouble(p -> p.sumOf2SSE)); + // 越小的排在前面 + Collections.addAll(mergeHeap, buckets); + + // System.out.println("begin iterating..."); + for (int c = 0; c < numOfIterations; c++) { - // Find the bucket to be split - int maxSSEIndex = -1; - double maxSSE = Double.NEGATIVE_INFINITY; - for (int i = 0; i < m - 2; i++) { - if (offset.get(i + 1) - offset.get(i) <= 1) { - continue; - } - if (sse.get(i) > maxSSE) { - maxSSE = sse.get(i); - maxSSEIndex = i; - } + if (debug) { + System.out.println("--------------[" + c + "]----------------"); } - if (maxSSEIndex < 0) { - if (debug) { - System.out.println(c); - System.out.println(maxSSEIndex); - System.out.println("break max"); - } + if (splitHeap.isEmpty() || mergeHeap.isEmpty()) { break; } - - // Find the buckets to be merged - int minSSEIndex = -1; - double minSSE = Double.POSITIVE_INFINITY; - for (int i = 0; i < m - 3; i++) { - if (i == maxSSEIndex || i + 1 == maxSSEIndex) { - continue; - } - if (sse.get(i) + sse.get(i + 1) < minSSE) { - minSSE = sse.get(i) + sse.get(i + 1); - minSSEIndex = i; + // Find the bucket to be split and buckets to be merged + LTDBucket bucket_split = splitHeap.poll(); + if (bucket_split.isDeleted) { + c--; + continue; + } + LTDBucket buckets_merged = mergeHeap.poll(); + if (buckets_merged.isDeleted) { + c--; + splitHeap.add(bucket_split); // 前面poll出来的未被删除的要加回去! + continue; + } + // TODO + if (bucket_split.startIdx == buckets_merged.startIdx) { + if (buckets_merged.next.next != null) { + buckets_merged = buckets_merged.next; + } else { + buckets_merged = buckets_merged.prev; } } - if (minSSEIndex < 0) { - if (debug) { - System.out.println(c); - System.out.println(minSSEIndex); - System.out.println("break min"); + if (bucket_split.endIdx == buckets_merged.getMergedEndIdx()) { + if (buckets_merged.prev != starter) { + buckets_merged = buckets_merged.prev; + } else { // 假设nbins至少>=4, 于是buckets_merged.next.next不为null + buckets_merged = buckets_merged.next.next; } - break; } - // Split - int startIdx = offset.get(maxSSEIndex); - int endIdx = offset.get(maxSSEIndex + 1); - int middleIdx = (startIdx + endIdx) / 2; - offset.add(maxSSEIndex + 1, middleIdx); - - // Update SSE affected by split - sse.set( - maxSSEIndex, - calculateSSEForBucket( - points, offset.get(maxSSEIndex) - 1, offset.get(maxSSEIndex + 1) + 1)); - - double newSse = - calculateSSEForBucket( - points, offset.get(maxSSEIndex + 1) - 1, offset.get(maxSSEIndex + 2) + 1); + // split + if (debug) { + System.out.println("+++To split bucket: " + bucket_split); + } + int startIdx = bucket_split.startIdx; + int endIdx = bucket_split.endIdx; + int middleIdx = (int) ((startIdx + endIdx) * 1.0 / 2); + double sse1 = calculateSSEForBucket(points, startIdx - 1, middleIdx + 1); + double sse2 = calculateSSEForBucket(points, middleIdx - 1, endIdx + 1); + + LTDBucket newBucket = + new LTDBucket(middleIdx, endIdx, sse2, sse2 + bucket_split.getNextSSE()); + newBucket.prev = bucket_split; + newBucket.next = bucket_split.next; + if (newBucket.next != null) { + newBucket.next.prev = newBucket; + } + bucket_split.next = newBucket; + + bucket_split.isDeleted = true; + LTDBucket replaceBucket = new LTDBucket(bucket_split); + replaceBucket.endIdx = middleIdx; + replaceBucket.sse = sse1; + replaceBucket.sumOf2SSE = sse1 + sse2; + replaceBucket.next = newBucket; + // 更新前一个桶的sumOf2SSE 注意这意味着前一个桶也要更新heap! + if (replaceBucket.prev != starter) { + replaceBucket.prev.isDeleted = true; + LTDBucket preReplaceBucket = new LTDBucket(replaceBucket.prev); + preReplaceBucket.sumOf2SSE = preReplaceBucket.sse + sse1; + splitHeap.add(preReplaceBucket); + mergeHeap.add(preReplaceBucket); + } - sse.add(maxSSEIndex + 1, newSse); + splitHeap.add(newBucket); + splitHeap.add(replaceBucket); + mergeHeap.add(newBucket); + mergeHeap.add(replaceBucket); - // Merge - if (minSSEIndex > maxSSEIndex) { - minSSEIndex += 1; // Adjust index + if (debug) { + System.out.println("\tsplit into: " + replaceBucket + "," + newBucket); } - offset.remove(minSSEIndex + 1); - - sse.set( - minSSEIndex, - calculateSSEForBucket( - points, offset.get(minSSEIndex) - 1, offset.get(minSSEIndex + 1) + 1)); - sse.remove(minSSEIndex + 1); + // merge + if (debug) { + System.out.println("---To merge bucket: " + buckets_merged + "," + buckets_merged.next); + } + startIdx = buckets_merged.startIdx; + endIdx = buckets_merged.getMergedEndIdx(); + double sse3 = calculateSSEForBucket(points, startIdx - 1, endIdx + 1); + buckets_merged.isDeleted = true; + buckets_merged.next.isDeleted = true; + LTDBucket mergedBucket = new LTDBucket(buckets_merged); + mergedBucket.endIdx = endIdx; + mergedBucket.sse = sse3; + mergedBucket.next = buckets_merged.next.next; + if (mergedBucket.next != null) { + mergedBucket.next.prev = mergedBucket; + } + // 更新自己的sumOf2SSE + mergedBucket.sumOf2SSE = sse3 + mergedBucket.getNextSSE(); // 如果next为null,这一项就是Infinity + // 更新前一个分桶的sumOf2SSE 注意这意味着前一个桶也要更新heap! + if (mergedBucket.prev != starter) { + mergedBucket.prev.isDeleted = true; + LTDBucket preReplaceBucket = new LTDBucket(mergedBucket.prev); + preReplaceBucket.sumOf2SSE = preReplaceBucket.sse + sse3; + splitHeap.add(preReplaceBucket); + mergeHeap.add(preReplaceBucket); + } + splitHeap.add(mergedBucket); + mergeHeap.add(mergedBucket); + if (debug) { + System.out.println("\tmerged bucket: " + mergedBucket); + } + // System.out.println("----------" + splitHeap.size() + "-------------"); + // System.out.println("----------" + mergeHeap.size() + "-------------"); } - // Convert ArrayList to int[] and return - return offset; + List<Integer> res = new ArrayList<>(); + LTDBucket bucket = starter.next; + while (true) { + res.add(bucket.startIdx); + if (bucket.next == null) { + res.add(bucket.endIdx); // 收尾 + break; + } + bucket = bucket.next; + } + if (debug) { + System.out.println(res); + } + return res; } public static List<Point> LTTB(List<Point> points, List<Integer> bins) { @@ -185,25 +274,26 @@ public class LTD { return res; } - public static List<Point> LTD(List<Point> points, int m, int maxIter, boolean debug) { + public static List<Point> LTD(List<Point> points, int m, int maxIter, boolean debug) + throws IOException { List<Integer> bins = getLtdBinIdxs(points, m, maxIter, debug); return LTTB(points, bins); } - public static void main(String[] args) { + public static void main(String[] args) throws IOException { Random rand = new Random(10); String input = "D:\\datasets\\regular\\tmp2.csv"; boolean hasHeader = true; int timeIdx = 0; int valueIdx = 1; - int N = 10000; - List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, valueIdx, N); - // Polyline polyline = new Polyline(); - // for (int i = 0; i < N; i += 1) { - // double v = rand.nextInt(1000); - // polyline.addVertex(new Point(i, v)); - // } - // List<Point> points = polyline.getVertices(); + int N = 1000_0; + // List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, valueIdx, N); + Polyline polyline = new Polyline(); + for (int i = 0; i < N; i += 1) { + double v = rand.nextInt(1000); + polyline.addVertex(new Point(i, v)); + } + List<Point> points = polyline.getVertices(); try (FileWriter writer = new FileWriter("raw.csv")) { // 写入CSV头部 writer.append("x,y,z\n"); @@ -217,17 +307,17 @@ public class LTD { System.out.println("Error writing to CSV file: " + e.getMessage()); } - int m = 10; - int maxIter = 10; + int m = 1000; + int maxIter = -1; long startTime = System.currentTimeMillis(); // List<Integer> bins = getLtdBinIdxs(points, m, true); - List<Point> sampled = LTD(points, m, maxIter, true); + List<Point> sampled = LTD(points, m, maxIter, false); long endTime = System.currentTimeMillis(); System.out.println("Time taken: " + (endTime - startTime) + "ms"); - for (Point p : sampled) { - System.out.println(p); - } + // for (Point p : sampled) { + // System.out.println(p); + // } try (PrintWriter writer = new PrintWriter(new File("output.csv"))) { // 写入字符串 diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTDBucket.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTDBucket.java new file mode 100644 index 00000000000..59998b599ea --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTDBucket.java @@ -0,0 +1,68 @@ +package org.apache.iotdb.db.query.eBUG; + +public class LTDBucket { + + public int startIdx, endIdx; + public double sse, sumOf2SSE; + + public LTDBucket prev; + public LTDBucket next; + + public boolean isDeleted; + + public LTDBucket(int startIdx, int endIdx, double sse, double sumOf2SSE) { + this.startIdx = startIdx; + this.endIdx = endIdx; + this.sse = sse; + this.sumOf2SSE = sumOf2SSE; + this.prev = null; + this.next = null; + this.isDeleted = false; + } + + public LTDBucket(LTDBucket ltdBucket) { + this.startIdx = ltdBucket.startIdx; + this.endIdx = ltdBucket.endIdx; + this.sse = ltdBucket.sse; + this.sumOf2SSE = ltdBucket.sumOf2SSE; + this.prev = ltdBucket.prev; + this.next = ltdBucket.next; + this.isDeleted = false; + + if (ltdBucket.prev != null) { + ltdBucket.prev.next = this; + } + if (ltdBucket.next != null) { + ltdBucket.next.prev = this; + } + } + + public int getStartIdx() { + return startIdx; + } + + public int getEndIdx() { + return endIdx; + } + + public int getMergedEndIdx() { + if (this.next == null) { + return this.endIdx; + } else { + return this.next.endIdx; + } + } + + public double getNextSSE() { + if (this.next == null) { + return Double.MAX_VALUE; + } else { + return this.next.sse; + } + } + + @Override + public String toString() { + return "[" + startIdx + ", " + endIdx + "]: " + sse + "," + sumOf2SSE; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD_slow_deprecated.java similarity index 89% copy from server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java copy to server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD_slow_deprecated.java index 6eb2051411c..7746a5c8f8d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java +++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD_slow_deprecated.java @@ -6,7 +6,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Random; -public class LTD { +public class LTD_slow_deprecated { public static Point calculateAveragePoint(List<Point> points, int startClosed, int endClosed) { double sumX = 0; double sumY = 0; @@ -78,6 +78,9 @@ public class LTD { } for (int c = 0; c < numOfIterations; c++) { + if (debug) { + System.out.println("--------------[" + c + "]----------------"); + } // Find the bucket to be split int maxSSEIndex = -1; double maxSSE = Double.NEGATIVE_INFINITY; @@ -123,6 +126,9 @@ public class LTD { // Split int startIdx = offset.get(maxSSEIndex); int endIdx = offset.get(maxSSEIndex + 1); + if (debug) { + System.out.println("+++To split bucket: " + startIdx + "," + endIdx + ":" + maxSSE); + } int middleIdx = (startIdx + endIdx) / 2; offset.add(maxSSEIndex + 1, middleIdx); @@ -144,6 +150,10 @@ public class LTD { } offset.remove(minSSEIndex + 1); + if (debug) { + System.out.println("---To merge bucket: " + offset.get(minSSEIndex) + ":" + minSSE); + } + sse.set( minSSEIndex, calculateSSEForBucket( @@ -153,6 +163,9 @@ public class LTD { } // Convert ArrayList to int[] and return + if (debug) { + System.out.println(offset); + } return offset; } @@ -196,14 +209,14 @@ public class LTD { boolean hasHeader = true; int timeIdx = 0; int valueIdx = 1; - int N = 10000; - List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, valueIdx, N); - // Polyline polyline = new Polyline(); - // for (int i = 0; i < N; i += 1) { - // double v = rand.nextInt(1000); - // polyline.addVertex(new Point(i, v)); - // } - // List<Point> points = polyline.getVertices(); + int N = 100_0000; + // List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, valueIdx, N); + Polyline polyline = new Polyline(); + for (int i = 0; i < N; i += 1) { + double v = rand.nextInt(1000); + polyline.addVertex(new Point(i, v)); + } + List<Point> points = polyline.getVertices(); try (FileWriter writer = new FileWriter("raw.csv")) { // 写入CSV头部 writer.append("x,y,z\n"); @@ -217,11 +230,11 @@ public class LTD { System.out.println("Error writing to CSV file: " + e.getMessage()); } - int m = 10; - int maxIter = 10; + int m = 1000; + int maxIter = -1; long startTime = System.currentTimeMillis(); // List<Integer> bins = getLtdBinIdxs(points, m, true); - List<Point> sampled = LTD(points, m, maxIter, true); + List<Point> sampled = LTD(points, m, maxIter, false); long endTime = System.currentTimeMillis(); System.out.println("Time taken: " + (endTime - startTime) + "ms");
