This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/LTS-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/LTS-visualization by this push:
     new fe5b2d7dbb6  readme comment
fe5b2d7dbb6 is described below

commit fe5b2d7dbb60ba53d257f7a50b4eaf4e9f4f705f
Author: Lei Rui <1010953...@qq.com>
AuthorDate: Mon Feb 12 19:35:54 2024 +0800

    readme comment
---
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 295 +-------
 .../groupby/LocalGroupByExecutorTri_ILTS.java      | 207 ++--------
 .../LocalGroupByExecutorTri_ILTS_noacc.java        | 317 ---------------
 .../groupby/LocalGroupByExecutorTri_LTTB.java      |  27 +-
 .../groupby/LocalGroupByExecutorTri_M4.java        |  20 +-
 .../LocalGroupByExecutorTri_M4_deprecated.java     | 446 ---------------------
 .../groupby/LocalGroupByExecutorTri_MinMax.java    |  22 +-
 ...LocalGroupByExecutorTri_MinMaxPreselection.java |  12 -
 .../iotdb/db/integration/tri/MyTest_ILTS.java      |  66 ---
 .../iotdb/db/integration/tri/MyTest_LTTB.java      |   6 -
 .../apache/iotdb/db/integration/tri/MyTest_M4.java |  19 -
 .../iotdb/db/integration/tri/MyTest_MinMax.java    |  13 -
 .../db/integration/tri/MyTest_MinMaxLTTB.java      |   4 -
 .../file/metadata/statistics/Statistics.java       |   3 -
 14 files changed, 49 insertions(+), 1408 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index e674e4bf7bc..d1d42ac41aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -163,11 +163,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       List<AggregateResult> aggregations =
           executor.calcResult(startTime, startTime + interval, startTime, endTime, interval);
       MinMaxInfo minMaxInfo = (MinMaxInfo) aggregations.get(0).getResult();
-      series.append(minMaxInfo.val); // the [t] of MinValueAggrResult is meaningless here, because only val is needed
+      series.append(minMaxInfo.val);

-      // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString()))
-      // note that the first item in the SQL must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later
-      // all series are assembled into one string placed in row 1, column 2; otherwise a field type that does not match TSDataType.MIN_MAX_INT64 causes problems.
       record.addField(series, TSDataType.MIN_MAX_INT64);

     } catch (QueryProcessException e) {
@@ -210,13 +207,11 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       }
       for (long localCurStartTime = startTime;
           localCurStartTime + interval <= endTime;
-          // note the equals sign! because buckets are left-closed, right-open
           // + interval to make the last bucket complete
           // e.g, T=11,nout=3,interval=floor(11/3)=3,
           // [0,3),[3,6),[6,9), no need incomplete [9,11)
           // then the number of buckets must be Math.floor((endTime-startTime)/interval)
           localCurStartTime += interval) {
-        // System.out.println(localCurStartTime);
         // not change real curStartTime&curEndTime
         // attention the returned aggregations need deep copy if using directly
         List<AggregateResult> aggregations =
@@ -243,22 +238,17 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       }

       // Second step: apply LTTB on the MinMax preselection result
-      // System.out.println(times);
-      // System.out.println(values);
       int N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // number of MinMax buckets
-      int N2 = N1 / (rps / divide); // number of LTTB buckets
-      // global first point
+      int N2 = N1 / (rps / divide);
       series.append(p1v).append("[").append(p1t).append("]").append(",");
       long lt = p1t; // left fixed t
       double lv = p1v; // left fixed v
-      // find the first non-empty current LTTB bucket
       int currentBucket = 0;
       for (; currentBucket < N2; currentBucket++) {
         boolean emptyBucket = true;
         for (int j = currentBucket * rps; j < (currentBucket + 1) * rps; j++) {
-          // an LTTB bucket holds rps MinMax-preselected points (including duplicates and nulls)
           if (times.get(j) != null) {
-            emptyBucket = false; // the LTTB bucket is non-empty as soon as one preselected point is not null
+            emptyBucket = false;
             break;
           }
         }
@@ -266,29 +256,25 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
           break;
         }
       }
-      // a non-empty current LTTB bucket is found; now find the first non-empty LTTB bucket to its right
       for (int nextBucket = currentBucket + 1; nextBucket < N2; nextBucket++) {
         boolean emptyBucket = true;
         for (int j = nextBucket * rps; j < (nextBucket + 1) * rps; j++) {
-          // an LTTB bucket holds rps MinMax-preselected points (including duplicates and nulls)
           if (times.get(j) != null) {
-            emptyBucket = false; // the LTTB bucket is non-empty as soon as one preselected point is not null
+            emptyBucket = false;
            break;
          }
        }
        if (emptyBucket) {
-          continue; // keep searching rightwards for a non-empty bucket
+          continue;
        }
-        // compute the average point of the non-empty right LTTB bucket
        double rt = 0;
        double rv = 0;
        int cnt = 0;
        for (int j = nextBucket * rps; j < (nextBucket + 1) * rps; j++) {
-          // an LTTB bucket holds rps MinMax-preselected points (including duplicates and nulls)
          if (times.get(j) != null) {
            rt += times.get(j);
-            rv += (double) values.get(j); // TODO
+            rv += (double) values.get(j);
            cnt++;
          }
        }
@@ -298,18 +284,14 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
        rt = rt / cnt;
        rv = rv / cnt;

-        // find the point in the current non-empty bucket farthest (by vertical distance) from the line l-r
        double maxArea = -1;
        long select_t = -1;
        double select_v = -1;
        for (int j = currentBucket * rps; j < (currentBucket + 1) * rps; j++) {
-          // an LTTB bucket holds rps MinMax-preselected points (including duplicates and nulls)
          if (times.get(j) != null) {
            long t = times.get(j);
            double v = values.get(j);
            double area = IOMonitor2.calculateTri(lt, lv, t, v, rt, rv);
-            // System.out.printf("curr=%d,t=%d,area=%f,lt=%d%n", currentBucket, t, area,
-            // lt);
            if (area > maxArea) {
              maxArea = area;
              select_t = t;
@@ -320,27 +302,21 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
        if (select_t < 0) {
          throw new IOException("something is wrong");
        }
-        // record the result
        series.append(select_v).append("[").append(select_t).append("]").append(",");

-        // update the current bucket and the left fixed point, and append the selected point to series
        currentBucket = nextBucket;
        lt = select_t;
        lv = select_v;
      }

-      // ========== handle the last bucket below ==========
-      // find the point in the current non-empty bucket farthest (by vertical distance) from the line l-r
      double maxArea = -1;
      long select_t = -1;
      double select_v = -1;
      for (int j = currentBucket * rps; j < (currentBucket + 1) * rps; j++) {
-        // an LTTB bucket holds rps MinMax-preselected points (including duplicates and nulls)
        if (times.get(j) != null) {
          long t = times.get(j);
          double v = values.get(j);
          double area = IOMonitor2.calculateTri(lt, lv, t, v, pnt, pnv); // global last point as the right fixed point
-          // System.out.printf("curr=%d,t=%d,area=%f,lt=%d%n", currentBucket, t, area, lt);
          if (area > maxArea) {
            maxArea = area;
            select_t = t;
@@ -353,12 +329,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
      }
      series.append(select_v).append("[").append(select_t).append("]").append(",");

-      // global last point
      series.append(pnv).append("[").append(pnt).append("]").append(",");

-      // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString()))
-      // note that the first item in the SQL must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later
-      // all series are assembled into one string placed in row 1, column 2; otherwise a field type that does not match TSDataType.MIN_MAX_INT64 causes problems.
      record.addField(series, TSDataType.MIN_MAX_INT64);

    } catch (QueryProcessException e) {
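A note on the selection rule above: it is the standard LTTB effective-area test, which keeps in each bucket the preselected point forming the largest triangle with the left fixed point (lt, lv) and the right-bucket average (rt, rv). IOMonitor2.calculateTri itself is not shown in this diff; a minimal sketch, assuming it computes the triangle area via the 2-D cross product, is:

    // Sketch only, not the IOMonitor2 API: area of the triangle spanned by the
    // left anchor (lt,lv), a candidate point (t,v), and the right anchor (rt,rv).
    static double triangleArea(long lt, double lv, long t, double v, long rt, double rv) {
        return Math.abs((lt - rt) * (v - lv) - (lt - t) * (rv - lv)) / 2.0;
    }

Only the argmax over the candidates in a bucket matters, so any positive scaling of the area (for example dropping the division by 2) selects the same point.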
@@ -374,215 +346,6 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     return record;
   }

-  // public RowRecord nextWithoutConstraintTri_M4() throws IOException {
-  //   RowRecord record;
-  //   try {
-  //     GroupByExecutor executor = null;
-  //     for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
-  //       executor = pathToExecutorEntry.getValue(); // assume only one series here
-  //       break;
-  //     }
-  //
-  //     // concat results into a string
-  //     record = new RowRecord(0);
-  //     StringBuilder series = new StringBuilder();
-  //     // global first point (for M4 the global first/last points are only output and do not affect point selection in other buckets)
-  //
-  //     series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(",");
-  //
-  //     for (long localCurStartTime = startTime;
-  //         localCurStartTime + interval <= endTime;
-  //         // note the equals sign! because buckets are left-closed, right-open
-  //         // + interval to make the last bucket complete
-  //         // e.g, T=11,nout=3,interval=floor(11/3)=3,
-  //         // [0,3),[3,6),[6,9), no need incomplete [9,11)
-  //         // then the number of buckets must be Math.floor((endTime-startTime)/interval)
-  //         localCurStartTime += interval) { // not change real curStartTime&curEndTime
-  //       // attention the returned aggregations need deep copy if using directly
-  //       List<AggregateResult> aggregations =
-  //           executor.calcResult(
-  //               localCurStartTime,
-  //               localCurStartTime + interval,
-  //               startTime,
-  //               endTime,
-  //               interval); // attention
-  //
-  //       // min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0),
-  //       last_value(s0)
-  //       // minValue[bottomTime]
-  //       series.append(aggregations.get(0).getResult()).append(",");
-  //       // maxValue[topTime]
-  //       series.append(aggregations.get(1).getResult()).append(",");
-  //       // firstValue[firstTime]
-  //       series
-  //           .append(aggregations.get(4).getResult())
-  //           .append("[")
-  //           .append(aggregations.get(2).getResult())
-  //           .append("]")
-  //           .append(",");
-  //       // lastValue[lastTime]
-  //       series
-  //           .append(aggregations.get(5).getResult())
-  //           .append("[")
-  //           .append(aggregations.get(3).getResult())
-  //           .append("]")
-  //           .append(",");
-  //     }
-  //
-  //     // global last point (for M4 the global first/last points are only output and do not affect point selection in other buckets)
-  //
-  //     series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(",");
-  //
-  //     // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString()))
-  //     // note that the first item in the SQL must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later
-  //     // all series are assembled into one string placed in row 1, column 2; otherwise a field type that does not match TSDataType.MIN_MAX_INT64 causes problems.
-  //     record.addField(series, TSDataType.MIN_MAX_INT64);
-  //
-  //   } catch (QueryProcessException e) {
-  //     logger.error("GroupByWithoutValueFilterDataSet execute has error", e);
-  //     throw new IOException(e.getMessage(), e);
-  //   }
-  //
-  //   // in the end, make the next hasNextWithoutConstraint() false
-  //   // as we already fetch all here
-  //   curStartTime = endTime;
-  //   hasCachedTimeInterval = false;
-  //
-  //   return record;
-  // }
-
-  // public RowRecord nextWithoutConstraintTri_MinMax() throws IOException {
-  //   RowRecord record;
-  //   try {
-  //     GroupByExecutor executor = null;
-  //     for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
-  //       executor = pathToExecutorEntry.getValue(); // assume only one series here
-  //       break;
-  //     }
-  //
-  //     // concat results into a string
-  //     record = new RowRecord(0);
-  //     StringBuilder series = new StringBuilder();
-  //
-  //     // all bucket results as string in value of MinValueAggrResult
-  //     List<AggregateResult> aggregations =
-  //         executor.calcResult(startTime, startTime + interval, startTime, endTime, interval);
-  //     MinMaxInfo minMaxInfo = (MinMaxInfo) aggregations.get(0).getResult();
-  //     series.append(minMaxInfo.val); // the [t] of MinValueAggrResult is meaningless here, because only val is needed
-  //
-  //     // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString()))
-  //     // note that the first item in the SQL must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later
-  //     // all series are assembled into one string placed in row 1, column 2; otherwise a field type that does not match TSDataType.MIN_MAX_INT64 causes problems.
-  //     record.addField(series, TSDataType.MIN_MAX_INT64);
-  //
-  //   } catch (QueryProcessException e) {
-  //     logger.error("GroupByWithoutValueFilterDataSet execute has error", e);
-  //     throw new IOException(e.getMessage(), e);
-  //   }
-  //
-  //   // in the end, make the next hasNextWithoutConstraint() false
-  //   // as we already fetch all here
-  //   curStartTime = endTime;
-  //   hasCachedTimeInterval = false;
-  //
-  //   return record;
-  // }
-
-  // public RowRecord nextWithoutConstraintTri_M4() throws IOException {
-  //   RowRecord record;
-  //   try {
-  //     GroupByExecutor executor = null;
-  //     for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
-  //       executor = pathToExecutorEntry.getValue(); // assume only one series here
-  //       break;
-  //     }
-  //
-  //     // concat results into a string
-  //     record = new RowRecord(0);
-  //     StringBuilder series = new StringBuilder();
-  //
-  //     // all bucket results as string in value of MinValueAggrResult
-  //     List<AggregateResult> aggregations =
-  //         executor.calcResult(startTime, startTime + interval, startTime, endTime, interval);
-  //     MinMaxInfo minMaxInfo = (MinMaxInfo) aggregations.get(0).getResult();
-  //     series.append(minMaxInfo.val); // the [t] of MinValueAggrResult is meaningless here, because only val is needed
-  //
-  //     // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString()))
-  //     // note that the first item in the SQL must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later
-  //     // all series are assembled into one string placed in row 1, column 2; otherwise a field type that does not match TSDataType.MIN_MAX_INT64 causes problems.
-  //     record.addField(series, TSDataType.MIN_MAX_INT64);
-  //
-  //   } catch (QueryProcessException e) {
-  //     logger.error("GroupByWithoutValueFilterDataSet execute has error", e);
-  //     throw new IOException(e.getMessage(), e);
-  //   }
-  //
-  //   // in the end, make the next hasNextWithoutConstraint() false
-  //   // as we already fetch all here
-  //   curStartTime = endTime;
-  //   hasCachedTimeInterval = false;
-  //
-  //   return record;
-  // }
-
-  // public RowRecord nextWithoutConstraintTri_MinMax() throws IOException {
-  //   RowRecord record;
-  //   try {
-  //     GroupByExecutor executor = null;
-  //     for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
-  //       executor = pathToExecutorEntry.getValue(); // assume only one series here
-  //       break;
-  //     }
-  //
-  //     // concat results into a string
-  //     record = new RowRecord(0);
-  //     StringBuilder series = new StringBuilder();
-  //     // global first point (for MinMax the global first/last points are only output and do not affect point selection in other buckets)
-  //
-  //     series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(",");
-  //
-  //     for (long localCurStartTime = startTime;
-  //         localCurStartTime + interval <= endTime;
-  //         // note the equals sign! because buckets are left-closed, right-open
-  //         // + interval to make the last bucket complete
-  //         // e.g, T=11,nout=3,interval=floor(11/3)=3,
-  //         // [0,3),[3,6),[6,9), no need incomplete [9,11)
-  //         // then the number of buckets must be Math.floor((endTime-startTime)/interval)
-  //         localCurStartTime += interval) { // not change real curStartTime&curEndTime
-  //       // attention the returned aggregations need deep copy if using directly
-  //       List<AggregateResult> aggregations =
-  //           executor.calcResult(
-  //               localCurStartTime,
-  //               localCurStartTime + interval,
-  //               startTime,
-  //               endTime,
-  //               interval); // attention
-  //       series.append(aggregations.get(0).getResult()).append(","); // BPv[BPt]
-  //       series.append(aggregations.get(1).getResult()).append(","); // TPv[TPt]
-  //     }
-  //
-  //     // global last point (for MinMax the global first/last points are only output and do not affect point selection in other buckets)
-  //
-  //     series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(",");
-  //
-  //     // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString()))
-  //     // note that the first item in the SQL must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later
-  //     // all series are assembled into one string placed in row 1, column 2; otherwise a field type that does not match TSDataType.MIN_MAX_INT64 causes problems.
-  //     record.addField(series, TSDataType.MIN_MAX_INT64);
-  //
-  //   } catch (QueryProcessException e) {
-  //     logger.error("GroupByWithoutValueFilterDataSet execute has error", e);
-  //     throw new IOException(e.getMessage(), e);
-  //   }
-  //
-  //   // in the end, make the next hasNextWithoutConstraint() false
-  //   // as we already fetch all here
-  //   curStartTime = endTime;
-  //   hasCachedTimeInterval = false;
-  //
-  //   return record;
-  // }
-
   public RowRecord nextWithoutConstraint_raw() throws IOException {
     if (!hasCachedTimeInterval) {
       throw new IOException(
@@ -601,10 +364,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     try {
       for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
         GroupByExecutor executor = pathToExecutorEntry.getValue();
-        // long start = System.nanoTime();
         List<AggregateResult> aggregations =
             executor.calcResult(curStartTime, curEndTime, startTime, endTime, interval);
-        // IOMonitor.incTotalTime(System.nanoTime() - start);
         for (int i = 0; i < aggregations.size(); i++) {
           int resultIndex = resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
           fields[resultIndex] = aggregations.get(i);
@@ -675,49 +436,5 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     return new LocalGroupByExecutor(
         path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
   }
-
-  // // deprecated below, note that time&value index are also deprecated in Statistics
-  // else if (CONFIG.isEnableCPV()) {
-  //   if (TSFileDescriptor.getInstance().getConfig().isEnableMinMaxLSM()) { // MinMax-LSM
-  //     IOMonitor2.dataSetType =
-  //
-  //         DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_EnableMinMaxLSM;
-  //     return new LocalGroupByExecutor4MinMax(
-  //         path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
-  //   } else { // M4-LSM
-  //     if (TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
-  //         && TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
-  //       IOMonitor2.dataSetType =
-  //           DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_UseIndex;
-  //     } else if (!TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
-  //         && TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
-  //       IOMonitor2.dataSetType =
-  //
-  //           DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeIndex;
-  //     } else if (TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
-  //         && !TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
-  //       IOMonitor2.dataSetType =
-  //
-  //           DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoValueIndex;
-  //     } else {
-  //       IOMonitor2.dataSetType =
-  //           DataSetType
-  //               .GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeValueIndex;
-  //     }
-  //     return new LocalGroupByExecutor4CPV(
-  //         path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
-  //   }
-  // } else { // enableCPV=false
-  //   if (TSFileDescriptor.getInstance().getConfig().isUseStatistics()) {
-  //     IOMonitor2.dataSetType =
-  //         DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor_UseStatistics;
-  //   } else {
-  //     IOMonitor2.dataSetType =
-  //
-  //         DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor_NotUseStatistics;
-  //   }
-  //   return new LocalGroupByExecutor(
-  //       path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
-  // }
 }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java
index c24319dfce3..d4c6122fbbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java
@@ -75,7 +75,7 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor {
   private long lt;
   private double lv;

-  private final int N1; // number of buckets
+  private final int N1;

   private final int numIterations = CONFIG.getNumIterations(); // do not make it static
@@ -113,7 +113,6 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor {
             fileFilter,
             ascending);

-    // unpackAllOverlappedFilesToTimeSeriesMetadata
     try {
       // : this might be bad to load all chunk metadata at first
       List<ChunkSuit4Tri> futureChunkList =
@@ -141,16 +140,6 @@
         }
       }

-      // // debug
-      // for (int i = 0; i < N1; i++) {
-      //   List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(i);
-      //   if (chunkSuit4TriList != null) {
-      //     for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
-      //       System.out.println(i + "," + chunkSuit4Tri.chunkMetadata.getStartTime());
-      //     }
-      //   }
-      // }
-
     } catch (IOException e) {
       throw new QueryProcessException(e.getMessage());
     }
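The constructor body elided above arranges futureChunkList into per-bucket lists; the full pattern survives in the deleted LocalGroupByExecutorTri_ILTS_noacc.java further down in this diff. Restated as a standalone sketch, with a chunk reduced to its {minTime, maxTime} pair instead of ChunkSuit4Tri:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the bucket assignment used by the Tri executors: register each
    // chunk in every bucket its time range overlaps.
    final class BucketSplitSketch {
        static Map<Integer, List<long[]>> splitByBucket(
                List<long[]> chunkRanges, long startTime, long endTime, long interval) {
            Map<Integer, List<long[]>> split = new HashMap<>();
            int n1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // number of buckets
            for (long[] r : chunkRanges) {
                // first and last bucket index the chunk overlaps, clamped to [0, n1-1]
                int idx1 = (int) Math.floor((r[0] - startTime) * 1.0 / interval);
                int idx2 = (int) Math.min(Math.floor((r[1] - startTime) * 1.0 / interval), n1 - 1);
                for (int i = idx1; i <= idx2; i++) {
                    split.computeIfAbsent(i, k -> new ArrayList<>()).add(r);
                }
            }
            return split;
        }
    }

A chunk straddling a bucket boundary is deliberately listed under every bucket it touches, which is why the executors later re-check each point's timestamp against the bucket bounds.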
@@ -168,9 +157,6 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor {
   public List<AggregateResult> calcResult(
       long curStartTime, long curEndTime, long startTime, long endTime, long interval)
       throws IOException {
-    // calcResult returns the results of all buckets at once here (the value of MinValueAggrResult can be made a string,
-    // so all bucket results are returned as one string; the returned [t] is then meaningless and only valueString is used),
-    // instead of calling calcResult once per bucket the way MinMax does in nextWithoutConstraintTri_MinMax()
     StringBuilder series_final = new StringBuilder();

     // clear result cache
@@ -178,51 +164,36 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor {
       result.reset();
     }

-    long[] lastIter_t = new long[N1]; // N1 excludes the global first/last points; all zeros initially, assuming real timestamps are greater than 0
-    double[] lastIter_v = new double[N1]; // N1 excludes the global first/last points
+    long[] lastIter_t = new long[N1];
+    double[] lastIter_v = new double[N1];

-    // TODO: if the l-r anchors equal those used in the last iteration, this bucket can reuse the last selection in this iteration instead of recomputing
-    boolean[] lastSame = new boolean[N1 + 1]; // N1 excludes the global first point; all false initially
-    lastSame[N1] = true; // set the global last point to true
+    boolean[] lastSame = new boolean[N1 + 1];
+    lastSame[N1] = true;

-    int num = 0; // note it starts from 0!
+    int num = 0;
     for (; num < numIterations; num++) {
       // NOTE: init lt&lv at the start of each iteration is a must, because they are modified in
       // each iteration
       lt = CONFIG.getP1t();
       lv = CONFIG.getP1v();

-      // boolean[] currentSame = new boolean[N1 + 1]; // N1 excludes the global first point; all false initially
-      // currentSame[N1] = true; // set the global last point to true
-      boolean allSameFlag = true; // if a non-first iteration leaves all flags true, iteration can stop early, because nothing will change any more
+      boolean allSameFlag = true;
       boolean currentLeftSame = true;

-      // StringBuilder series = new StringBuilder(); // TODO debug
-      // // global first point
-      // series.append(p1v).append("[").append(p1t).append("]").append(","); // TODO debug
-
-      // iterate over buckets. Assume no empty buckets
+      // Assume no empty buckets
       for (int b = 0; b < N1; b++) {
         if (CONFIG.isAcc_iterRepeat() && num > 0 && lastSame[b + 1] && currentLeftSame) {
-          // excludes num=0, because the first iteration must compute everything
-          // this bucket's selection needs no update in this iteration; in other words it is already the result stored in lastIter
-          // currentNeedRecalc needs no update either
-          // the next bucket naturally takes select_t, select_v as its left fixed point
           lt = lastIter_t[b];
           lv = lastIter_v[b];
-          lastSame[b] = true; // because from now on this bucket will only be read as a right bucket by the next round
+          lastSame[b] = true;
           continue;
         }
         double rt = 0; // must initialize as zero, because may be used as sum for average
         double rv = 0; // must initialize as zero, because may be used as sum for average
-        // compute the fixed point of the right bucket
-        if (b == N1 - 1) { // the last bucket
-          // global last point
+        if (b == N1 - 1) {
           rt = pnt;
           rv = pnv;
-        } else { // not the last bucket
-          if (num == 0) { // in the first iteration, use the average point of the right bucket
-            // ======== compute the average point of the right bucket ========
+        } else {
+          if (num == 0) {
             List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b + 1);
             if (chunkSuit4TriList == null) {
               throw new IOException("Empty bucket!");
@@ -230,18 +201,14 @@
             long rightStartTime = startTime + (b + 1) * interval;
             long rightEndTime = startTime + (b + 2) * interval;
             int cnt = 0;
-            // iterate over all chunks overlapping the right bucket
             for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
               TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
               if (dataType != TSDataType.DOUBLE) {
                 throw new UnSupportedDataTypeException(String.valueOf(dataType));
               }
-              // TODO: accelerate with the metadata sum & count
-              // if the chunk is not cut by the bucket, the sum and count in its metadata can be used directly
               if (CONFIG.isAcc_avg()) {
                 if (chunkSuit4Tri.chunkMetadata.getStartTime() >= rightStartTime
                     && chunkSuit4Tri.chunkMetadata.getEndTime() < rightEndTime) {
-                  // TODO: metadata could later add a sum of timestamps; for now this is handled under the assumption that timestamps are uniformly spaced at interval 1
                   rt +=
                       (chunkSuit4Tri.chunkMetadata.getStartTime()
                               + chunkSuit4Tri.chunkMetadata.getEndTime())
@@ -264,7 +231,7 @@
                 // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS
                 // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
               }
-              // 2. calculate the average point
+              // 2. calculate avg
               PageReader pageReader = chunkSuit4Tri.pageReader;
               for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) {
                 IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++;
@@ -287,12 +254,11 @@
             }
             rt = rt / cnt;
             rv = rv / cnt;
-          } else { // if it is neither the first iteration nor the last bucket, use the right bucket's sampled point from the previous iteration
+          } else {
             rt = lastIter_t[b + 1];
             rv = lastIter_v[b + 1];
           }
         }
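The acc_avg branch above is a metadata shortcut: when a chunk lies entirely inside the right bucket, its value sum and point count come straight from the chunk statistics, and the timestamp sum is approximated as (startTime + endTime) * count / 2, which is exact for uniformly spaced timestamps (the assumption the removed TODO notes). A sketch, with ChunkStatsSketch standing in for the fields the real code reads from chunkMetadata.getStatistics():

    // Sketch only: stand-in for chunk-level statistics (start/end time, count, value sum).
    final class ChunkStatsSketch {
        long startTime, endTime, count;
        double sum;

        // Folds this chunk into the bucket's running sums when it is fully inside
        // [bucketStart, bucketEnd); returns false if the caller must scan points.
        // Timestamps are assumed uniformly spaced, so their sum is (first + last) * n / 2.
        boolean tryAccumulate(long bucketStart, long bucketEnd, double[] acc /* {sumT, sumV, cnt} */) {
            if (startTime < bucketStart || endTime >= bucketEnd) {
                return false; // cut by a bucket boundary: fall back to the page scan
            }
            acc[0] += (startTime + endTime) * count / 2.0;
            acc[1] += sum;
            acc[2] += count;
            return true;
        }
    }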
-          // ======== find the point in the current bucket farthest from the line connecting l and r ========
           double maxDistance = -1;
           long select_t = -1;
           double select_v = -1;
@@ -300,10 +266,6 @@
           long localCurStartTime = startTime + (b) * interval;
           long localCurEndTime = startTime + (b + 1) * interval;
           if (CONFIG.isAcc_rectangle()) {
-            // TODO: accelerate with the rectangle formed by the FP&LP&BP&TP metadata points falling inside the bucket
-            // edge points give a tight lower bound on the distance; corner points give a non-tight upper bound
-            // first scan this metadata once, take the farthest of the metadata points inside the bucket, and update maxDistance&select_t&select_v
-            // then, while iterating, if a chunk's non-tight upper bound <= the currently known maxDistance, the whole chunk can be ignored
             for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
               long[] rect_t =
                   new long[] {
@@ -319,7 +281,6 @@
                     (double) chunkSuit4Tri.chunkMetadata.getStatistics().getMinValue(), // BPv
                     (double) chunkSuit4Tri.chunkMetadata.getStatistics().getMaxValue() // TPv
                   };
-              // update maxDistance&select_t&select_v with the metadata points inside the bucket (tight lower bound)
               for (int i = 0; i < 4; i++) {
                 if (rect_t[i] >= localCurStartTime && rect_t[i] < localCurEndTime) {
                   double distance =
@@ -331,43 +292,29 @@
                 }
               }
             }
-            // use the four corner points to compute each chunk's non-tight upper bound w.r.t. the current fixed line
-            // note the first step assigns directly instead of comparing with the old value, because the distance is relative to line L, which differs per bucket per iteration
             chunkSuit4Tri.distance_loose_upper_bound =
-                IOMonitor2.calculateDistance(lt, lv, rect_t[0], rect_v[2], rt, rv); // FPt,BPv, bottom-left corner
+                IOMonitor2.calculateDistance(lt, lv, rect_t[0], rect_v[2], rt, rv); // FPt,BPv
             chunkSuit4Tri.distance_loose_upper_bound =
                 Math.max(
                     chunkSuit4Tri.distance_loose_upper_bound,
-                    IOMonitor2.calculateDistance(
-                        lt, lv, rect_t[0], rect_v[3], rt, rv)); // FPt,TPv, top-left corner
+                    IOMonitor2.calculateDistance(lt, lv, rect_t[0], rect_v[3], rt, rv)); // FPt,TPv
             chunkSuit4Tri.distance_loose_upper_bound =
                 Math.max(
                     chunkSuit4Tri.distance_loose_upper_bound,
-                    IOMonitor2.calculateDistance(
                        lt, lv, rect_t[1], rect_v[2], rt, rv)); // LPt,BPv, bottom-right corner
+                    IOMonitor2.calculateDistance(lt, lv, rect_t[1], rect_v[2], rt, rv)); // LPt,BPv
             chunkSuit4Tri.distance_loose_upper_bound =
                 Math.max(
                     chunkSuit4Tri.distance_loose_upper_bound,
-                    IOMonitor2.calculateDistance(
                        lt, lv, rect_t[1], rect_v[3], rt, rv)); // LPt,TPv, top-right corner
+                    IOMonitor2.calculateDistance(lt, lv, rect_t[1], rect_v[3], rt, rv)); // LPt,TPv
             }
           }
-          // iterate over all chunks overlapping the current bucket
           for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
             TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
             if (dataType != TSDataType.DOUBLE) {
               throw new UnSupportedDataTypeException(String.valueOf(dataType));
             }
-            // TODO: (continue) accelerate with the rectangle formed by the in-bucket FP&LP&BP&TP metadata points
-            // edge points give a tight lower bound on the distance; corner points give a non-tight upper bound
-            // if this chunk's non-tight upper bound <= the currently known maxDistance, the whole chunk can be ignored
             if (CONFIG.isAcc_rectangle()) {
-              // (when a chunk's non-tight upper bound equals its tight lower bound, a corner point coincides with an edge point;
-              // if that bound is maxDistance, the edge point was already assigned in the step
-              // "update maxDistance&select_t&select_v with metadata points / tight lower bound", so skipping here is safe)
               if (chunkSuit4Tri.distance_loose_upper_bound <= maxDistance) {
-                // System.out.println("skip" + b + "," +
-                // chunkSuit4Tri.chunkMetadata.getStartTime());
                 continue;
               }
             }
@@ -383,47 +330,8 @@
              // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
            }
            PageReader pageReader = chunkSuit4Tri.pageReader;
-            // TODO: accelerate with the convex-hull bitmap
-            // if the chunk is cut by bucket boundaries, still traverse point by point
-            // otherwise, when the chunk falls entirely inside the bucket, use the hull rules to quickly find the points of this chunk
-            // highest and lowest along the normal of the l-r line, and finally compare them with the current global farthest point
-            // this could also be changed to find the extreme points first regardless of containment: if neither is farther than the
-            // currently known farthest point, the chunk can be excluded; if the farthest is outside the current bucket, traversal is
-            // still needed; if it is farthest and inside the bucket, the currently known farthest point can be updated.
-            // // for now only chunks falling entirely inside the bucket are handled
-            // if (CONFIG.isAcc_convex()
-            //     && chunkSuit4Tri.chunkMetadata.getStartTime() >= localCurStartTime
-            //     && chunkSuit4Tri.chunkMetadata.getEndTime() < localCurEndTime
-            //     && chunkSuit4Tri.chunkMetadata.getStatistics().getCount() >= 3 // ignore fewer than three points
-            // ) {
-            //   BitSet bitSet =
-            //       chunkSuit4Tri.chunkMetadata.getStatistics().getQuickHullBitSet();
-            //   List<QuickHullPoint> foundPoints =
-            //       convexHullAcc(
-            //           lt,
-            //           lv,
-            //           rt,
-            //           rv,
-            //           pageReader,
-            //           bitSet,
-            //           chunkSuit4Tri.chunkMetadata.getStatistics().getCount()); //
-            // possibly more than two points, when one side is a pair of endpoints of a parallel line
-            //   // System.out.println(foundPoints);
-            //   for (QuickHullPoint point : foundPoints) {
-            //     IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++;
-            //     double distance = IOMonitor2.calculateDistance(lt, lv, point.t, point.v,
-            //         rt, rv);
-            //     if (distance > maxDistance) {
-            //       // could it be that with acc_rect enabled, no point in the chunks traversed here reaches maxDistance,
-            //       // so acc_convex never takes effect?!
-            //       maxDistance = distance;
-            //       select_t = point.t;
-            //       select_v = point.v;
-            //     }
-            //   }
-            //   continue; // note this
-            // }
            if (CONFIG.isAcc_convex()
-                && chunkSuit4Tri.chunkMetadata.getStatistics().getCount() >= 3 // ignore fewer than three points
-            ) {
+                && chunkSuit4Tri.chunkMetadata.getStatistics().getCount() >= 3) {
              BitSet bitSet = chunkSuit4Tri.chunkMetadata.getStatistics().getQuickHullBitSet();
              List<QuickHullPoint> foundPoints =
                  convexHullAcc(
                      lt,
                      lv,
                      rt,
                      rv,
                      pageReader,
                      bitSet,
-                      chunkSuit4Tri.chunkMetadata.getStatistics().getCount()); // possibly more than two points, when one side is a pair of endpoints of a parallel line
-              // System.out.println(foundPoints);
+                      chunkSuit4Tri.chunkMetadata.getStatistics().getCount());
              double ch_maxDistance = -1;
              long ch_select_t = -1;
              double ch_select_v = -1;
-              // find the farthest point among foundPoints
              for (QuickHullPoint point : foundPoints) {
                IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++;
                double distance = IOMonitor2.calculateDistance(lt, lv, point.t, point.v, rt, rv);
                if (distance > ch_maxDistance) {
-                  // could it be that with acc_rect enabled, no point in the chunks traversed here reaches maxDistance,
-                  // so acc_convex never takes effect?!
                  ch_maxDistance = distance;
                  ch_select_t = point.t;
                  ch_select_v = point.v;
                }
              }
-              // compare with the farthest distance found so far
              if (ch_maxDistance <= maxDistance) {
-                continue; // this chunk cannot contain a point farther than the current farthest, regardless of whether its hull-farthest point is in the current bucket
+                continue;
              }
-              // otherwise ch_maxDistance > maxDistance, but the point must also fall inside the current bucket
              if (ch_select_t >= localCurStartTime && ch_select_t < localCurEndTime) {
                maxDistance = ch_maxDistance;
                select_t = ch_select_t;
                select_v = ch_select_v;
                continue; // note this
              }
-              // otherwise ch_maxDistance > maxDistance but the hull-farthest point of this chunk is not in the current bucket, so fall through to the point traversal below
            }
            int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount();
            int j;
@@ -477,68 +378,42 @@
                double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8);
                double distance = IOMonitor2.calculateDistance(lt, lv, timestamp, v, rt, rv);
                if (distance > maxDistance) {
-                  // could it be that with acc_rect enabled, no point in the chunks traversed here reaches maxDistance,
-                  // so acc_convex never takes effect?!
                  maxDistance = distance;
                  select_t = timestamp;
                  select_v = v;
                }
              }
            }
-          } // end of iterating over the chunks overlapping the current bucket
-          // // record the result // TODO debug
-          // series.append(select_v).append("[").append(select_t).append("]").append(",");
+          }

-          // update currentNeedRecalc; note this is checked before recording this bucket's selection for this iteration
          if (CONFIG.isAcc_iterRepeat()) {
-            if (select_t != lastIter_t[b]) { // this iteration's selection differs from the previous one
-              allSameFlag = false; // cannot exit the iterations early
-              lastSame[b] = false; // signals that the left bucket's right fixed point changed, so the left bucket must be recomputed next round
-              currentLeftSame = false; // signals that the right bucket's left fixed point changed, so the right bucket must be recomputed
-              // if (b == 0) { // the first bucket
-              //   currentSame[b + 1] = true; // its role as the right bucket's left fixed point changed, so the right bucket must re-select next round
-              // } else if (b == N1 - 1) { // the last bucket
-              //   currentSame[b - 1] = true; // its role as the left bucket's right fixed point changed, so the left bucket must re-select next round
-              // } else {
-              //   currentSame[b - 1] = true; // its role as the left bucket's right fixed point changed, so the left bucket must re-select next round
-              //   currentSame[b + 1] = true; // its role as the right bucket's left fixed point changed, so the right bucket must re-select next round
-              // }
+            if (select_t != lastIter_t[b]) {
+              allSameFlag = false;
+              lastSame[b] = false;
+              currentLeftSame = false;
            } else {
              lastSame[b] = true;
              currentLeftSame = true;
            }
          }
-          // update lt, lv
-          // the next bucket naturally takes select_t, select_v as its left fixed point
          lt = select_t;
          lv = select_v;
-          // record this bucket's selection for this iteration
          lastIter_t[b] = select_t;
          lastIter_v[b] = select_v;
-        } // end of bucket loop
-
-        // // global last point // TODO debug
-        // series.append(pnv).append("[").append(pnt).append("]").append(",");
-        // System.out.println(series);
+        }

        if (CONFIG.isAcc_iterRepeat() && allSameFlag) {
-          num++; // +1 so that num counts completed iterations
+          num++;
          break;
        }
-        // otherwise at least one entry of currentNeedRecalc is true, so keep iterating
-        // lastSame = currentSame;
-        // System.out.println(Arrays.toString(needRecalc)); // TODO debug
      } // end Iterations
-      // System.out.println("number of iterations=" + num); // TODO debug

-    // global first point
     series_final.append(p1v).append("[").append(p1t).append("]").append(",");
     for (int i = 0; i < lastIter_t.length; i++) {
       series_final.append(lastIter_v[i]).append("[").append(lastIter_t[i]).append("]").append(",");
     }
-    // global last point
+
     series_final.append(pnv).append("[").append(pnt).append("]").append(",");
     MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0);
     minValueAggrResult.updateResult(new MinMaxInfo<>(series_final.toString(), 0));
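The lastSame/allSameFlag bookkeeping above is the convergence shortcut of the iterative selection: a bucket is recomputed only when its left or right anchor moved in the previous pass, and a pass that changes nothing ends the loop, since further iterations would be identical. The control flow, reduced to plain arrays (BucketSelector is a hypothetical stand-in for the per-bucket farthest-point search, not an IoTDB type):

    // Hypothetical stand-in for the per-bucket farthest-point search.
    interface BucketSelector {
        long pick(int b); // timestamp selected for bucket b under the current anchors
    }

    // Sketch of the ILTS convergence shortcut: recompute a bucket only when one
    // of its anchors moved last round; stop once a full pass changes nothing.
    static int iterate(int n1, int maxIterations, long[] selT, BucketSelector selector) {
        boolean[] lastSame = new boolean[n1 + 1];
        lastSame[n1] = true; // the last bucket's right anchor is the fixed global last point
        int num = 0;
        for (; num < maxIterations; num++) {
            boolean allSame = true;
            boolean leftSame = true;
            for (int b = 0; b < n1; b++) {
                if (num > 0 && lastSame[b + 1] && leftSame) {
                    lastSame[b] = true; // neither anchor moved: the previous pick stands
                    continue;
                }
                long t = selector.pick(b);
                boolean same = (t == selT[b]);
                lastSame[b] = same; // tells bucket b-1 next round whether its right anchor moved
                leftSame = same;    // tells bucket b+1 in this pass whether its left anchor moved
                if (!same) {
                    allSame = false;
                    selT[b] = t;
                }
            }
            if (allSame) {
                num++; // count the completed pass
                break;
            }
        }
        return num; // number of completed iterations
    }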
@@ -546,20 +421,13 @@
     return results;
   }

-  // with convex-hull acceleration, find the two positions in the pageReader buffer that are highest and lowest along the normal of the l-r line
-  // cases with fewer than three points are not considered
   public List<QuickHullPoint> convexHullAcc(
       double lt, double lv, double rt, double rv, PageReader pageReader, BitSet bitSet, int count) {
-    // normal vector (A,B) of the line connecting the left and right fixed points
     double A = lv - rv;
     double B = rt - lt;
     BitSet reverseBitSet = IOMonitor2.reverse(bitSet);
-    // for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1)) {
-    //   indexes.add(i);
-    // }
-
     long fpt = pageReader.timeBuffer.getLong(0);
     double fpv = pageReader.valueBuffer.getDouble(pageReader.timeBufferLength);
     long lpt = pageReader.timeBuffer.getLong((count - 1) * 8);
@@ -617,8 +485,7 @@
     boolean findLowest = false;
     List<QuickHullPoint> foundPoints = new ArrayList<>();

-    while (bitSetIdx != -1 || reverseBitSetIdx != -1) { // TODO: check whether the loop could fail to terminate when there are only two points
-      // if both are -1, all points on both sides have been examined
+    while (bitSetIdx != -1 || reverseBitSetIdx != -1) {
       // from left to right
       IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++;
       bitSetIdx = bitSet.nextSetBit(bitSetIdx + 1);
@@ -633,10 +500,10 @@
           int sign = IOMonitor2.checkSumSigns(A, B, LU);
           if (sign > 0) {
             findLowest = true;
-            foundPoints.add(LU.get(LU.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(LU.get(LU.size() - 2));
           } else if (sign < 0) {
             findHighest = true;
-            foundPoints.add(LU.get(LU.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(LU.get(LU.size() - 2));
           }
         }
         if (check <= 0) { // p below or on the line connecting FP&LP
@@ -644,10 +511,10 @@
           int sign = IOMonitor2.checkSumSigns(A, B, LL);
           if (sign > 0) {
             findLowest = true;
-            foundPoints.add(LL.get(LL.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(LL.get(LL.size() - 2));
           } else if (sign < 0) {
             findHighest = true;
-            foundPoints.add(LL.get(LL.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(LL.get(LL.size() - 2));
           }
         }
         if (findLowest && findHighest) {
@@ -671,10 +538,10 @@
           int sign = IOMonitor2.checkSumSigns(A, B, RU);
           if (sign > 0) {
             findLowest = true;
-            foundPoints.add(RU.get(RU.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(RU.get(RU.size() - 2));
           } else if (sign < 0) {
             findHighest = true;
-            foundPoints.add(RU.get(RU.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(RU.get(RU.size() - 2));
           }
         }
         if (check <= 0) { // p below or on the line connecting FP&LP
@@ -682,10 +549,10 @@
           int sign = IOMonitor2.checkSumSigns(A, B, RL);
           if (sign > 0) {
             findLowest = true;
-            foundPoints.add(RL.get(RL.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(RL.get(RL.size() - 2));
           } else if (sign < 0) {
             findHighest = true;
-            foundPoints.add(RL.get(RL.size() - 2)); // note: the second-to-last point, not the last
+            foundPoints.add(RL.get(RL.size() - 2));
           }
         }
         if (findLowest && findHighest) {
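For context on convexHullAcc: maximizing the distance from a point to the line through (lt, lv) and (rt, rv) is equivalent to finding the points extreme along that line's normal (A, B) = (lv - rv, rt - lt), and on a convex hull only the two extremes in that direction can win, which is why the bitmap walk can stop after finding the highest and lowest hull points. A minimal sketch of the underlying reduction over plain arrays (not the bitmap walk itself):

    // Sketch: the farthest point from the line through (lt,lv) and (rt,rv) is
    // the one extreme along the normal (A, B) = (lv - rv, rt - lt). The constant
    // offset of the line cancels in comparisons, so no normalization is needed.
    static int farthestFromLine(long[] t, double[] v, double lt, double lv, double rt, double rv) {
        double a = lv - rv;
        double b = rt - lt;
        int best = -1;
        double bestScore = -1;
        for (int i = 0; i < t.length; i++) {
            double score = Math.abs(a * t[i] + b * v[i] - (a * lt + b * lv));
            if (score > bestScore) {
                bestScore = score;
                best = i;
            }
        }
        return best;
    }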
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS_noacc.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS_noacc.java
deleted file mode 100644
index f14fa323b86..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS_noacc.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-//
-// package org.apache.iotdb.db.query.dataset.groupby;
-//
-// import org.apache.iotdb.db.conf.IoTDBConfig;
-// import org.apache.iotdb.db.conf.IoTDBDescriptor;
-// import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-// import org.apache.iotdb.db.exception.StorageEngineException;
-// import org.apache.iotdb.db.exception.query.QueryProcessException;
-// import org.apache.iotdb.db.metadata.PartialPath;
-// import org.apache.iotdb.db.query.aggregation.AggregateResult;
-// import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
-// import org.apache.iotdb.db.query.context.QueryContext;
-// import org.apache.iotdb.db.query.control.QueryResourceManager;
-// import org.apache.iotdb.db.query.filter.TsFileFilter;
-// import org.apache.iotdb.db.query.reader.series.SeriesReader;
-// import org.apache.iotdb.db.utils.FileLoaderUtils;
-// import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-// import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-// import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-// import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo;
-// import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri;
-// import org.apache.iotdb.tsfile.read.common.IOMonitor2;
-// import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-// import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-// import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-// import org.apache.iotdb.tsfile.utils.Pair;
-//
-// import org.slf4j.Logger;
-// import org.slf4j.LoggerFactory;
-//
-// import java.io.IOException;
-// import java.nio.ByteBuffer;
-// import java.util.ArrayList;
-// import java.util.HashMap;
-// import java.util.List;
-// import java.util.Map;
-// import java.util.Set;
-//
-// public class LocalGroupByExecutorTri_ILTS_noacc implements GroupByExecutor {
-//
-//   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
-//   private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA");
-//
-//   // Aggregate result buffer of this path
-//   private final List<AggregateResult> results = new ArrayList<>();
-//
-//   // keys: 0,1,...,(int) Math.floor((endTime * 1.0 - startTime) / interval)-1
-//   private final Map<Integer, List<ChunkSuit4Tri>> splitChunkList = new HashMap<>();
-//
-//   private final long p1t = CONFIG.getP1t();
-//   private final double p1v = CONFIG.getP1v();
-//   private final long pnt = CONFIG.getPnt();
-//   private final double pnv = CONFIG.getPnv();
-//
-//   private long lt = p1t;
-//   private double lv = p1v;
-//
-//   private final int N1; // number of buckets
-//
-//   private static final int numIterations = CONFIG.getNumIterations();
-//
-//   private Filter timeFilter;
-//
-//   public LocalGroupByExecutorTri_ILTS_noacc(
-//       PartialPath path,
-//       Set<String> allSensors,
-//       TSDataType dataType,
-//       QueryContext context,
-//       Filter timeFilter,
-//       TsFileFilter fileFilter,
-//       boolean ascending)
-//       throws StorageEngineException, QueryProcessException {
-//     // long start = System.nanoTime();
-//
-//     // get all data sources
-//     QueryDataSource queryDataSource =
-//         QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter);
-//
-//     // update filter by TTL
-//     this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-//
-//     SeriesReader seriesReader =
-//         new SeriesReader(
-//             path,
-//             allSensors,
-//             // fix bug: here use the aggregation type as the series data type,
-//             // not using pageReader.getAllSatisfiedPageData is ok
-//             dataType,
-//             context,
-//             queryDataSource,
-//             timeFilter,
-//             null,
-//             fileFilter,
-//             ascending);
-//
-//     // unpackAllOverlappedFilesToTimeSeriesMetadata
-//     try {
-//       // : this might be bad to load all chunk metadata at first
-//       List<ChunkSuit4Tri> futureChunkList =
-//           new ArrayList<>(seriesReader.getAllChunkMetadatas4Tri()); // no need sort here
-//       // arrange futureChunkList into each bucket
-//       GroupByFilter groupByFilter = (GroupByFilter) timeFilter;
-//       long startTime = groupByFilter.getStartTime();
-//       long endTime = groupByFilter.getEndTime();
-//       long interval = groupByFilter.getInterval();
-//       N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // number of buckets
-//       for (ChunkSuit4Tri chunkSuit4Tri : futureChunkList) {
-//         ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata;
-//         long chunkMinTime = chunkMetadata.getStartTime();
-//         long chunkMaxTime = chunkMetadata.getEndTime();
-//         int idx1 = (int) Math.floor((chunkMinTime - startTime) * 1.0 / interval);
-//         int idx2 = (int) Math.floor((chunkMaxTime - startTime) * 1.0 / interval);
-//         idx2 = (int) Math.min(idx2, N1 - 1);
-//         for (int i = idx1; i <= idx2; i++) {
-//           splitChunkList.computeIfAbsent(i, k -> new ArrayList<>());
-//           splitChunkList.get(i).add(chunkSuit4Tri);
-//         }
-//       }
-//
-//     } catch (IOException e) {
-//       throw new QueryProcessException(e.getMessage());
-//     }
-//
-//     // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() -
-//     // start);
-//   }
-//
-//   @Override
-//   public void addAggregateResult(AggregateResult aggrResult) {
-//     results.add(aggrResult);
-//   }
-//
-//   @Override
-//   public List<AggregateResult> calcResult(
-//       long curStartTime, long curEndTime, long startTime, long endTime, long interval)
-//       throws IOException {
-//     // calcResult returns the results of all buckets at once here (the value of MinValueAggrResult can be made a string,
-//     // so all bucket results are returned as one string; the returned [t] is then meaningless and only valueString is used),
-//     // instead of calling calcResult once per bucket the way MinMax does in nextWithoutConstraintTri_MinMax()
-//     StringBuilder series_final = new StringBuilder();
-//
-//     // clear result cache
-//     for (AggregateResult result : results) {
-//       result.reset();
-//     }
-//
-//     long[] lastIter_t = new long[N1]; // N1 excludes the global first/last points
-//     double[] lastIter_v = new double[N1]; // N1 excludes the global first/last points
-//     for (int num = 0; num < numIterations; num++) {
-//       // StringBuilder series = new StringBuilder();
-//       // global first point
-//       // series.append(p1v).append("[").append(p1t).append("]").append(",");
-//       // iterate over buckets. Assume no empty buckets
-//       for (int b = 0; b < N1; b++) {
-//         long rt = 0; // must initialize as zero, because may be used as sum for average
-//         double rv = 0; // must initialize as zero, because may be used as sum for average
-//         // compute the fixed point of the right bucket
-//         if (b == N1 - 1) { // the last bucket
-//           // global last point
-//           rt = pnt;
-//           rv = pnv;
-//         } else { // not the last bucket
-//           if (num == 0) { // in the first iteration, use the average point of the right bucket
-//             // ======== compute the average point of the right bucket ========
-//             List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b + 1);
-//             long rightStartTime = startTime + (b + 1) * interval;
-//             long rightEndTime = startTime + (b + 2) * interval;
-//             int cnt = 0;
-//             // iterate over all chunks overlapping the right bucket
-//             for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
-//               TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
-//               if (dataType != TSDataType.DOUBLE) {
-//                 throw new UnSupportedDataTypeException(String.valueOf(dataType));
-//               }
-//               // 1. load page data if it hasn't been loaded
-//               if (chunkSuit4Tri.pageReader == null) {
-//                 chunkSuit4Tri.pageReader =
-//                     FileLoaderUtils.loadPageReaderList4CPV(
-//                         chunkSuit4Tri.chunkMetadata, this.timeFilter);
-//                 // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
-//                 // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
-//                 // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
-//                 // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS
-//                 // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
-//               }
-//               // 2. calculate the average point
-//               PageReader pageReader = chunkSuit4Tri.pageReader;
-//               for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) {
-//                 long timestamp = pageReader.timeBuffer.getLong(j * 8);
-//                 if (timestamp < rightStartTime) {
-//                   continue;
-//                 } else if (timestamp >= rightEndTime) {
-//                   break;
-//                 } else { // rightStartTime<=t<rightEndTime
-//                   ByteBuffer valueBuffer = pageReader.valueBuffer;
-//                   double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8);
-//                   rt += timestamp;
-//                   rv += v;
-//                   cnt++;
-//                 }
-//               }
-//             }
-//             if (cnt == 0) {
-//               throw new IOException("Empty bucket!");
-//             }
-//             rt = rt / cnt;
-//             rv = rv / cnt;
-//           } else { // if it is neither the first iteration nor the last bucket, use the right bucket's sampled point from the previous iteration
-//             rt = lastIter_t[b + 1];
-//             rv = lastIter_v[b + 1];
-//           }
-//         }
-//         // ======== find the point in the current bucket farthest from the line connecting l and r ========
-//         double maxArea = -1;
-//         long select_t = -1;
-//         double select_v = -1;
-//         List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b);
-//         long localCurStartTime = startTime + (b) * interval;
-//         long localCurEndTime = startTime + (b + 1) * interval;
-//         // iterate over all chunks overlapping the current bucket
-//         for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
-//           TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
-//           if (dataType != TSDataType.DOUBLE) {
-//             throw new UnSupportedDataTypeException(String.valueOf(dataType));
-//           }
-//           // load page data if it hasn't been loaded
-//           if (chunkSuit4Tri.pageReader == null) {
-//             chunkSuit4Tri.pageReader =
-//                 FileLoaderUtils.loadPageReaderList4CPV(
-//                     chunkSuit4Tri.chunkMetadata, this.timeFilter);
-//             // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
-//             // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
-//             // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
-//             // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS
-//             // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
-//           }
-//           PageReader pageReader = chunkSuit4Tri.pageReader;
-//           for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) {
-//             long timestamp = pageReader.timeBuffer.getLong(j * 8);
-//             if (timestamp < localCurStartTime) {
-//               continue;
-//             } else if (timestamp >= localCurEndTime) {
-//               break;
-//             } else { // localCurStartTime<=t<localCurEndTime
-//               ByteBuffer valueBuffer = pageReader.valueBuffer;
-//               double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8);
-//               double area = IOMonitor2.calculateTri(lt, lv, timestamp, v, rt, rv);
-//               if (area > maxArea) {
-//                 maxArea = area;
-//                 select_t = timestamp;
-//                 select_v = v;
-//               }
-//             }
-//           }
-//         }
-//         // record the result
-//         // series.append(select_v).append("[").append(select_t).append("]").append(",");
-//
-//         // update lt, lv
-//         // the next bucket naturally takes select_t, select_v as its left fixed point
-//         lt = select_t;
-//         lv = select_v;
-//         // record this bucket's selection for this iteration
-//         lastIter_t[b] = select_t;
-//         lastIter_v[b] = select_v;
-//       } // end of bucket loop
-//
-//       // global last point
-//       // series.append(pnv).append("[").append(pnt).append("]").append(",");
-//       // System.out.println(series);
-//
-//     } // end Iterations
-//
-//     // global first point
-//     series_final.append(p1v).append("[").append(p1t).append("]").append(",");
-//     for (int i = 0; i < lastIter_t.length; i++) {
-//
-//       series_final.append(lastIter_v[i]).append("[").append(lastIter_t[i]).append("]").append(",");
-//     }
-//     // global last point
-//     series_final.append(pnv).append("[").append(pnt).append("]").append(",");
-//     MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0);
-//     minValueAggrResult.updateResult(new MinMaxInfo<>(series_final.toString(), 0));
-//
-//     return results;
-//   }
-//
-//   @Override
-//   public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
-//       throws IOException {
-//     throw new IOException("no implemented");
-//   }
-//
-//   @Override
-//   public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
-//       throws IOException, QueryProcessException {
-//     throw new IOException("no implemented");
-//   }
-// }
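Both the deleted copy above and the live executors read points straight out of a page's raw buffers: timestamps are 8-byte longs at the front, and values (on this DOUBLE-only path) are 8-byte doubles starting at offset timeBufferLength. A sketch of that layout and the per-point read, under the single-page-per-chunk invariant the ATTENTION comments insist on (whether timeBuffer and valueBuffer alias one underlying buffer is an implementation detail not visible in this diff):

    import java.nio.ByteBuffer;

    // Sketch of the page layout assumed throughout: count longs (timestamps),
    // then count doubles (values).
    final class PageSketch {
        final ByteBuffer timeBuffer;   // count * 8 bytes of long timestamps
        final ByteBuffer valueBuffer;  // values start at offset timeBufferLength
        final int timeBufferLength;

        PageSketch(ByteBuffer timeBuffer, ByteBuffer valueBuffer, int timeBufferLength) {
            this.timeBuffer = timeBuffer;
            this.valueBuffer = valueBuffer;
            this.timeBufferLength = timeBufferLength;
        }

        long timestampAt(int j) {
            return timeBuffer.getLong(j * 8); // j-th long
        }

        double valueAt(int j) {
            return valueBuffer.getDouble(timeBufferLength + j * 8); // j-th double after the time section
        }
    }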
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java
index 8fe3e792c17..a8e4fe77f2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java
@@ -73,7 +73,7 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor {
   private long lt = p1t;
   private double lv = p1v;

-  private final int N1; // number of buckets
+  private final int N1;

   private Filter timeFilter;
@@ -109,7 +109,6 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor {
             fileFilter,
             ascending);

-    // unpackAllOverlappedFilesToTimeSeriesMetadata
     try {
       // : this might be bad to load all chunk metadata at first
       List<ChunkSuit4Tri> futureChunkList =
@@ -150,9 +149,6 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor {
   public List<AggregateResult> calcResult(
       long curStartTime, long curEndTime, long startTime, long endTime, long interval)
       throws IOException {
-    // calcResult returns the results of all buckets at once here (the value of MinValueAggrResult can be made a string,
-    // so all bucket results are returned as one string; the returned [t] is then meaningless and only valueString is used),
-    // instead of calling calcResult once per bucket the way MinMax does in nextWithoutConstraintTri_MinMax()
     StringBuilder series = new StringBuilder();

     // clear result cache
@@ -160,20 +156,16 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor {
       result.reset();
     }

-    // global first point
     series.append(p1v).append("[").append(p1t).append("]").append(",");

     // Assume no empty buckets
     for (int b = 0; b < N1; b++) {
       double rt = 0; // must initialize as zero, because may be used as sum for average
       double rv = 0; // must initialize as zero, because may be used as sum for average
-      // compute the fixed point of the right bucket
-      if (b == N1 - 1) { // the last bucket
-        // global last point
+      if (b == N1 - 1) {
         rt = pnt;
         rv = pnv;
       } else {
-        // compute the average point of the right bucket
         List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b + 1);
         if (chunkSuit4TriList == null) {
           throw new IOException("Empty bucket!");
@@ -181,7 +173,6 @@
         long rightStartTime = startTime + (b + 1) * interval;
         long rightEndTime = startTime + (b + 2) * interval;
         int cnt = 0;
-        // iterate over all chunks overlapping the right bucket
         for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
           TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
           if (dataType != TSDataType.DOUBLE) {
@@ -198,7 +189,7 @@
             // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS
             // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
           }
-          // 2. calculate the average point
+          // 2. calculate avg
           PageReader pageReader = chunkSuit4Tri.pageReader;
           for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) {
             IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++;
@@ -223,14 +214,12 @@
         rv = rv / cnt;
       }

-      // find the point in the current bucket farthest from the line connecting l and r
       double maxArea = -1;
       long select_t = -1;
       double select_v = -1;
       List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b);
       long localCurStartTime = startTime + (b) * interval;
       long localCurEndTime = startTime + (b + 1) * interval;
-      // iterate over all chunks overlapping the current bucket
       for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
         TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
         if (dataType != TSDataType.DOUBLE) {
@@ -267,23 +256,13 @@
             }
           }
         }
-        // // clear for heap space
-        // if (j >= count) {
-        //   // means this chunk has been fully read and later buckets will not use it, so the in-memory page can be cleared now
-        //   // rather than waiting until the next bucket; otherwise too many chunks in currentChunkList may keep too many page points in memory at once and exhaust heap space
-        //   chunkSuit4Tri.pageReader = null;
-        // }
       }
-      // record the result
       series.append(select_v).append("[").append(select_t).append("]").append(",");

-      // update lt, lv
-      // the next bucket naturally takes select_t, select_v as its left fixed point
       lt = select_t;
       lv = select_v;
     }

-    // global last point
     series.append(pnv).append("[").append(pnt).append("]").append(",");

     MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0);
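The next diff trims LocalGroupByExecutorTri_M4.java. For context, M4 emits four extreme points per bucket, in exactly the string layout the executor appends: minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime], with "null[null],null[null],null[null],null[null]," for an empty bucket. A minimal sketch of that per-bucket aggregation (a hypothetical standalone form, not the calculateM4 signature):

    // Sketch: M4 aggregation over time-ordered (t, v) points already known to
    // fall in one bucket; emits the same string layout the executor writes.
    static String m4Bucket(long[] t, double[] v) {
        if (t.length == 0) {
            return "null[null],null[null],null[null],null[null],";
        }
        int bottom = 0, top = 0; // indexes of the min and max value
        for (int i = 1; i < t.length; i++) {
            if (v[i] < v[bottom]) bottom = i;
            if (v[i] > v[top]) top = i;
        }
        // points are time-ordered, so first/last are the endpoints
        return v[bottom] + "[" + t[bottom] + "],"
            + v[top] + "[" + t[top] + "],"
            + v[0] + "[" + t[0] + "],"
            + v[t.length - 1] + "[" + t[t.length - 1] + "],";
    }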
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java
index 22590c54942..9c20a9c1ff2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java
@@ -73,7 +73,7 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {

   private Filter timeFilter;

-  private final int N1; // number of buckets
+  private final int N1;

   public LocalGroupByExecutorTri_M4(
@@ -113,7 +113,6 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {
     long interval = groupByFilter.getInterval();
     N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // number of buckets

-    // unpackAllOverlappedFilesToTimeSeriesMetadata
     try {
       // : this might be bad to load all chunk metadata at first
       futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri());
@@ -273,9 +272,6 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {
   public List<AggregateResult> calcResult(
       long curStartTime, long curEndTime, long startTime, long endTime, long interval)
       throws IOException {
-    // calcResult returns the results of all buckets at once here (the value of MinValueAggrResult can be made a string,
-    // so all bucket results are returned as one string; the returned [t] is then meaningless and only valueString is used),
-    // instead of calling calcResult once per bucket the way MinMax does in nextWithoutConstraintTri_MinMax()
     StringBuilder series = new StringBuilder();

     // clear result cache
@@ -283,7 +279,6 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {
       result.reset();
     }

-    // global first point (for MinMax the global first/last points are only output and do not affect point selection in other buckets)
     series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(",");

     // Assume no empty buckets
@@ -294,7 +289,6 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {
           getCurrentChunkListFromFutureChunkList(localCurStartTime, localCurEndTime);

       if (currentChunkList.size() == 0) {
-        // System.out.println("MinMax empty currentChunkList"); // TODO debug
         // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime]
         series.append("null[null],null[null],null[null],null[null],");
         continue;
@@ -303,7 +297,6 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {
       calculateM4(currentChunkList, localCurStartTime, localCurEndTime, series);
     }

-    // global last point
     series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(",");

     MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0);
@@ -403,15 +396,8 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {
           }
         }
       }
-      // // clear for heap space
-      // if (i >= count) {
-      //   // means this chunk has been fully read and later buckets will not use it, so the in-memory page can be cleared now
-      //   // rather than waiting until the next bucket; otherwise too many chunks in currentChunkList may keep too many page points in memory at once and exhaust heap space
-      //   chunkSuit4Tri.pageReader = null;
-      // }
     }
   }
-  // record the result
   if (topTime >= 0) {
     // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime]
     series
@@ -444,10 +430,6 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor {
   public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) {
     return false;
-    // long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp();
-    // long BP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp();
-    // return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime && BP_t <
-    // curEndTime;
   }

   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4_deprecated.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4_deprecated.java
deleted file mode 100644
index 0ede99042a3..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4_deprecated.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.
You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// */ -// -// package org.apache.iotdb.db.query.dataset.groupby; -// -// import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -// import org.apache.iotdb.db.exception.StorageEngineException; -// import org.apache.iotdb.db.exception.query.QueryProcessException; -// import org.apache.iotdb.db.metadata.PartialPath; -// import org.apache.iotdb.db.query.aggregation.AggregateResult; -// import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult; -// import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; -// import org.apache.iotdb.db.query.context.QueryContext; -// import org.apache.iotdb.db.query.control.QueryResourceManager; -// import org.apache.iotdb.db.query.filter.TsFileFilter; -// import org.apache.iotdb.db.query.reader.series.SeriesReader; -// import org.apache.iotdb.db.utils.FileLoaderUtils; -// import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; -// import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; -// import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -// import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; -// import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; -// import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; -// import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; -// import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; -// import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -// import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; -// import org.apache.iotdb.tsfile.read.filter.GroupByFilter; -// import org.apache.iotdb.tsfile.read.filter.basic.Filter; -// import org.apache.iotdb.tsfile.read.reader.page.PageReader; -// import org.apache.iotdb.tsfile.utils.Pair; -// -// import org.slf4j.Logger; -// import org.slf4j.LoggerFactory; -// -// import java.io.IOException; -// import java.nio.ByteBuffer; -// import java.util.ArrayList; -// import java.util.Comparator; -// import java.util.List; -// import java.util.ListIterator; -// import java.util.Set; -// -// public class LocalGroupByExecutorTri_M4_deprecated implements GroupByExecutor { -// -// private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); -// -// // Aggregate result buffer of this path -// private final List<AggregateResult> results = new ArrayList<>(); -// -// private List<ChunkSuit4Tri> currentChunkList; -// private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); -// -// private Filter timeFilter; -// -// public LocalGroupByExecutorTri_M4_deprecated( -// PartialPath path, -// Set<String> allSensors, -// TSDataType dataType, -// QueryContext context, -// Filter timeFilter, -// TsFileFilter fileFilter, -// boolean ascending) -// throws StorageEngineException, QueryProcessException { -// // long start = System.nanoTime(); -// -// // get all data sources -// QueryDataSource queryDataSource = -// QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); -// -// // update filter by TTL -// this.timeFilter = 
queryDataSource.updateFilterUsingTTL(timeFilter); -// -// SeriesReader seriesReader = -// new SeriesReader( -// path, -// allSensors, -// // fix bug: here use the aggregation type as the series data type, -// // not using pageReader.getAllSatisfiedPageData is ok -// dataType, -// context, -// queryDataSource, -// timeFilter, -// null, -// fileFilter, -// ascending); -// -// // unpackAllOverlappedFilesToTimeSeriesMetadata -// try { -// // : this might be bad to load all chunk metadata at first -// futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); -// // order futureChunkList by chunk startTime -// futureChunkList.sort( -// new Comparator<ChunkSuit4Tri>() { -// public int compare(ChunkSuit4Tri o1, ChunkSuit4Tri o2) { -// return ((Comparable) (o1.chunkMetadata.getStartTime())) -// .compareTo(o2.chunkMetadata.getStartTime()); -// } -// }); -// -// if (M4_CHUNK_METADATA.isDebugEnabled()) { -// if (timeFilter instanceof GroupByFilter) { -// M4_CHUNK_METADATA.debug( -// "M4_QUERY_PARAM,{},{},{}", -// ((GroupByFilter) timeFilter).getStartTime(), -// ((GroupByFilter) timeFilter).getEndTime(), -// ((GroupByFilter) timeFilter).getInterval()); -// } -// for (ChunkSuit4Tri ChunkSuit4Tri : futureChunkList) { -// Statistics statistics = ChunkSuit4Tri.chunkMetadata.getStatistics(); -// long FP_t = statistics.getStartTime(); -// long LP_t = statistics.getEndTime(); -// long BP_t = statistics.getBottomTimestamp(); -// long TP_t = statistics.getTopTimestamp(); -// switch (statistics.getType()) { -// case INT32: -// int FP_v_int = ((IntegerStatistics) statistics).getFirstValue(); -// int LP_v_int = ((IntegerStatistics) statistics).getLastValue(); -// int BP_v_int = ((IntegerStatistics) statistics).getMinValue(); -// int TP_v_int = ((IntegerStatistics) statistics).getMaxValue(); -// M4_CHUNK_METADATA.debug( -// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", -// FP_t, -// LP_t, -// BP_t, -// TP_t, -// FP_v_int, -// LP_v_int, -// BP_v_int, -// TP_v_int, -// ChunkSuit4Tri.chunkMetadata.getVersion(), -// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), -// statistics.getCount()); -// break; -// case INT64: -// long FP_v_long = ((LongStatistics) statistics).getFirstValue(); -// long LP_v_long = ((LongStatistics) statistics).getLastValue(); -// long BP_v_long = ((LongStatistics) statistics).getMinValue(); -// long TP_v_long = ((LongStatistics) statistics).getMaxValue(); -// M4_CHUNK_METADATA.debug( -// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", -// FP_t, -// LP_t, -// BP_t, -// TP_t, -// FP_v_long, -// LP_v_long, -// BP_v_long, -// TP_v_long, -// ChunkSuit4Tri.chunkMetadata.getVersion(), -// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), -// statistics.getCount()); -// break; -// case FLOAT: -// float FP_v_float = ((FloatStatistics) statistics).getFirstValue(); -// float LP_v_float = ((FloatStatistics) statistics).getLastValue(); -// float BP_v_float = ((FloatStatistics) statistics).getMinValue(); -// float TP_v_float = ((FloatStatistics) statistics).getMaxValue(); -// M4_CHUNK_METADATA.debug( -// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", -// FP_t, -// LP_t, -// BP_t, -// TP_t, -// FP_v_float, -// LP_v_float, -// BP_v_float, -// TP_v_float, -// ChunkSuit4Tri.chunkMetadata.getVersion(), -// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), -// statistics.getCount()); -// break; -// case DOUBLE: -// double FP_v_double = ((DoubleStatistics) statistics).getFirstValue(); -// double LP_v_double = ((DoubleStatistics) statistics).getLastValue(); -// double 
BP_v_double = ((DoubleStatistics) statistics).getMinValue(); -// double TP_v_double = ((DoubleStatistics) statistics).getMaxValue(); -// M4_CHUNK_METADATA.debug( -// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", -// FP_t, -// LP_t, -// BP_t, -// TP_t, -// FP_v_double, -// LP_v_double, -// BP_v_double, -// TP_v_double, -// ChunkSuit4Tri.chunkMetadata.getVersion(), -// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), -// statistics.getCount()); -// break; -// default: -// throw new QueryProcessException("unsupported data type!"); -// } -// } -// } -// -// } catch (IOException e) { -// throw new QueryProcessException(e.getMessage()); -// } -// -// // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() - -// // start); -// } -// -// @Override -// public void addAggregateResult(AggregateResult aggrResult) { -// results.add(aggrResult); -// } -// -// private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime) { -// // IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN; -// -// // empty currentChunkList -// currentChunkList = new ArrayList<>(); -// -// // iterate futureChunkList -// ListIterator<ChunkSuit4Tri> itr = futureChunkList.listIterator(); -// while (itr.hasNext()) { -// ChunkSuit4Tri chunkSuit4Tri = itr.next(); -// ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; -// long chunkMinTime = chunkMetadata.getStartTime(); -// long chunkMaxTime = chunkMetadata.getEndTime(); -// if (chunkMaxTime < curStartTime) { -// // the chunk falls on the left side of the current M4 interval Ii -// itr.remove(); -// } else if (chunkMinTime >= curEndTime) { -// // the chunk falls on the right side of the current M4 interval Ii, -// // and since futureChunkList is ordered by the startTime of chunkMetadata, -// // the loop can be terminated early. 
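The removed getCurrentChunkListFromFutureChunkList above classifies each chunk against the current bucket in four cases, relying on the list being sorted by chunk start time. A minimal, self-contained sketch of the same sweep, with a hypothetical Chunk type standing in for ChunkSuit4Tri/ChunkMetadata (not the project's API):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class Chunk {
  final long minTime, maxTime; // chunk time range, as carried by chunk metadata
  Chunk(long minTime, long maxTime) { this.minTime = minTime; this.maxTime = maxTime; }
}

class BucketSweep {
  // Classify time-ordered chunks against the current bucket [curStart, curEnd).
  static List<Chunk> select(List<Chunk> future, long curStart, long curEnd) {
    List<Chunk> current = new ArrayList<>();
    Iterator<Chunk> it = future.iterator();
    while (it.hasNext()) {
      Chunk c = it.next();
      if (c.maxTime < curStart) {
        it.remove(); // entirely left of the bucket: never needed again
      } else if (c.minTime >= curEnd) {
        break; // sorted by start time, so every later chunk is right of the bucket
      } else if (c.maxTime < curEnd) {
        current.add(c);
        it.remove(); // ends inside the bucket: used now, never by later buckets
      } else {
        current.add(c); // crosses the right border: also keep for later buckets
      }
    }
    return current;
  }
}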
-// break; -// } else if (chunkMaxTime < curEndTime) { -// // this chunk is not related to buckets later -// currentChunkList.add(chunkSuit4Tri); -// itr.remove(); -// } else { -// // this chunk is overlapped with the right border of the current bucket -// currentChunkList.add(chunkSuit4Tri); -// // still keep it in the futureChunkList -// } -// } -// } -// -// @Override -// public List<AggregateResult> calcResult( -// long curStartTime, long curEndTime, long startTime, long endTime, long interval) -// throws IOException { -// // clear result cache -// for (AggregateResult result : results) { -// result.reset(); -// } -// -// // long start = System.nanoTime(); -// getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); -// // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); -// -// if (currentChunkList.size() == 0) { -// return results; -// } -// -// // start = System.nanoTime(); -// calculateM4(currentChunkList, curStartTime, curEndTime); -// // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); -// -// return results; -// } -// -// private void calculateM4(List<ChunkSuit4Tri> currentChunkList, long curStartTime, long -// curEndTime) -// throws IOException { -// for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { -// -// Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); -// -// if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { -// // min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0),last_value(s0) -// // update min_time -// results -// .get(2) -// .updateResultUsingValues( -// new long[] {chunkSuit4Tri.chunkMetadata.getStartTime()}, -// 1, -// new Object[] {statistics.getFirstValue()}); -// // update first_value -// results -// .get(4) -// .updateResultUsingValues( -// new long[] {chunkSuit4Tri.chunkMetadata.getStartTime()}, -// 1, -// new Object[] {statistics.getFirstValue()}); -// // update max_time -// results -// .get(3) -// .updateResultUsingValues( -// new long[] {chunkSuit4Tri.chunkMetadata.getEndTime()}, -// 1, -// new Object[] {statistics.getLastValue()}); -// // update last_value -// results -// .get(5) -// .updateResultUsingValues( -// new long[] {chunkSuit4Tri.chunkMetadata.getEndTime()}, -// 1, -// new Object[] {statistics.getLastValue()}); -// // update BP -// MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); -// minValueAggrResult.updateResult( -// new MinMaxInfo<>(statistics.getMinValue(), statistics.getBottomTimestamp())); -// // update TP -// MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); -// maxValueAggrResult.updateResult( -// new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); -// } else { // cannot use statistics directly -// -// double minVal = Double.MAX_VALUE; -// long bottomTime = -1; -// double maxVal = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! -// long topTime = -1; -// long firstTime = -1; -// double firstValue = 0; -// long lastTime = -1; -// double lastValue = 0; -// -// // 1. 
load page data if it hasn't been loaded -// TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); -// if (dataType != TSDataType.DOUBLE) { -// throw new UnSupportedDataTypeException(String.valueOf(dataType)); -// } -// if (chunkSuit4Tri.pageReader == null) { -// chunkSuit4Tri.pageReader = -// FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, -// this.timeFilter); -// // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, -// // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. -// // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE -// // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS -// // ASSIGNED DIRECTLY), WHICH WILL INTRODUCE BUGS! -// } -// -// int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); -// PageReader pageReader = chunkSuit4Tri.pageReader; -// int i; -// for (i = chunkSuit4Tri.lastReadPos; i < count; i++) { -// long timestamp = pageReader.timeBuffer.getLong(i * 8); -// if (timestamp < curStartTime) { -// // 2. read from lastReadPos until the first point falling within this bucket (if it -// // exists) -// continue; -// } else if (timestamp >= curEndTime) { -// // 3. traverse until the first point falling to the right of this bucket, also remember to update -// // lastReadPos -// chunkSuit4Tri.lastReadPos = i; -// break; -// } else { -// // 4. update MinMax by traversing points falling within this bucket -// ByteBuffer valueBuffer = pageReader.valueBuffer; -// double v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); -// if (firstTime < 0) { -// firstTime = timestamp; -// firstValue = v; -// } -// lastTime = timestamp; -// lastValue = v; -// if (v < minVal) { -// minVal = v; -// bottomTime = timestamp; -// } -// if (v > maxVal) { -// maxVal = v; -// topTime = timestamp; -// } -// } -// } -// // clear for heap space -// if (i >= count) { -// // This chunk has been fully read and later buckets will not use it, so its in-memory page can be released now -// // rather than waiting for the next bucket; otherwise too many chunks in currentChunkList keep too many page points alive at once and heap space may run out -// chunkSuit4Tri.pageReader = null; -// } -// // 4.
update MinMax by traversing points falling within this bucket -// if (topTime >= 0) { -// // min_value(s0), max_value(s0),min_time(s0), max_time(s0), -// first_value(s0),last_value(s0) -// // update min_time -// results -// .get(2) -// .updateResultUsingValues(new long[] {firstTime}, 1, new Object[] {firstValue}); -// // update first_value -// results -// .get(4) -// .updateResultUsingValues(new long[] {firstTime}, 1, new Object[] {firstValue}); -// // update max_time -// results -// .get(3) -// .updateResultUsingValues(new long[] {lastTime}, 1, new Object[] {lastValue}); -// // update last_value -// results -// .get(5) -// .updateResultUsingValues(new long[] {lastTime}, 1, new Object[] {lastValue}); -// // update BP -// MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); -// minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); -// // update TP -// MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); -// maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime)); -// } -// } -// } -// -// public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) -// { -// return false; -// // long minT = chunkSuit4Tri.chunkMetadata.getStartTime(); -// // long maxT = chunkSuit4Tri.chunkMetadata.getEndTime(); -// // return minT >= curStartTime && maxT < curEndTime; -// } -// -// @Override -// public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) -// throws IOException { -// throw new IOException("not implemented"); -// } -// -// @Override -// public List<AggregateResult> calcResult(long curStartTime, long curEndTime) -// throws IOException, QueryProcessException { -// throw new IOException("not implemented"); -// } -// } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java index 6020b07a7a6..6d150e48976 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java @@ -73,7 +73,7 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { private Filter timeFilter; - private final int N1; // number of buckets + private final int N1; public LocalGroupByExecutorTri_MinMax( PartialPath path, @@ -111,9 +111,8 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { long startTime = groupByFilter.getStartTime(); long endTime = groupByFilter.getEndTime(); long interval = groupByFilter.getInterval(); - N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // number of buckets + N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); - // unpackAllOverlappedFilesToTimeSeriesMetadata try { // : this might be bad to load all chunk metadata at first futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); @@ -273,9 +272,6 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { public List<AggregateResult> calcResult( long curStartTime, long curEndTime, long startTime, long endTime, long interval) throws IOException { - // Here calcResult returns the results of all buckets at once (the value of MinValueAggrResult can be set to string type, - // so that all bucket results are returned as one string; the returned [t] is then meaningless and only valueString is used), - // instead of calling calcResult once per bucket inside nextWithoutConstraintTri_MinMax() as MinMax does StringBuilder series = new StringBuilder(); // clear result cache @@ -283,7 +279,6 @@
public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { result.reset(); } - // Global first point (for MinMax, the global first/last points are only output and do not affect point selection in other buckets) series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); // Assume no empty buckets @@ -294,7 +289,6 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { getCurrentChunkListFromFutureChunkList(localCurStartTime, localCurEndTime); if (currentChunkList.size() == 0) { - // System.out.println("MinMax empty currentChunkList"); // TODO debug series .append("null") .append("[") @@ -312,7 +306,6 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { calculateMinMax(currentChunkList, localCurStartTime, localCurEndTime, series); } - // Global last point series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); @@ -395,15 +388,8 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { } } } - // // clear for heap space - // if (i >= count) { - // // This chunk has been fully read and later buckets will not use it, so its in-memory page can be released now - // // rather than waiting for the next bucket; otherwise too many chunks in currentChunkList keep too many page points alive at once and heap space may run out - // chunkSuit4Tri.pageReader = null; - // } } } - // Record the result if (topTime >= 0) { series .append(minValue) .append("[") @@ -434,10 +420,6 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { return false; - // long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp(); - // long BP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp(); - // return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime && BP_t < - // curEndTime; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMaxPreselection.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMaxPreselection.java index 555e775fd67..655a0ca3d88 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMaxPreselection.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMaxPreselection.java @@ -102,7 +102,6 @@ public class LocalGroupByExecutorTri_MinMaxPreselection implements GroupByExecut fileFilter, ascending); - // unpackAllOverlappedFilesToTimeSeriesMetadata try { // : this might be bad to load all chunk metadata at first futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); @@ -267,18 +266,13 @@ public class LocalGroupByExecutorTri_MinMaxPreselection implements GroupByExecut result.reset(); } - // long start = System.nanoTime(); getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); if (currentChunkList.size() == 0) { - // System.out.println("MinMax empty currentChunkList"); // TODO debug return results; } - // start = System.nanoTime(); calculateMinMax(currentChunkList, curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); return results; } @@ -351,8 +345,6 @@ public class LocalGroupByExecutorTri_MinMaxPreselection implements GroupByExecut } // clear for heap space if (i >= count) { - // This chunk has been fully read and later buckets will not use it, so its in-memory page can be released now - // rather than waiting for the next bucket; otherwise too many chunks in currentChunkList keep too many page points alive at once and heap space may run out chunkSuit4Tri.pageReader =
null; } // 4. update MinMax by traversing points falling within this bucket @@ -370,10 +362,6 @@ public class LocalGroupByExecutorTri_MinMaxPreselection implements GroupByExecut public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { return false; - // long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp(); - // long BP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp(); - // return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime && BP_t < - // curEndTime; } @Override diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java index 5f8dfde95ba..a8895c2457d 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java @@ -43,11 +43,6 @@ import static org.junit.Assert.fail; public class MyTest_ILTS { /* - * Sql format: SELECT min_value(s0) FROM root.vehicle.d0 group by ([2,102),20ms) - * not real min_value here, actually controlled by enableTri="ILTS" - * Note that the first SQL item must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later - * to pack all series into a string in the first row, second column; otherwise the field type will not match TSDataType.MIN_MAX_INT64 and cause problems. - * * Requirements: * (1) Don't change the sequence of the above two aggregates * (2) Assume each chunk has only one page. @@ -78,7 +73,6 @@ public class MyTest_ILTS { config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); config.setEnableTri("ILTS"); - // config.setNumIterations(4); config.setAcc_avg(true); config.setAcc_rectangle(true); config.setAcc_convex(false); @@ -353,64 +347,4 @@ public class MyTest_ILTS { e.printStackTrace(); } } - - // @Test - // public void test3() { - // prepareData3(); - // config.setNumIterations(1); // result equals LTTB - // String res = "5.0[1],10.0[2],2.0[40],15.0[60],18.0[70],1.0[90],7.0[102],"; - // try (Connection connection = - // DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); - // Statement statement = connection.createStatement()) { - // boolean hasResultSet = - // statement.execute( - // "SELECT min_value(s0)" - // // TODO not real min_value here, actually controlled by enableTri - // + ",max_value(s0),min_time(s0), max_time(s0), first_value(s0), last_value(s0)" - // + " FROM root.vehicle.d0 group by ([1,100),50ms)"); - // // (102-2)/(7-2)=20ms - // // note keep no empty buckets - // - // Assert.assertTrue(hasResultSet); - // try (ResultSet resultSet = statement.getResultSet()) { - // int i = 0; - // while (resultSet.next()) { - // String ans = resultSet.getString(2); - // System.out.println(ans); - //// Assert.assertEquals(res, ans); - // } - // } - // } catch (Exception e) { - // e.printStackTrace(); - // fail(e.getMessage()); - // } - // } - // - // private static void prepareData3() { - // try (Connection connection = - // DriverManager.getConnection( - // Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); - // Statement statement = connection.createStatement()) { - // - // for (String sql : creationSqls) { - // statement.execute(sql); - // } - // - // config.setP1t(0); - // config.setP1v(0); - // config.setPnt(100); - // config.setPnv(100); - // - // for (int i = 0; i < 100; i++) { - // // linear - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, i + 1, i + 1.0)); - // if ((i + 1) % 4 == 0) { - // statement.execute("FLUSH"); - // } - // } - // statement.execute("FLUSH"); - // }
catch (Exception e) { - // e.printStackTrace(); - // } - // } } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_LTTB.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_LTTB.java index 542a9037c27..811f8af2ce6 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_LTTB.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_LTTB.java @@ -42,10 +42,6 @@ import static org.junit.Assert.fail; public class MyTest_LTTB { /* - * Sql format: SELECT min_value(s0) FROM root.vehicle.d0 group by ([2,106),26ms) - * not real min_value here, actually controlled by enableTri="LTTB" - * Note that the first SQL item must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later - * to pack all series into a string in the first row, second column; otherwise the field type will not match TSDataType.MIN_MAX_INT64 and cause problems. * Requirements: * (1) Don't change the sequence of the above two aggregates * (2) Assume each chunk has only one page. @@ -106,7 +102,6 @@ public class MyTest_LTTB { boolean hasResultSet = statement.execute( "SELECT min_value(s0)" - // TODO not real min_value here, actually controlled by enableTri + ",max_value(s0),min_time(s0), max_time(s0), first_value(s0), last_value(s0)" + " FROM root.vehicle.d0 group by ([2,106),26ms)"); @@ -170,7 +165,6 @@ public class MyTest_LTTB { boolean hasResultSet = statement.execute( "SELECT min_value(s0)" - // TODO not real min_value here, actually controlled by enableTri + ",max_value(s0),min_time(s0), max_time(s0), first_value(s0), last_value(s0)" + " FROM root.vehicle.d0 group by ([100,2100),250ms)"); // (tn-t2)/(nout-2)=(2100-100)/(10-2)=2000/8=250 diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java index 9fe7855351d..96edde2c040 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java @@ -43,11 +43,6 @@ import static org.junit.Assert.fail; public class MyTest_M4 { /* - * Sql format: SELECT min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0), - * last_value(s0) FROM root.vehicle.d0 group by ([0,100),25ms) - * enableTri="M4" - * Note that the first SQL item must be min_value, because record.addField(series, TSDataType.MIN_MAX_INT64) is used later - * to pack all series into a string in the first row, second column; otherwise the field type will not match TSDataType.MIN_MAX_INT64 and cause problems. * Requirements: * (1) Don't change the sequence of the above two aggregates * (2) Assume each chunk has only one page.
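These tests all read the packed series string from the second column of the single result row. As a rough illustration of how one M4 bucket entry is assembled, following the value[time] convention the executors append to the series StringBuilder (the numbers below are made up for illustration):

public class M4BucketStringDemo {
  public static void main(String[] args) {
    // One bucket in minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime] order;
    // an empty bucket is emitted as "null[null],null[null],null[null],null[null]," instead.
    StringBuilder series = new StringBuilder();
    double minValue = 1.0, maxValue = 9.0, firstValue = 5.0, lastValue = 4.0;
    long bottomTime = 7, topTime = 3, firstTime = 0, lastTime = 24;
    series
        .append(minValue).append("[").append(bottomTime).append("]").append(",")
        .append(maxValue).append("[").append(topTime).append("]").append(",")
        .append(firstValue).append("[").append(firstTime).append("]").append(",")
        .append(lastValue).append("[").append(lastTime).append("]").append(",");
    System.out.println(series); // 1.0[7],9.0[3],5.0[0],4.0[24],
  }
}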
@@ -73,23 +68,15 @@ public class MyTest_M4 { @Before public void setUp() throws Exception { TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN"); - // originalCompactionStrategy = config.getCompactionStrategy(); config.setTimestampPrecision("ms"); config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); config.setEnableTri("M4"); - // For M4 the global first/last points are only output and do not affect point selection in other buckets config.setP1t(0); config.setP1v(0); config.setPnt(200); config.setPnv(200); - // The unpackOneChunkMetaData(firstChunkMetadata) path is fine, - // because it constructs the pageReader directly from the chunk metadata; - // but the traditional aggregation-type path -> seriesAggregateReader -> seriesReader -> hasNextOverlappedPage, where - // cachedBatchData = BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true), - // is wrong: it assigns the aggregation type to batchData. So this LocalGroupByExecutor bug only reproduces with overlapping data. - // (Luckily the data in this paper never overlaps, so LocalGroupByExecutor can be used to get correct results.) config.setEnableCPV(false); TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false); TSFileDescriptor.getInstance().getConfig().setUseStatistics(false); @@ -101,7 +88,6 @@ public class MyTest_M4 { @After public void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); - // config.setCompactionStrategy(originalCompactionStrategy); } @Test @@ -116,21 +102,16 @@ public class MyTest_M4 { boolean hasResultSet = statement.execute( "SELECT min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0), last_value(s0)" - // do not change sequence + " FROM root.vehicle.d0 group by ([0,100),25ms)"); - // Note that the first item must be min_value, because record.addField(series, - // TSDataType.MIN_MAX_INT64) is used later to pack all series into a string in the first row, second column Assert.assertTrue(hasResultSet); try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - // Note that columns are numbered from 1, so the first column is the meaningless timestamp String ans = resultSet.getString(2); System.out.println(ans); Assert.assertEquals(res, ans); } } - // System.out.println(((IoTDBStatement) statement).executeFinish()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java index b6bcc0c6a0e..e09c3b0520e 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java @@ -42,8 +42,6 @@ import static org.junit.Assert.fail; public class MyTest_MinMax { /* - * Sql format: SELECT min_value(s0), max_value(s0) FROM root.xx group by ([tqs,tqe),IntervalLength). - * enableTri="MinMax" * Requirements: * (1) Don't change the sequence of the above two aggregates * (2) Assume each chunk has only one page.
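As a quick sanity check on the bucket math these tests exercise: the M4 test above queries group by ([0,100),25ms), so the executors create N1 = floor((100 - 0) / 25) = 4 complete buckets, [0,25), [25,50), [50,75), [75,100), and the packed string then carries the global first point, one entry per bucket, and the global last point.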
@@ -69,23 +67,15 @@ public class MyTest_MinMax { @Before public void setUp() throws Exception { TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN"); - // originalCompactionStrategy = config.getCompactionStrategy(); config.setTimestampPrecision("ms"); config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); config.setEnableTri("MinMax"); - // For MinMax the global first/last points are only output and do not affect point selection in other buckets config.setP1t(0); config.setP1v(0); config.setPnt(200); config.setPnv(200); - // The unpackOneChunkMetaData(firstChunkMetadata) path is fine, - // because it constructs the pageReader directly from the chunk metadata; - // but the traditional aggregation-type path -> seriesAggregateReader -> seriesReader -> hasNextOverlappedPage, where - // cachedBatchData = BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true), - // is wrong: it assigns the aggregation type to batchData. So this LocalGroupByExecutor bug only reproduces with overlapping data. - // (Luckily the data in this paper never overlaps, so LocalGroupByExecutor can be used to get correct results.) config.setEnableCPV(false); TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false); TSFileDescriptor.getInstance().getConfig().setUseStatistics(false); @@ -97,7 +87,6 @@ public class MyTest_MinMax { @After public void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); - // config.setCompactionStrategy(originalCompactionStrategy); } @Test @@ -118,13 +107,11 @@ public class MyTest_MinMax { try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - // Note that columns are numbered from 1, so the first column is the meaningless timestamp String ans = resultSet.getString(2); System.out.println(ans); Assert.assertEquals(res, ans); } } - // System.out.println(((IoTDBStatement) statement).executeFinish()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java index 160df95c60d..62b2f225b17 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java @@ -42,8 +42,6 @@ import static org.junit.Assert.fail; public class MyTest_MinMaxLTTB { /* - * Sql format: SELECT min_value(s0), max_value(s0) FROM root.vehicle.d0 group by ([100,2100),250ms) - * enableTri="MinMaxLTTB" * Requirements: * (1) Don't change the sequence of the above two aggregates * (2) Assume each chunk has only one page.
@@ -116,13 +114,11 @@ public class MyTest_MinMaxLTTB { try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - // Note that columns are numbered from 1, so the first column is the meaningless timestamp String ans = resultSet.getString(2); System.out.println(ans); Assert.assertEquals(res, ans); } } - // System.out.println(((IoTDBStatement) statement).executeFinish()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 2b9fe6e2399..8715f603829 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -230,7 +230,6 @@ public abstract class Statistics<T> { quickHullLearned = true; quickHullPoints = null; } - // System.out.println("debug:::::" + quickHullBitSet); // write bitset int byteLen = 0; @@ -239,7 +238,6 @@ public abstract class Statistics<T> { oos.writeObject(quickHullBitSet); oos.flush(); byte[] bytes = baos.toByteArray(); - // see serializeBloomFilter for reference byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream); outputStream.write(bytes); byteLen += bytes.length; @@ -329,7 +327,6 @@ public abstract class Statistics<T> { // this.stepRegress = stats.stepRegress; // this.valueIndex = stats.valueIndex; - // Note that the points must be assigned here as well, because when a chunk has only one page, only the chunk's metadata is serialized this.quickHullPoints = stats.quickHullPoints; this.quickHullBitSet = stats.quickHullBitSet; this.quickHullIdx = stats.quickHullIdx;
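The quickHullBitSet write above follows a common pattern: Java-serialize the BitSet into a byte array, then emit a length prefix followed by the raw bytes so the reader knows how much to consume. A minimal sketch of that pattern, using a plain 4-byte int prefix as a stand-in for ReadWriteForEncodingUtils.writeUnsignedVarInt (that substitution is an assumption for illustration, not the project's encoding):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.BitSet;

public class BitSetSerDemo {
  static int writeBitSet(BitSet bitSet, DataOutputStream out) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
      oos.writeObject(bitSet); // Java serialization of the BitSet, as in the Statistics code above
      oos.flush();
    }
    byte[] bytes = baos.toByteArray();
    out.writeInt(bytes.length); // plain int length prefix here; the real code uses an unsigned varint
    out.write(bytes);
    return 4 + bytes.length; // total bytes written
  }
}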