This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch research/timestamp-regulation
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/timestamp-regulation
by this push:
new 057756a7146 Create new timestamp-repair and data quality UDF (#14786)
057756a7146 is described below
commit 057756a71460a37d868187051aabb2f7dafeb619
Author: Beasttt <[email protected]>
AuthorDate: Tue Feb 4 15:34:34 2025 +0800
Create new timestamp-repair and data quality UDF (#14786)
* Create new timestamp-repair and data quality UDF
* Create new timestamp-repair and data quality UDF v2
* change only one parameter name
---
.../iotdb/library/dquality/UDTFCompleteness.java | 55 +++--
.../iotdb/library/dquality/UDTFConsistency.java | 50 ++---
.../iotdb/library/dquality/UDTFTimeliness.java | 52 ++---
.../dquality/util/SubsequenceRowIterator.java | 58 +++++
.../dquality/util/TimeSeriesSegQuality.java | 171 ++++++++++++++
.../library/dquality/util/TimeSeriesSegment.java | 205 +++++++++++++++++
.../iotdb/library/drepair/UDTFTimestampRepair.java | 94 ++++----
.../library/drepair/util/TimesegmentRepair.java | 249 +++++++++++++++++++++
8 files changed, 796 insertions(+), 138 deletions(-)
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java
index 8b349240386..e4257cb5f07 100644
--- a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java
@@ -1,25 +1,8 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
package org.apache.iotdb.library.dquality;
import org.apache.iotdb.library.dquality.util.TimeSeriesQuality;
+import org.apache.iotdb.library.dquality.util.TimeSeriesSegQuality;
+import org.apache.iotdb.library.util.NoNumberException;
import org.apache.iotdb.library.util.Util;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.RowWindow;
@@ -34,18 +17,19 @@ import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
-/** This function calculates completeness of input series. */
public class UDTFCompleteness implements UDTF {
-
+ Boolean segmentation;
private boolean downtime;
@Override
public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws
Exception {
boolean isTime = false;
long window = Integer.MAX_VALUE;
+
+ segmentation = udfp.getBooleanOrDefault("Segment", false);
if (udfp.hasAttribute("window")) {
String s = udfp.getString("window");
- window = Util.parseTime(s, udfp);
+ window = Util.parseTime(s);
if (window > 0) {
isTime = true;
} else {
@@ -63,15 +47,26 @@ public class UDTFCompleteness implements UDTF {
@Override
public void transform(RowWindow rowWindow, PointCollector collector) throws
Exception {
- try {
- if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
- TimeSeriesQuality tsq = new
TimeSeriesQuality(rowWindow.getRowIterator());
- tsq.setDowntime(downtime);
- tsq.timeDetect();
- collector.putDouble(rowWindow.getRow(0).getTime(),
tsq.getCompleteness());
+ if (segmentation) {
+ try {
+ if (rowWindow.windowSize() > TimeSeriesSegQuality.WINDOW_SIZE) {
+ TimeSeriesSegQuality tsq = new
TimeSeriesSegQuality(rowWindow.getRowIterator(), 0);
+ collector.putDouble(rowWindow.getRow(0).getTime(), tsq.getAnswer());
+ }
+ } catch (IOException | NoNumberException ex) {
+ Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE,
null, ex);
+ }
+ } else {
+ try {
+ if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
+ TimeSeriesQuality tsq = new
TimeSeriesQuality(rowWindow.getRowIterator());
+ tsq.setDowntime(downtime);
+ tsq.timeDetect();
+ collector.putDouble(rowWindow.getRow(0).getTime(),
tsq.getCompleteness());
+ }
+ } catch (IOException | NoNumberException ex) {
+ Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE,
null, ex);
}
- } catch (IOException ex) {
- Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE,
null, ex);
}
}
}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java
index 3178d33343c..07b121ac0d4 100644
--- a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java
@@ -1,25 +1,7 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
package org.apache.iotdb.library.dquality;
import org.apache.iotdb.library.dquality.util.TimeSeriesQuality;
+import org.apache.iotdb.library.dquality.util.TimeSeriesSegQuality;
import org.apache.iotdb.library.util.NoNumberException;
import org.apache.iotdb.library.util.Util;
import org.apache.iotdb.udf.api.UDTF;
@@ -35,16 +17,17 @@ import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
-/** This function calculates consistency of input series. */
public class UDTFConsistency implements UDTF {
+ Boolean segmentation;
@Override
public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws
Exception {
boolean isTime = false;
long window = Integer.MAX_VALUE;
+ segmentation = udfp.getBooleanOrDefault("Segment", false);
if (udfp.hasAttribute("window")) {
String s = udfp.getString("window");
- window = Util.parseTime(s, udfp);
+ window = Util.parseTime(s);
if (window > 0) {
isTime = true;
} else {
@@ -61,14 +44,25 @@ public class UDTFConsistency implements UDTF {
@Override
public void transform(RowWindow rowWindow, PointCollector collector) throws
Exception {
- try {
- if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
- TimeSeriesQuality tsq = new
TimeSeriesQuality(rowWindow.getRowIterator());
- tsq.timeDetect();
- collector.putDouble(rowWindow.getRow(0).getTime(),
tsq.getConsistency());
+ if (segmentation) {
+ try {
+ if (rowWindow.windowSize() > TimeSeriesSegQuality.WINDOW_SIZE) {
+ TimeSeriesSegQuality tsq = new
TimeSeriesSegQuality(rowWindow.getRowIterator(), 1);
+ collector.putDouble(rowWindow.getRow(0).getTime(), tsq.getAnswer());
+ }
+ } catch (IOException | NoNumberException ex) {
+ Logger.getLogger(UDTFConsistency.class.getName()).log(Level.SEVERE,
null, ex);
+ }
+ } else {
+ try {
+ if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
+ TimeSeriesQuality tsq = new
TimeSeriesQuality(rowWindow.getRowIterator());
+ tsq.timeDetect();
+ collector.putDouble(rowWindow.getRow(0).getTime(),
tsq.getConsistency());
+ }
+ } catch (IOException | NoNumberException ex) {
+ Logger.getLogger(UDTFConsistency.class.getName()).log(Level.SEVERE,
null, ex);
}
- } catch (IOException | NoNumberException ex) {
- Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE,
null, ex);
}
}
}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java
index 7a5fc98e182..b7ef9866d2f 100644
--- a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java
@@ -1,25 +1,7 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
package org.apache.iotdb.library.dquality;
import org.apache.iotdb.library.dquality.util.TimeSeriesQuality;
+import org.apache.iotdb.library.dquality.util.TimeSeriesSegQuality;
import org.apache.iotdb.library.util.NoNumberException;
import org.apache.iotdb.library.util.Util;
import org.apache.iotdb.udf.api.UDTF;
@@ -35,16 +17,19 @@ import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
-/** This function calculates timeliness of input series. */
public class UDTFTimeliness implements UDTF {
+ Boolean segmentation;
+ long[] timestamp;
+ double[] value;
@Override
public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws
Exception {
boolean isTime = false;
long window = Integer.MAX_VALUE;
+ segmentation = udfp.getBooleanOrDefault("Segment", false);
if (udfp.hasAttribute("window")) {
String s = udfp.getString("window");
- window = Util.parseTime(s, udfp);
+ window = Util.parseTime(s);
if (window > 0) {
isTime = true;
} else {
@@ -61,14 +46,25 @@ public class UDTFTimeliness implements UDTF {
@Override
public void transform(RowWindow rowWindow, PointCollector collector) throws
Exception {
- try {
- if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
- TimeSeriesQuality tsq = new
TimeSeriesQuality(rowWindow.getRowIterator());
- tsq.timeDetect();
- collector.putDouble(rowWindow.getRow(0).getTime(),
tsq.getTimeliness());
+ if (segmentation) {
+ try {
+ if (rowWindow.windowSize() > TimeSeriesSegQuality.WINDOW_SIZE) {
+ TimeSeriesSegQuality tsq = new
TimeSeriesSegQuality(rowWindow.getRowIterator(), 2);
+ collector.putDouble(rowWindow.getRow(0).getTime(), tsq.getAnswer());
+ }
+ } catch (IOException | NoNumberException ex) {
+ Logger.getLogger(UDTFTimeliness.class.getName()).log(Level.SEVERE,
null, ex);
+ }
+ } else {
+ try {
+ if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
+ TimeSeriesQuality tsq = new
TimeSeriesQuality(rowWindow.getRowIterator());
+ tsq.timeDetect();
+ collector.putDouble(rowWindow.getRow(0).getTime(),
tsq.getTimeliness());
+ }
+ } catch (IOException | NoNumberException ex) {
+ Logger.getLogger(UDTFTimeliness.class.getName()).log(Level.SEVERE,
null, ex);
}
- } catch (IOException | NoNumberException ex) {
- Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE,
null, ex);
}
}
}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/SubsequenceRowIterator.java b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/SubsequenceRowIterator.java
new file mode 100644
index 00000000000..85be05d5a7b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/SubsequenceRowIterator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.library.dquality.util;
+
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.io.IOException;
+
+public class SubsequenceRowIterator implements RowIterator {
+ private RowIterator originalIterator = null;
+ private long start = 0;
+ private long end = 0;
+ private int currentIndex;
+ private boolean resetFlag;
+
+ public SubsequenceRowIterator(RowIterator originalIterator, long start, long
end) {
+ this.originalIterator = originalIterator;
+ this.start = start;
+ this.end = end;
+ this.currentIndex = -1;
+ this.resetFlag = true;
+ reset();
+ }
+
+ @Override
+ public boolean hasNextRow() {
+ return currentIndex < end && originalIterator.hasNextRow();
+ }
+
+ @Override
+ public Row next() throws IOException {
+ currentIndex++;
+ if (currentIndex < start) {
+ while (currentIndex < start && originalIterator.hasNextRow()) {
+ originalIterator.next();
+ currentIndex++;
+ }
+ }
+ if (currentIndex >= start && currentIndex <= end) {
+ return originalIterator.next();
+ }
+ throw new IOException("No more rows in the specified range");
+ }
+
+ @Override
+ public void reset() {
+ originalIterator.reset();
+ currentIndex = -1;
+ resetFlag = true;
+ while (resetFlag && currentIndex < start - 1) {
+ try {
+ originalIterator.next();
+ currentIndex++;
+ } catch (IOException e) {
+ resetFlag = false;
+ }
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegQuality.java b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegQuality.java
new file mode 100644
index 00000000000..69f8554b758
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegQuality.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dquality.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import org.apache.commons.math3.stat.descriptive.rank.Median;
+
+import java.util.ArrayList;
+
+/** Class for computing data quality index. */
+public class TimeSeriesSegQuality {
+ public static final int WINDOW_SIZE = 10;
+ private boolean downtime = false; // count for shutdown period
+ private int cnt = 0; // total number of points
+ private int missCnt = 0; // number of missing points
+ private int specialCnt = 0; // number of special values
+ private int lateCnt = 0; // number of latency points
+ private int redundancyCnt = 0; // number of redundancy points
+ private int valueCnt = 0; // number of out of range points
+ private int variationCnt = 0; // number of variation out of range points
+ private int speedCnt = 0; // number of speed out of range points
+ private int speedchangeCnt = 0; // number of speed change(acceleration) out of range points
+ protected double answer;
+ private double[] time; // series without special values
+
+ public TimeSeriesSegQuality(RowIterator dataIterator, int mode) throws
Exception {
+ TimeSeriesSegment tseg = new TimeSeriesSegment(dataIterator);
+ tseg.exactRepair();
+ long[] starts = tseg.getStart();
+ long[] ends = tseg.getEnd();
+ answer = 0;
+ specialCnt = 0;
+ missCnt = 0;
+ specialCnt = 0;
+ lateCnt = 0;
+ redundancyCnt = 0;
+ cnt = 0;
+ valueCnt = 0;
+ variationCnt = 0;
+ speedCnt = 0;
+ speedchangeCnt = 0;
+ for (int i = 0; i < starts.length; i++) {
+ RowIterator subIterator = new SubsequenceRowIterator(dataIterator,
starts[i], ends[i]);
+ ArrayList<Double> timeList = new ArrayList<>();
+ while (subIterator.hasNextRow()) {
+ Row row = subIterator.next();
+ cnt++;
+ double v = Util.getValueAsDouble(row);
+ double t = row.getTime();
+ if (Double.isFinite(v)) {
+ timeList.add(t);
+ } else { // processing NAN,INF
+ specialCnt++;
+ timeList.add(t);
+ }
+ }
+ time = Util.toDoubleArray(timeList);
+ timeList.clear();
+ timeDetect();
+ // starttimestamp[i] = starts[i];
+ }
+ if (mode == 0) {
+ answer = getCompleteness();
+ } else if (mode == 1) {
+ answer = getConsistency();
+ } else {
+ answer = getTimeliness();
+ }
+ }
+
+ /** Detect timestamp errors. */
+ public void timeDetect() {
+ // compute interval properties
+ double[] interval = Util.variation(time);
+ Median median = new Median();
+ double base = median.evaluate(interval);
+ // find timestamp anomalies
+ ArrayList<Double> window = new ArrayList<>();
+ int i;
+ for (i = 0; i < Math.min(time.length, WINDOW_SIZE); i++) { // fill initial data
+ window.add(time[i]);
+ }
+ while (window.size() > 1) {
+ double times = (window.get(1) - window.get(0)) / base;
+ if (times <= 0.5) { // delete over-concentrated points
+ window.remove(1);
+ redundancyCnt++;
+ } else if (times > 1.0 && (!downtime || times <= 9.0)) { // exclude power-off periods
+ // large interval means missing or delaying
+ int temp = 0; // find number of over-concentrated points in the following window
+ for (int j = 2; j < window.size(); j++) {
+ double times2 = (window.get(j) - window.get(j - 1)) / base;
+ if (times2 >= 2.0) { // end searching when another missing is found
+ break;
+ }
+ if (times2 < 1.0) { // over-concentrated points founded, maybe caused by delaying
+ temp++;
+ window.remove(j); // move delayed points
+ j--;
+ if (temp == (int) Math.round(times - 1)) {
+ break; // enough points to fill have been found
+ }
+ }
+ }
+ lateCnt += temp;
+ missCnt += (Math.round(times - 1) - temp);
+ }
+ window.remove(0); // remove processed points
+ while (window.size() < WINDOW_SIZE && i < time.length) {
+ // fill into the window
+ window.add(time[i]);
+ i++;
+ }
+ }
+ window.clear();
+ }
+
+ public double getCompleteness() {
+ return 1 - (missCnt + specialCnt) * 1.0 / (cnt + missCnt);
+ }
+
+ public double getConsistency() {
+ return 1 - redundancyCnt * 1.0 / cnt;
+ }
+
+ public double getTimeliness() {
+ return 1 - lateCnt * 1.0 / cnt;
+ }
+
+ public double getValidity() {
+ return 1 - (valueCnt + variationCnt + speedCnt + speedchangeCnt) * 0.25 /
cnt;
+ }
+
+ /**
+ * @return the downtime
+ */
+ public boolean isDowntime() {
+ return downtime;
+ }
+
+ /**
+ * @param downtime the downtime to set
+ */
+ public void setDowntime(boolean downtime) {
+ this.downtime = downtime;
+ }
+
+ public double getAnswer() {
+ return answer;
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegment.java b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegment.java
new file mode 100644
index 00000000000..49926faead5
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegment.java
@@ -0,0 +1,205 @@
+package org.apache.iotdb.library.dquality.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TimeSeriesSegment {
+ protected int n;
+ protected long[] time;
+ protected double lmdA = 100000.0; // Assign appropriate value for lmdA
+ protected double lmdD = 100000.0; // Assign appropriate value for lmdD
+ protected double mate = 100000.0; // Assign appropriate value for mate
+ protected long[] starts;
+ protected long[] ends;
+
+ public TimeSeriesSegment(RowIterator dataIterator) throws Exception {
+ ArrayList<Long> timeList = new ArrayList<>();
+ while (dataIterator.hasNextRow()) {
+ Row row = dataIterator.next();
+ timeList.add(row.getTime());
+ }
+ time = Util.toLongArray(timeList);
+ n = time.length;
+ timeList.clear();
+ }
+
+ public List<Map.Entry<Double, Long>> modeIntervalGranularity(List<Double>
value) {
+ Map<Double, Long> counter =
+ value.stream().collect(Collectors.groupingBy(e -> e,
Collectors.counting()));
+ return counter.entrySet().stream()
+ .sorted(Map.Entry.<Double, Long>comparingByValue().reversed())
+ .collect(Collectors.toList());
+ }
+
+ public double move(int i, int j, double interval, int s0) {
+ double tLen = time[j] - time[s0];
+ double sLen = i * interval;
+ double m = Math.abs(tLen - sLen) / interval;
+ return (m == 0) ? mate : -1 * m;
+ }
+
+ public Map<String, Object> scoreMatrix(double epsT, int k) {
+ int sNum = (int) Math.round((time[time.length - 1] - time[0]) / epsT + 1);
+ double[][] dp = new double[sNum][time.length];
+ int[][] st = new int[sNum][time.length];
+ int[][] step = new int[sNum][time.length];
+ List<Integer> s0 = new ArrayList<>();
+ for (int i = 0; i < sNum; i++) {
+ for (int j = 0; j < time.length; j++) {
+ dp[i][j] = -100;
+ st[i][j] = 0;
+ }
+ }
+ for (int j = 0; j < time.length; j++) {
+ dp[0][j] = mate;
+ st[0][j] = j;
+ step[0][j] = 2;
+ if (j != 0 && Math.abs(time[j] - time[j - 1]) > (k * epsT)) {
+ s0.add(j);
+ }
+ }
+ for (int i = 1; i < sNum; i++) {
+ for (int j = 0; j < time.length; j++) {
+ if (j == 0) {
+ dp[i][0] = Math.round(dp[i - 1][0] - lmdA);
+ step[i][0] = 1;
+ continue;
+ }
+ Map<Integer, Integer> dic = new HashMap<>();
+ dic.put(0, st[i - 1][j - 1]);
+ dic.put(1, st[i - 1][j]);
+ dic.put(2, st[i][j - 1]);
+ if (s0.contains(j)) {
+ dp[i][j] = Math.round(dp[i - 1][j] - lmdA);
+ st[i][j] = st[i - 1][j];
+ } else {
+ double m = Math.round(move(i, j, epsT, st[i - 1][j - 1]));
+ double a = Math.round(dp[i - 1][j] - lmdA);
+ double d = Math.round(dp[i][j - 1] - lmdD);
+ double[] arr = {Math.round((dp[i - 1][j - 1] + m)), a, d};
+ dp[i][j] = Math.round(Arrays.stream(arr).max().orElse(0.0));
+
+ for (int kIndex = 0; kIndex < arr.length; kIndex++) {
+ if (arr[kIndex] == dp[i][j]) {
+ step[i][j] = kIndex;
+ break;
+ }
+ }
+ st[i][j] = dic.get(step[i][j]);
+ }
+ }
+ }
+ Map<String, Object> result = new HashMap<>();
+ result.put("dp", dp);
+ result.put("st", st);
+ result.put("sNum", sNum);
+ result.put("step", step);
+ s0.clear();
+ return result;
+ }
+
+ public void exactRepair() {
+ List<Double> epsList = new ArrayList<>();
+ for (int i = 1; i < time.length; i++) {
+ epsList.add((double) Math.round(time[i] - time[i - 1]));
+ }
+ List<Map.Entry<Double, Long>> interval = modeIntervalGranularity(epsList);
+ lmdA = interval.get(0).getKey();
+ lmdD = interval.get(0).getKey();
+ mate = interval.get(0).getKey() * 2;
+
+ double median =
+
epsList.stream().mapToDouble(Double::doubleValue).sorted().toArray()[epsList.size()
/ 2];
+ int tNum = (median == interval.get(0).getKey()) ? 1 : 3;
+
+ List<List<Double>> intervalList = new ArrayList<>();
+ double[][] allMatrix = null;
+ int[][] allStart = null;
+ int[][] allStep = null;
+ int i = 0;
+ for (int j = 0; j < tNum; j++) {
+ double epsT = interval.get(j).getKey();
+
+ if (epsT == 0) {
+ continue;
+ }
+ Map<String, Object> result = scoreMatrix(epsT, 10);
+ double[][] mt = (double[][]) result.get("dp");
+ int[][] st = (int[][]) result.get("st");
+ int m = (int) result.get("sNum");
+ int[][] step = (int[][]) result.get("step");
+ i = i + 1;
+ for (int k = 1; k <= m; k++) {
+ intervalList.add(Arrays.asList(epsT, (double) k));
+ }
+ if (i == 1) {
+ allMatrix = mt;
+ allStart = st;
+ allStep = step;
+ continue;
+ } else {
+ // Concatenate all_matrix and mt
+ double[][] concatenatedMatrix = new double[mt.length +
allMatrix.length][];
+ System.arraycopy(allMatrix, 0, concatenatedMatrix, 0,
allMatrix.length);
+ System.arraycopy(mt, 0, concatenatedMatrix, allMatrix.length,
mt.length);
+ allMatrix = concatenatedMatrix;
+ int[][] concatenate = new int[st.length + allStart.length][];
+ System.arraycopy(allStart, 0, concatenate, 0, allStart.length);
+ System.arraycopy(st, 0, concatenate, allStart.length, st.length);
+ allStart = concatenate;
+
+ int[][] concatenatestep = new int[step.length + allStep.length][];
+ System.arraycopy(allStep, 0, concatenatestep, 0, allStep.length);
+ System.arraycopy(step, 0, concatenatestep, allStep.length,
step.length);
+ allStep = concatenatestep;
+ }
+ }
+ section(allMatrix, allStart, allStep, intervalList);
+ }
+
+ public void section(
+ double[][] matrix, int[][] start, int[][] step, List<List<Double>>
intervalList) {
+ List<Integer> s0e = new ArrayList<>();
+ List<Integer> end = new ArrayList<>();
+ int j = time.length - 1;
+ while (j >= 0) {
+
+ double maxValue = Double.NEGATIVE_INFINITY;
+ int maxRowIndex = -1;
+
+ for (int rowIndex = 0; rowIndex < matrix.length; rowIndex++) {
+ if (matrix[rowIndex][j] > maxValue) {
+ maxValue = matrix[rowIndex][j];
+ maxRowIndex = rowIndex;
+ }
+ }
+ s0e.add(start[maxRowIndex][j]);
+ end.add(j);
+ j = start[maxRowIndex][j] - 1;
+ }
+ starts = new long[s0e.size()];
+ ends = new long[end.size()];
+ for (int i = 0; i < s0e.size(); i++) {
+ starts[i] = s0e.get(i);
+ ends[i] = end.get(i);
+ }
+ s0e.clear();
+ end.clear();
+ }
+
+ public long[] getStart() {
+ return starts;
+ }
+
+ public long[] getEnd() {
+ return ends;
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
index 2a605b9ab3a..520735709a6 100644
--- a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.library.drepair;
+import org.apache.iotdb.library.drepair.util.TimesegmentRepair;
import org.apache.iotdb.library.drepair.util.TimestampRepair;
-import org.apache.iotdb.library.util.Util;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.collector.PointCollector;
@@ -31,33 +31,24 @@ import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrat
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.type.Type;
-/** This function is used for timestamp repair. */
public class UDTFTimestampRepair implements UDTF {
String intervalMethod;
- long interval;
- long intervalMode;
+ Boolean segmentation;
+ int interval;
+ int intervalMode;
+ int num_interval;
+ long[] timestamp;
+ double[] value;
@Override
public void validate(UDFParameterValidator validator) throws Exception {
validator
.validateInputSeriesNumber(1)
- .validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32,
Type.INT64);
-
- String intervalString =
validator.getParameters().getStringOrDefault("interval", null);
- if (intervalString != null) {
- try {
- interval = Long.parseLong(intervalString);
- } catch (NumberFormatException e) {
- try {
- interval = Util.parseTime(intervalString, validator.getParameters());
- } catch (Exception ex) {
- throw new UDFException("Invalid time format for interval.");
- }
- }
- validator.validate(
- x -> interval > 0,
- "Invalid time unit input. Supported units are ns, us, ms, s, m, h,
d.");
- }
+ .validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32,
Type.INT64)
+ .validate(
+ x -> (Integer) x >= 0,
+ "Interval should be a positive integer.",
+ validator.getParameters().getIntOrDefault("interval", 0));
}
@Override
@@ -66,39 +57,44 @@ public class UDTFTimestampRepair implements UDTF {
configurations
.setAccessStrategy(new
SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
.setOutputDataType(parameters.getDataType(0));
-
+ segmentation = parameters.getBooleanOrDefault("Segment", false);
intervalMethod = parameters.getStringOrDefault("method", "Median");
- String intervalString = parameters.getStringOrDefault("interval", null);
-
- if (intervalString != null) {
- try {
- interval = Long.parseLong(intervalString);
- } catch (NumberFormatException e) {
- interval = Util.parseTime(intervalString, parameters);
+ interval = parameters.getIntOrDefault("interval", 0);
+ num_interval = parameters.getIntOrDefault("num_interval", 1);
+ if (segmentation) {
+ if (num_interval > 10) {
+ throw new UDFException("The calculation interval is too large, causing
slow execution!");
+ } else if (num_interval <= 0) {
+ throw new UDFException("Error num_interval!");
}
} else {
- interval = 0;
- }
-
- if (interval > 0) {
- intervalMode = interval;
- } else if ("Median".equalsIgnoreCase(intervalMethod)) {
- intervalMode = -1L;
- } else if ("Mode".equalsIgnoreCase(intervalMethod)) {
- intervalMode = -2L;
- } else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
- intervalMode = -3L;
- } else {
- throw new UDFException("Illegal method.");
+ if (interval > 0) {
+ intervalMode = interval;
+ } else if ("Median".equalsIgnoreCase(intervalMethod)) {
+ intervalMode = -1;
+ } else if ("Mode".equalsIgnoreCase(intervalMethod)) {
+ intervalMode = -2;
+ } else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
+ intervalMode = -3;
+ } else {
+ throw new UDFException("Illegal method.");
+ }
}
}
@Override
public void transform(RowWindow rowWindow, PointCollector collector) throws
Exception {
- TimestampRepair ts = new TimestampRepair(rowWindow.getRowIterator(),
intervalMode, 2);
- ts.dpRepair();
- long[] timestamp = ts.getRepaired();
- double[] value = ts.getRepairedValue();
+ if (segmentation) {
+ TimesegmentRepair tseg = new
TimesegmentRepair(rowWindow.getRowIterator(), this.num_interval);
+ tseg.exactRepair();
+ timestamp = tseg.getRepaired();
+ value = tseg.getRepairedValue();
+ } else {
+ TimestampRepair ts = new TimestampRepair(rowWindow.getRowIterator(),
intervalMode, 2);
+ ts.dpRepair();
+ timestamp = ts.getRepaired();
+ value = ts.getRepairedValue();
+ }
switch (rowWindow.getDataType(0)) {
case DOUBLE:
for (int i = 0; i < timestamp.length; i++) {
@@ -120,12 +116,6 @@ public class UDTFTimestampRepair implements UDTF {
collector.putLong(timestamp[i], (long) value[i]);
}
break;
- case DATE:
- case TIMESTAMP:
- case BLOB:
- case BOOLEAN:
- case TEXT:
- case STRING:
default:
throw new UDFException("");
}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimesegmentRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimesegmentRepair.java
new file mode 100644
index 00000000000..51b48aad1f2
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimesegmentRepair.java
@@ -0,0 +1,249 @@
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Segment-based timestamp repair.
+ *
+ * <p>The constructor loads the whole input into memory. {@link #exactRepair()} then takes the most
+ * frequent timestamp deltas as candidate intervals, builds one score matrix per candidate with
+ * {@link #scoreMatrix(double, int)}, stacks those matrices and hands them to {@link #section},
+ * which walks the series from its last point backwards and fills the arrays returned by {@link
+ * #getRepaired()} and {@link #getRepairedValue()}.
+ */
+public class TimesegmentRepair {
+  /** Number of points read from the input. */
+  protected int n;
+
+  /** Input timestamps, in iteration order. */
+  protected long[] time;
+
+  /** Input values; non-finite values are stored as NaN. */
+  protected double[] original;
+
+  /** Repaired timestamps; null until {@link #exactRepair()} or {@link #section} has run. */
+  protected long[] repaired;
+
+  /** Values aligned with {@link #repaired}; points inserted by the repair carry NaN. */
+  protected double[] repairedValue;
+
+  /** Penalty subtracted for an inserted point; overwritten by {@link #exactRepair()}. */
+  protected double lmdA = 100000.0;
+
+  /** Penalty subtracted for a deleted point; overwritten by {@link #exactRepair()}. */
+  protected double lmdD = 100000.0;
+
+  /** Score of an exact match; overwritten by {@link #exactRepair()}. */
+  protected double mate = 100000.0;
+
+  /** Number of extra candidate intervals tried when the median delta differs from the mode. */
+  protected int num_interval;
+
+  /**
+   * Reads every row of {@code dataIterator} into memory.
+   *
+   * @param dataIterator rows to repair; column 0 is converted with {@code Util.getValueAsDouble}
+   * @param num_interval number of additional candidate intervals, see {@link #exactRepair()}
+   * @throws Exception if reading a row fails
+   */
+  public TimesegmentRepair(RowIterator dataIterator, int num_interval) throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      double v = Util.getValueAsDouble(row);
+      timeList.add(row.getTime());
+      // Infinite values are normalised to NaN so later stages see one "missing" marker.
+      originList.add(Double.isFinite(v) ? v : Double.NaN);
+    }
+    this.num_interval = num_interval;
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+  }
+
+  /**
+   * Counts how often each value occurs.
+   *
+   * @param value values to count
+   * @return (value, count) entries ordered by descending count
+   */
+  public List<Map.Entry<Double, Long>> modeIntervalGranularity(List<Double> value) {
+    Map<Double, Long> counter =
+        value.stream().collect(Collectors.groupingBy(e -> e, Collectors.counting()));
+    return counter.entrySet().stream()
+        .sorted(Map.Entry.<Double, Long>comparingByValue().reversed())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Scores aligning point {@code j} with grid position {@code i} of a segment starting at {@code
+   * s0}.
+   *
+   * @param i grid position, counted in intervals from the segment start
+   * @param j index of the point in {@link #time}
+   * @param interval grid spacing
+   * @param s0 index in {@link #time} of the segment's first point
+   * @return {@link #mate} when the point lies exactly on the grid position, otherwise the negated
+   *     distance measured in intervals
+   */
+  public double move(int i, int j, double interval, int s0) {
+    double tLen = time[j] - time[s0];
+    double sLen = i * interval;
+    double m = Math.abs(tLen - sLen) / interval;
+    return (m == 0) ? mate : -1 * m;
+  }
+
+  /**
+   * Builds the alignment score matrix for one candidate interval.
+   *
+   * <p>Row {@code i} is a grid position, column {@code j} a point. Each cell takes the best of
+   * three predecessors, recorded in {@code step}: 0 = match from {@code (i-1, j-1)}, 1 = insert
+   * from {@code (i-1, j)}, 2 = delete from {@code (i, j-1)}. A point whose distance to its
+   * predecessor exceeds {@code k * epsT} starts a new segment and can only be reached by inserts.
+   *
+   * @param epsT candidate interval; callers must pass a positive value
+   * @param k gap threshold, in multiples of {@code epsT}
+   * @return map with keys {@code "dp"} (double[][] scores), {@code "st"} (int[][] index of the
+   *     segment start for each cell), {@code "sNum"} (Integer row count) and {@code "step"}
+   *     (int[][] predecessor codes)
+   */
+  public Map<String, Object> scoreMatrix(double epsT, int k) {
+    int len = time.length;
+    int sNum = (int) Math.round((time[len - 1] - time[0]) / epsT + 1);
+    double[][] dp = new double[sNum][len];
+    int[][] st = new int[sNum][len];
+    int[][] step = new int[sNum][len];
+    // Flag array instead of a list lookup per cell.
+    boolean[] segmentStart = new boolean[len];
+    for (double[] row : dp) {
+      Arrays.fill(row, -100);
+    }
+    for (int j = 0; j < len; j++) {
+      dp[0][j] = mate;
+      st[0][j] = j;
+      step[0][j] = 2;
+      if (j != 0 && Math.abs(time[j] - time[j - 1]) > (k * epsT)) {
+        segmentStart[j] = true;
+      }
+    }
+    for (int i = 1; i < sNum; i++) {
+      // The first point has no left neighbour: only an insert can reach (i, 0).
+      dp[i][0] = Math.round(dp[i - 1][0] - lmdA);
+      step[i][0] = 1;
+      for (int j = 1; j < len; j++) {
+        if (segmentStart[j]) {
+          dp[i][j] = Math.round(dp[i - 1][j] - lmdA);
+          st[i][j] = st[i - 1][j];
+          // This cell is an insert; record it so the backtrack in section() does not treat it
+          // as a match and step past the segment start.
+          step[i][j] = 1;
+          continue;
+        }
+        double m = Math.round(move(i, j, epsT, st[i - 1][j - 1]));
+        double match = Math.round(dp[i - 1][j - 1] + m);
+        double add = Math.round(dp[i - 1][j] - lmdA);
+        double del = Math.round(dp[i][j - 1] - lmdD);
+        // Ties are resolved in the order match, insert, delete.
+        if (match >= add && match >= del) {
+          dp[i][j] = match;
+          step[i][j] = 0;
+          st[i][j] = st[i - 1][j - 1];
+        } else if (add >= del) {
+          dp[i][j] = add;
+          step[i][j] = 1;
+          st[i][j] = st[i - 1][j];
+        } else {
+          dp[i][j] = del;
+          step[i][j] = 2;
+          st[i][j] = st[i][j - 1];
+        }
+      }
+    }
+    Map<String, Object> result = new HashMap<>();
+    result.put("dp", dp);
+    result.put("st", st);
+    result.put("sNum", sNum);
+    result.put("step", step);
+    return result;
+  }
+
+  /** Publishes the input unchanged as the repair result. */
+  private void noRepair() {
+    // The output arrays are not allocated anywhere else on this path.
+    repaired = Arrays.copyOf(time, time.length);
+    repairedValue = Arrays.copyOf(original, original.length);
+  }
+
+  /**
+   * Runs the repair and stores the result for {@link #getRepaired()} and {@link
+   * #getRepairedValue()}.
+   *
+   * <p>Inputs with at most two points, or without any positive candidate interval, are returned
+   * unchanged. Otherwise the most frequent timestamp delta sets {@link #lmdA}, {@link #lmdD} and
+   * {@link #mate}; two candidates are tried when the median delta equals that mode, and {@code
+   * num_interval + 1} otherwise, capped at the number of distinct deltas.
+   */
+  public void exactRepair() {
+    if (time.length <= 2) {
+      noRepair();
+      return;
+    }
+    List<Double> epsList = new ArrayList<>(time.length - 1);
+    for (int i = 1; i < time.length; i++) {
+      // Plain cast: Math.round(long) would bind to Math.round(float) and lose precision.
+      epsList.add((double) (time[i] - time[i - 1]));
+    }
+    List<Map.Entry<Double, Long>> interval = modeIntervalGranularity(epsList);
+    double modeInterval = interval.get(0).getKey();
+    lmdA = modeInterval;
+    lmdD = modeInterval;
+    mate = modeInterval * 2;
+    double[] sortedDeltas = epsList.stream().mapToDouble(Double::doubleValue).sorted().toArray();
+    double median = sortedDeltas[epsList.size() / 2];
+    int tNum = (median == modeInterval) ? 1 : num_interval;
+    // A regular series has a single distinct delta; never index past the available candidates.
+    int candidates = Math.min(tNum + 1, interval.size());
+
+    List<List<Double>> intervalList = new ArrayList<>();
+    List<double[]> scoreRows = new ArrayList<>();
+    List<int[]> startRows = new ArrayList<>();
+    List<int[]> stepRows = new ArrayList<>();
+    for (int j = 0; j < candidates; j++) {
+      double epsT = interval.get(j).getKey();
+      if (epsT <= 0) {
+        // A non-positive delta cannot define a grid.
+        continue;
+      }
+      Map<String, Object> result = scoreMatrix(epsT, 20);
+      double[][] mt = (double[][]) result.get("dp");
+      int[][] st = (int[][]) result.get("st");
+      int m = (int) result.get("sNum");
+      int[][] step = (int[][]) result.get("step");
+      // One (interval, segment length) descriptor per stacked row.
+      for (int k = 1; k <= m; k++) {
+        intervalList.add(Arrays.asList(epsT, (double) k));
+      }
+      scoreRows.addAll(Arrays.asList(mt));
+      startRows.addAll(Arrays.asList(st));
+      stepRows.addAll(Arrays.asList(step));
+    }
+    if (scoreRows.isEmpty()) {
+      noRepair();
+      return;
+    }
+    section(
+        scoreRows.toArray(new double[0][]),
+        startRows.toArray(new int[0][]),
+        stepRows.toArray(new int[0][]),
+        intervalList);
+  }
+
+  /**
+   * Splits the series into segments and writes the repaired output.
+   *
+   * <p>Starting at the last point, the row with the highest score in the current column is chosen;
+   * its entry in {@code intervalList} gives the segment's interval and number of output points,
+   * and {@code start} gives the index of the segment's first point. The search then continues
+   * with the point before that start. Each segment is finally emitted by following {@code step}
+   * backwards.
+   *
+   * @param matrix stacked score matrices, one row per (interval, length) pair
+   * @param start segment start index for every cell of {@code matrix}
+   * @param step predecessor code for every cell: 0 match, 1 insert, anything else delete
+   * @param intervalList for each row of {@code matrix}, the pair (interval, segment length)
+   */
+  public void section(
+      double[][] matrix, int[][] start, int[][] step, List<List<Double>> intervalList) {
+    List<Integer> segStart = new ArrayList<>();
+    List<Long> segInterval = new ArrayList<>();
+    List<Integer> segLength = new ArrayList<>();
+    List<Integer> segRow = new ArrayList<>();
+    List<Integer> segEnd = new ArrayList<>();
+    int col = time.length - 1;
+    while (col >= 0) {
+      double maxValue = Double.NEGATIVE_INFINITY;
+      int maxRowIndex = -1;
+      for (int rowIndex = 0; rowIndex < matrix.length; rowIndex++) {
+        if (matrix[rowIndex][col] > maxValue) {
+          maxValue = matrix[rowIndex][col];
+          maxRowIndex = rowIndex;
+        }
+      }
+      segStart.add(start[maxRowIndex][col]);
+      // Kept as long: intervals may exceed the int range.
+      segInterval.add(intervalList.get(maxRowIndex).get(0).longValue());
+      segLength.add(intervalList.get(maxRowIndex).get(1).intValue());
+      segRow.add(maxRowIndex);
+      segEnd.add(col);
+      col = start[maxRowIndex][col] - 1;
+    }
+    int k = segLength.stream().mapToInt(Integer::intValue).sum();
+    repaired = new long[k];
+    repairedValue = new double[k];
+    // Segments were collected last-to-first, so the output is filled from its end.
+    for (int i = 0; i < segStart.size(); i++) {
+      int a = segRow.get(i);
+      int b = segEnd.get(i);
+      long base = time[segStart.get(i)];
+      long eps = segInterval.get(i);
+      int pos = segLength.get(i) - 1;
+      while (pos >= 0) {
+        long ps = base + pos * eps;
+        if (pos == 0) {
+          repaired[k - 1] = ps;
+          repairedValue[k - 1] = original[b];
+          k--;
+          pos--;
+          continue;
+        }
+        if (step[a][b] == 0) {
+          repaired[k - 1] = ps;
+          repairedValue[k - 1] = original[b];
+          k--;
+          b--;
+          a--;
+          pos--;
+        } else if (step[a][b] == 1) {
+          // add points
+          repaired[k - 1] = ps;
+          repairedValue[k - 1] = Double.NaN;
+          k--;
+          a--;
+          pos--;
+        } else {
+          // delete points: drop the input point and retry the same grid position
+          b--;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the repaired values.
+   *
+   * @return values aligned with {@link #getRepaired()}, or null before a repair has run
+   */
+  public double[] getRepairedValue() {
+    return repairedValue;
+  }
+
+  /**
+   * Returns the repaired timestamps.
+   *
+   * @return repaired timestamps, or null before a repair has run
+   */
+  public long[] getRepaired() {
+    return repaired;
+  }
+}