This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch research/timestamp-regulation
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/timestamp-regulation 
by this push:
     new 057756a7146 Create new timestamp-repair and data quality UDF (#14786)
057756a7146 is described below

commit 057756a71460a37d868187051aabb2f7dafeb619
Author: Beasttt <[email protected]>
AuthorDate: Tue Feb 4 15:34:34 2025 +0800

    Create new timestamp-repair and data quality UDF (#14786)
    
    * Create new timestamp-repair and data quality UDF
    
    * Create new timestamp-repair and data quality UDF v2
    
    * change only one parameter name
---
 .../iotdb/library/dquality/UDTFCompleteness.java   |  55 +++--
 .../iotdb/library/dquality/UDTFConsistency.java    |  50 ++---
 .../iotdb/library/dquality/UDTFTimeliness.java     |  52 ++---
 .../dquality/util/SubsequenceRowIterator.java      |  58 +++++
 .../dquality/util/TimeSeriesSegQuality.java        | 171 ++++++++++++++
 .../library/dquality/util/TimeSeriesSegment.java   | 205 +++++++++++++++++
 .../iotdb/library/drepair/UDTFTimestampRepair.java |  94 ++++----
 .../library/drepair/util/TimesegmentRepair.java    | 249 +++++++++++++++++++++
 8 files changed, 796 insertions(+), 138 deletions(-)

diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java
index 8b349240386..e4257cb5f07 100644
--- 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFCompleteness.java
@@ -1,25 +1,8 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 package org.apache.iotdb.library.dquality;
 
 import org.apache.iotdb.library.dquality.util.TimeSeriesQuality;
+import org.apache.iotdb.library.dquality.util.TimeSeriesSegQuality;
+import org.apache.iotdb.library.util.NoNumberException;
 import org.apache.iotdb.library.util.Util;
 import org.apache.iotdb.udf.api.UDTF;
 import org.apache.iotdb.udf.api.access.RowWindow;
@@ -34,18 +17,19 @@ import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-/** This function calculates completeness of input series. */
 public class UDTFCompleteness implements UDTF {
-
+  Boolean segmentation;
   private boolean downtime;
 
   @Override
   public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws 
Exception {
     boolean isTime = false;
     long window = Integer.MAX_VALUE;
+
+    segmentation = udfp.getBooleanOrDefault("Segment", false);
     if (udfp.hasAttribute("window")) {
       String s = udfp.getString("window");
-      window = Util.parseTime(s, udfp);
+      window = Util.parseTime(s);
       if (window > 0) {
         isTime = true;
       } else {
@@ -63,15 +47,26 @@ public class UDTFCompleteness implements UDTF {
 
   @Override
   public void transform(RowWindow rowWindow, PointCollector collector) throws 
Exception {
-    try {
-      if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
-        TimeSeriesQuality tsq = new 
TimeSeriesQuality(rowWindow.getRowIterator());
-        tsq.setDowntime(downtime);
-        tsq.timeDetect();
-        collector.putDouble(rowWindow.getRow(0).getTime(), 
tsq.getCompleteness());
+    if (segmentation) {
+      try {
+        if (rowWindow.windowSize() > TimeSeriesSegQuality.WINDOW_SIZE) {
+          TimeSeriesSegQuality tsq = new 
TimeSeriesSegQuality(rowWindow.getRowIterator(), 0);
+          collector.putDouble(rowWindow.getRow(0).getTime(), tsq.getAnswer());
+        }
+      } catch (IOException | NoNumberException ex) {
+        Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE, 
null, ex);
+      }
+    } else {
+      try {
+        if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
+          TimeSeriesQuality tsq = new 
TimeSeriesQuality(rowWindow.getRowIterator());
+          tsq.setDowntime(downtime);
+          tsq.timeDetect();
+          collector.putDouble(rowWindow.getRow(0).getTime(), 
tsq.getCompleteness());
+        }
+      } catch (IOException | NoNumberException ex) {
+        Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE, 
null, ex);
       }
-    } catch (IOException ex) {
-      Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE, 
null, ex);
     }
   }
 }
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java
index 3178d33343c..07b121ac0d4 100644
--- 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFConsistency.java
@@ -1,25 +1,7 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 package org.apache.iotdb.library.dquality;
 
 import org.apache.iotdb.library.dquality.util.TimeSeriesQuality;
+import org.apache.iotdb.library.dquality.util.TimeSeriesSegQuality;
 import org.apache.iotdb.library.util.NoNumberException;
 import org.apache.iotdb.library.util.Util;
 import org.apache.iotdb.udf.api.UDTF;
@@ -35,16 +17,17 @@ import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-/** This function calculates consistency of input series. */
 public class UDTFConsistency implements UDTF {
+  Boolean segmentation;
 
   @Override
   public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws 
Exception {
     boolean isTime = false;
     long window = Integer.MAX_VALUE;
+    segmentation = udfp.getBooleanOrDefault("Segment", false);
     if (udfp.hasAttribute("window")) {
       String s = udfp.getString("window");
-      window = Util.parseTime(s, udfp);
+      window = Util.parseTime(s);
       if (window > 0) {
         isTime = true;
       } else {
@@ -61,14 +44,25 @@ public class UDTFConsistency implements UDTF {
 
   @Override
   public void transform(RowWindow rowWindow, PointCollector collector) throws 
Exception {
-    try {
-      if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
-        TimeSeriesQuality tsq = new 
TimeSeriesQuality(rowWindow.getRowIterator());
-        tsq.timeDetect();
-        collector.putDouble(rowWindow.getRow(0).getTime(), 
tsq.getConsistency());
+    if (segmentation) {
+      try {
+        if (rowWindow.windowSize() > TimeSeriesSegQuality.WINDOW_SIZE) {
+          TimeSeriesSegQuality tsq = new 
TimeSeriesSegQuality(rowWindow.getRowIterator(), 1);
+          collector.putDouble(rowWindow.getRow(0).getTime(), tsq.getAnswer());
+        }
+      } catch (IOException | NoNumberException ex) {
+        Logger.getLogger(UDTFConsistency.class.getName()).log(Level.SEVERE, 
null, ex);
+      }
+    } else {
+      try {
+        if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
+          TimeSeriesQuality tsq = new 
TimeSeriesQuality(rowWindow.getRowIterator());
+          tsq.timeDetect();
+          collector.putDouble(rowWindow.getRow(0).getTime(), 
tsq.getConsistency());
+        }
+      } catch (IOException | NoNumberException ex) {
+        Logger.getLogger(UDTFConsistency.class.getName()).log(Level.SEVERE, 
null, ex);
       }
-    } catch (IOException | NoNumberException ex) {
-      Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE, 
null, ex);
     }
   }
 }
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java
index 7a5fc98e182..b7ef9866d2f 100644
--- 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/UDTFTimeliness.java
@@ -1,25 +1,7 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 package org.apache.iotdb.library.dquality;
 
 import org.apache.iotdb.library.dquality.util.TimeSeriesQuality;
+import org.apache.iotdb.library.dquality.util.TimeSeriesSegQuality;
 import org.apache.iotdb.library.util.NoNumberException;
 import org.apache.iotdb.library.util.Util;
 import org.apache.iotdb.udf.api.UDTF;
@@ -35,16 +17,19 @@ import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-/** This function calculates timeliness of input series. */
 public class UDTFTimeliness implements UDTF {
+  Boolean segmentation;
+  long[] timestamp;
+  double[] value;
 
   @Override
   public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws 
Exception {
     boolean isTime = false;
     long window = Integer.MAX_VALUE;
+    segmentation = udfp.getBooleanOrDefault("Segment", false);
     if (udfp.hasAttribute("window")) {
       String s = udfp.getString("window");
-      window = Util.parseTime(s, udfp);
+      window = Util.parseTime(s);
       if (window > 0) {
         isTime = true;
       } else {
@@ -61,14 +46,25 @@ public class UDTFTimeliness implements UDTF {
 
   @Override
   public void transform(RowWindow rowWindow, PointCollector collector) throws 
Exception {
-    try {
-      if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
-        TimeSeriesQuality tsq = new 
TimeSeriesQuality(rowWindow.getRowIterator());
-        tsq.timeDetect();
-        collector.putDouble(rowWindow.getRow(0).getTime(), 
tsq.getTimeliness());
+    if (segmentation) {
+      try {
+        if (rowWindow.windowSize() > TimeSeriesSegQuality.WINDOW_SIZE) {
+          TimeSeriesSegQuality tsq = new 
TimeSeriesSegQuality(rowWindow.getRowIterator(), 2);
+          collector.putDouble(rowWindow.getRow(0).getTime(), tsq.getAnswer());
+        }
+      } catch (IOException | NoNumberException ex) {
+        Logger.getLogger(UDTFTimeliness.class.getName()).log(Level.SEVERE, 
null, ex);
+      }
+    } else {
+      try {
+        if (rowWindow.windowSize() > TimeSeriesQuality.WINDOW_SIZE) {
+          TimeSeriesQuality tsq = new 
TimeSeriesQuality(rowWindow.getRowIterator());
+          tsq.timeDetect();
+          collector.putDouble(rowWindow.getRow(0).getTime(), 
tsq.getTimeliness());
+        }
+      } catch (IOException | NoNumberException ex) {
+        Logger.getLogger(UDTFTimeliness.class.getName()).log(Level.SEVERE, 
null, ex);
       }
-    } catch (IOException | NoNumberException ex) {
-      Logger.getLogger(UDTFCompleteness.class.getName()).log(Level.SEVERE, 
null, ex);
     }
   }
 }
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/SubsequenceRowIterator.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/SubsequenceRowIterator.java
new file mode 100644
index 00000000000..85be05d5a7b
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/SubsequenceRowIterator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.library.dquality.util;
+
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.io.IOException;
+
+public class SubsequenceRowIterator implements RowIterator {
+  private RowIterator originalIterator = null;
+  private long start = 0;
+  private long end = 0;
+  private int currentIndex;
+  private boolean resetFlag;
+
+  public SubsequenceRowIterator(RowIterator originalIterator, long start, long 
end) {
+    this.originalIterator = originalIterator;
+    this.start = start;
+    this.end = end;
+    this.currentIndex = -1;
+    this.resetFlag = true;
+    reset();
+  }
+
+  @Override
+  public boolean hasNextRow() {
+    return currentIndex < end && originalIterator.hasNextRow();
+  }
+
+  @Override
+  public Row next() throws IOException {
+    currentIndex++;
+    if (currentIndex < start) {
+      while (currentIndex < start && originalIterator.hasNextRow()) {
+        originalIterator.next();
+        currentIndex++;
+      }
+    }
+    if (currentIndex >= start && currentIndex <= end) {
+      return originalIterator.next();
+    }
+    throw new IOException("No more rows in the specified range");
+  }
+
+  @Override
+  public void reset() {
+    originalIterator.reset();
+    currentIndex = -1;
+    resetFlag = true;
+    while (resetFlag && currentIndex < start - 1) {
+      try {
+        originalIterator.next();
+        currentIndex++;
+      } catch (IOException e) {
+        resetFlag = false;
+      }
+    }
+  }
+}
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegQuality.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegQuality.java
new file mode 100644
index 00000000000..69f8554b758
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegQuality.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dquality.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import org.apache.commons.math3.stat.descriptive.rank.Median;
+
+import java.util.ArrayList;
+
+/** Class for computing data quality index. */
+public class TimeSeriesSegQuality {
+  public static final int WINDOW_SIZE = 10;
+  private boolean downtime = false; // count for shutdown period
+  private int cnt = 0; // total number of points
+  private int missCnt = 0; // number of missing points
+  private int specialCnt = 0; // number of special values
+  private int lateCnt = 0; // number of latency points
+  private int redundancyCnt = 0; // number of redundancy points
+  private int valueCnt = 0; // number of out of range points
+  private int variationCnt = 0; // number of variation out of range points
+  private int speedCnt = 0; // number of speed out of range points
+  private int speedchangeCnt = 0; // number of speed change(acceleration) out 
of range points
+  protected double answer;
+  private double[] time; // series without special values
+
+  public TimeSeriesSegQuality(RowIterator dataIterator, int mode) throws 
Exception {
+    TimeSeriesSegment tseg = new TimeSeriesSegment(dataIterator);
+    tseg.exactRepair();
+    long[] starts = tseg.getStart();
+    long[] ends = tseg.getEnd();
+    answer = 0;
+    specialCnt = 0;
+    missCnt = 0;
+    specialCnt = 0;
+    lateCnt = 0;
+    redundancyCnt = 0;
+    cnt = 0;
+    valueCnt = 0;
+    variationCnt = 0;
+    speedCnt = 0;
+    speedchangeCnt = 0;
+    for (int i = 0; i < starts.length; i++) {
+      RowIterator subIterator = new SubsequenceRowIterator(dataIterator, 
starts[i], ends[i]);
+      ArrayList<Double> timeList = new ArrayList<>();
+      while (subIterator.hasNextRow()) {
+        Row row = subIterator.next();
+        cnt++;
+        double v = Util.getValueAsDouble(row);
+        double t = row.getTime();
+        if (Double.isFinite(v)) {
+          timeList.add(t);
+        } else { // processing NAN,INF
+          specialCnt++;
+          timeList.add(t);
+        }
+      }
+      time = Util.toDoubleArray(timeList);
+      timeList.clear();
+      timeDetect();
+      // starttimestamp[i] = starts[i];
+    }
+    if (mode == 0) {
+      answer = getCompleteness();
+    } else if (mode == 1) {
+      answer = getConsistency();
+    } else {
+      answer = getTimeliness();
+    }
+  }
+
+  /** Detect timestamp errors. */
+  public void timeDetect() {
+    // compute interval properties
+    double[] interval = Util.variation(time);
+    Median median = new Median();
+    double base = median.evaluate(interval);
+    // find timestamp anomalies
+    ArrayList<Double> window = new ArrayList<>();
+    int i;
+    for (i = 0; i < Math.min(time.length, WINDOW_SIZE); i++) { // fill initial 
data
+      window.add(time[i]);
+    }
+    while (window.size() > 1) {
+      double times = (window.get(1) - window.get(0)) / base;
+      if (times <= 0.5) { // delete over-concentrated points
+        window.remove(1);
+        redundancyCnt++;
+      } else if (times > 1.0 && (!downtime || times <= 9.0)) { // exclude 
power-off periods
+        // large interval means missing or delaying
+        int temp = 0; // find number of over-concentrated points in the 
following window
+        for (int j = 2; j < window.size(); j++) {
+          double times2 = (window.get(j) - window.get(j - 1)) / base;
+          if (times2 >= 2.0) { // end searching when another missing is found
+            break;
+          }
+          if (times2 < 1.0) { // over-concentrated points founded, maybe 
caused by delaying
+            temp++;
+            window.remove(j); // move delayed points
+            j--;
+            if (temp == (int) Math.round(times - 1)) {
+              break; // enough points to fill have been found
+            }
+          }
+        }
+        lateCnt += temp;
+        missCnt += (Math.round(times - 1) - temp);
+      }
+      window.remove(0); // remove processed points
+      while (window.size() < WINDOW_SIZE && i < time.length) {
+        // fill into the window
+        window.add(time[i]);
+        i++;
+      }
+    }
+    window.clear();
+  }
+
+  public double getCompleteness() {
+    return 1 - (missCnt + specialCnt) * 1.0 / (cnt + missCnt);
+  }
+
+  public double getConsistency() {
+    return 1 - redundancyCnt * 1.0 / cnt;
+  }
+
+  public double getTimeliness() {
+    return 1 - lateCnt * 1.0 / cnt;
+  }
+
+  public double getValidity() {
+    return 1 - (valueCnt + variationCnt + speedCnt + speedchangeCnt) * 0.25 / 
cnt;
+  }
+
+  /**
+   * @return the downtime
+   */
+  public boolean isDowntime() {
+    return downtime;
+  }
+
+  /**
+   * @param downtime the downtime to set
+   */
+  public void setDowntime(boolean downtime) {
+    this.downtime = downtime;
+  }
+
+  public double getAnswer() {
+    return answer;
+  }
+}
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegment.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegment.java
new file mode 100644
index 00000000000..49926faead5
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/dquality/util/TimeSeriesSegment.java
@@ -0,0 +1,205 @@
+package org.apache.iotdb.library.dquality.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TimeSeriesSegment {
+  protected int n;
+  protected long[] time;
+  protected double lmdA = 100000.0; // Assign appropriate value for lmdA
+  protected double lmdD = 100000.0; // Assign appropriate value for lmdD
+  protected double mate = 100000.0; // Assign appropriate value for mate
+  protected long[] starts;
+  protected long[] ends;
+
+  public TimeSeriesSegment(RowIterator dataIterator) throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      timeList.add(row.getTime());
+    }
+    time = Util.toLongArray(timeList);
+    n = time.length;
+    timeList.clear();
+  }
+
+  public List<Map.Entry<Double, Long>> modeIntervalGranularity(List<Double> 
value) {
+    Map<Double, Long> counter =
+        value.stream().collect(Collectors.groupingBy(e -> e, 
Collectors.counting()));
+    return counter.entrySet().stream()
+        .sorted(Map.Entry.<Double, Long>comparingByValue().reversed())
+        .collect(Collectors.toList());
+  }
+
+  public double move(int i, int j, double interval, int s0) {
+    double tLen = time[j] - time[s0];
+    double sLen = i * interval;
+    double m = Math.abs(tLen - sLen) / interval;
+    return (m == 0) ? mate : -1 * m;
+  }
+
+  public Map<String, Object> scoreMatrix(double epsT, int k) {
+    int sNum = (int) Math.round((time[time.length - 1] - time[0]) / epsT + 1);
+    double[][] dp = new double[sNum][time.length];
+    int[][] st = new int[sNum][time.length];
+    int[][] step = new int[sNum][time.length];
+    List<Integer> s0 = new ArrayList<>();
+    for (int i = 0; i < sNum; i++) {
+      for (int j = 0; j < time.length; j++) {
+        dp[i][j] = -100;
+        st[i][j] = 0;
+      }
+    }
+    for (int j = 0; j < time.length; j++) {
+      dp[0][j] = mate;
+      st[0][j] = j;
+      step[0][j] = 2;
+      if (j != 0 && Math.abs(time[j] - time[j - 1]) > (k * epsT)) {
+        s0.add(j);
+      }
+    }
+    for (int i = 1; i < sNum; i++) {
+      for (int j = 0; j < time.length; j++) {
+        if (j == 0) {
+          dp[i][0] = Math.round(dp[i - 1][0] - lmdA);
+          step[i][0] = 1;
+          continue;
+        }
+        Map<Integer, Integer> dic = new HashMap<>();
+        dic.put(0, st[i - 1][j - 1]);
+        dic.put(1, st[i - 1][j]);
+        dic.put(2, st[i][j - 1]);
+        if (s0.contains(j)) {
+          dp[i][j] = Math.round(dp[i - 1][j] - lmdA);
+          st[i][j] = st[i - 1][j];
+        } else {
+          double m = Math.round(move(i, j, epsT, st[i - 1][j - 1]));
+          double a = Math.round(dp[i - 1][j] - lmdA);
+          double d = Math.round(dp[i][j - 1] - lmdD);
+          double[] arr = {Math.round((dp[i - 1][j - 1] + m)), a, d};
+          dp[i][j] = Math.round(Arrays.stream(arr).max().orElse(0.0));
+
+          for (int kIndex = 0; kIndex < arr.length; kIndex++) {
+            if (arr[kIndex] == dp[i][j]) {
+              step[i][j] = kIndex;
+              break;
+            }
+          }
+          st[i][j] = dic.get(step[i][j]);
+        }
+      }
+    }
+    Map<String, Object> result = new HashMap<>();
+    result.put("dp", dp);
+    result.put("st", st);
+    result.put("sNum", sNum);
+    result.put("step", step);
+    s0.clear();
+    return result;
+  }
+
+  public void exactRepair() {
+    List<Double> epsList = new ArrayList<>();
+    for (int i = 1; i < time.length; i++) {
+      epsList.add((double) Math.round(time[i] - time[i - 1]));
+    }
+    List<Map.Entry<Double, Long>> interval = modeIntervalGranularity(epsList);
+    lmdA = interval.get(0).getKey();
+    lmdD = interval.get(0).getKey();
+    mate = interval.get(0).getKey() * 2;
+
+    double median =
+        
epsList.stream().mapToDouble(Double::doubleValue).sorted().toArray()[epsList.size()
 / 2];
+    int tNum = (median == interval.get(0).getKey()) ? 1 : 3;
+
+    List<List<Double>> intervalList = new ArrayList<>();
+    double[][] allMatrix = null;
+    int[][] allStart = null;
+    int[][] allStep = null;
+    int i = 0;
+    for (int j = 0; j < tNum; j++) {
+      double epsT = interval.get(j).getKey();
+
+      if (epsT == 0) {
+        continue;
+      }
+      Map<String, Object> result = scoreMatrix(epsT, 10);
+      double[][] mt = (double[][]) result.get("dp");
+      int[][] st = (int[][]) result.get("st");
+      int m = (int) result.get("sNum");
+      int[][] step = (int[][]) result.get("step");
+      i = i + 1;
+      for (int k = 1; k <= m; k++) {
+        intervalList.add(Arrays.asList(epsT, (double) k));
+      }
+      if (i == 1) {
+        allMatrix = mt;
+        allStart = st;
+        allStep = step;
+        continue;
+      } else {
+        // Concatenate all_matrix and mt
+        double[][] concatenatedMatrix = new double[mt.length + 
allMatrix.length][];
+        System.arraycopy(allMatrix, 0, concatenatedMatrix, 0, 
allMatrix.length);
+        System.arraycopy(mt, 0, concatenatedMatrix, allMatrix.length, 
mt.length);
+        allMatrix = concatenatedMatrix;
+        int[][] concatenate = new int[st.length + allStart.length][];
+        System.arraycopy(allStart, 0, concatenate, 0, allStart.length);
+        System.arraycopy(st, 0, concatenate, allStart.length, st.length);
+        allStart = concatenate;
+
+        int[][] concatenatestep = new int[step.length + allStep.length][];
+        System.arraycopy(allStep, 0, concatenatestep, 0, allStep.length);
+        System.arraycopy(step, 0, concatenatestep, allStep.length, 
step.length);
+        allStep = concatenatestep;
+      }
+    }
+    section(allMatrix, allStart, allStep, intervalList);
+  }
+
+  public void section(
+      double[][] matrix, int[][] start, int[][] step, List<List<Double>> 
intervalList) {
+    List<Integer> s0e = new ArrayList<>();
+    List<Integer> end = new ArrayList<>();
+    int j = time.length - 1;
+    while (j >= 0) {
+
+      double maxValue = Double.NEGATIVE_INFINITY;
+      int maxRowIndex = -1;
+
+      for (int rowIndex = 0; rowIndex < matrix.length; rowIndex++) {
+        if (matrix[rowIndex][j] > maxValue) {
+          maxValue = matrix[rowIndex][j];
+          maxRowIndex = rowIndex;
+        }
+      }
+      s0e.add(start[maxRowIndex][j]);
+      end.add(j);
+      j = start[maxRowIndex][j] - 1;
+    }
+    starts = new long[s0e.size()];
+    ends = new long[end.size()];
+    for (int i = 0; i < s0e.size(); i++) {
+      starts[i] = s0e.get(i);
+      ends[i] = end.get(i);
+    }
+    s0e.clear();
+    end.clear();
+  }
+
+  public long[] getStart() {
+    return starts;
+  }
+
+  public long[] getEnd() {
+    return ends;
+  }
+}
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
index 2a605b9ab3a..520735709a6 100644
--- 
a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.library.drepair;
 
+import org.apache.iotdb.library.drepair.util.TimesegmentRepair;
 import org.apache.iotdb.library.drepair.util.TimestampRepair;
-import org.apache.iotdb.library.util.Util;
 import org.apache.iotdb.udf.api.UDTF;
 import org.apache.iotdb.udf.api.access.RowWindow;
 import org.apache.iotdb.udf.api.collector.PointCollector;
@@ -31,33 +31,24 @@ import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrat
 import org.apache.iotdb.udf.api.exception.UDFException;
 import org.apache.iotdb.udf.api.type.Type;
 
-/** This function is used for timestamp repair. */
 public class UDTFTimestampRepair implements UDTF {
   String intervalMethod;
-  long interval;
-  long intervalMode;
+  Boolean segmentation;
+  int interval;
+  int intervalMode;
+  int num_interval;
+  long[] timestamp;
+  double[] value;
 
   @Override
   public void validate(UDFParameterValidator validator) throws Exception {
     validator
         .validateInputSeriesNumber(1)
-        .validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, 
Type.INT64);
-
-    String intervalString = 
validator.getParameters().getStringOrDefault("interval", null);
-    if (intervalString != null) {
-      try {
-        interval = Long.parseLong(intervalString);
-      } catch (NumberFormatException e) {
-        try {
-          interval = Util.parseTime(intervalString, validator.getParameters());
-        } catch (Exception ex) {
-          throw new UDFException("Invalid time format for interval.");
-        }
-      }
-      validator.validate(
-          x -> interval > 0,
-          "Invalid time unit input. Supported units are ns, us, ms, s, m, h, 
d.");
-    }
+        .validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, 
Type.INT64)
+        .validate(
+            x -> (Integer) x >= 0,
+            "Interval should be a positive integer.",
+            validator.getParameters().getIntOrDefault("interval", 0));
   }
 
   @Override
@@ -66,39 +57,44 @@ public class UDTFTimestampRepair implements UDTF {
     configurations
         .setAccessStrategy(new 
SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
         .setOutputDataType(parameters.getDataType(0));
-
+    segmentation = parameters.getBooleanOrDefault("Segment", false);
     intervalMethod = parameters.getStringOrDefault("method", "Median");
-    String intervalString = parameters.getStringOrDefault("interval", null);
-
-    if (intervalString != null) {
-      try {
-        interval = Long.parseLong(intervalString);
-      } catch (NumberFormatException e) {
-        interval = Util.parseTime(intervalString, parameters);
+    interval = parameters.getIntOrDefault("interval", 0);
+    num_interval = parameters.getIntOrDefault("num_interval", 1);
+    if (segmentation) {
+      if (num_interval > 10) {
+        throw new UDFException("The calculation interval is too large, causing 
slow execution!");
+      } else if (num_interval <= 0) {
+        throw new UDFException("Error num_interval!");
       }
     } else {
-      interval = 0;
-    }
-
-    if (interval > 0) {
-      intervalMode = interval;
-    } else if ("Median".equalsIgnoreCase(intervalMethod)) {
-      intervalMode = -1L;
-    } else if ("Mode".equalsIgnoreCase(intervalMethod)) {
-      intervalMode = -2L;
-    } else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
-      intervalMode = -3L;
-    } else {
-      throw new UDFException("Illegal method.");
+      if (interval > 0) {
+        intervalMode = interval;
+      } else if ("Median".equalsIgnoreCase(intervalMethod)) {
+        intervalMode = -1;
+      } else if ("Mode".equalsIgnoreCase(intervalMethod)) {
+        intervalMode = -2;
+      } else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
+        intervalMode = -3;
+      } else {
+        throw new UDFException("Illegal method.");
+      }
     }
   }
 
   @Override
   public void transform(RowWindow rowWindow, PointCollector collector) throws 
Exception {
-    TimestampRepair ts = new TimestampRepair(rowWindow.getRowIterator(), 
intervalMode, 2);
-    ts.dpRepair();
-    long[] timestamp = ts.getRepaired();
-    double[] value = ts.getRepairedValue();
+    if (segmentation) {
+      TimesegmentRepair tseg = new 
TimesegmentRepair(rowWindow.getRowIterator(), this.num_interval);
+      tseg.exactRepair();
+      timestamp = tseg.getRepaired();
+      value = tseg.getRepairedValue();
+    } else {
+      TimestampRepair ts = new TimestampRepair(rowWindow.getRowIterator(), 
intervalMode, 2);
+      ts.dpRepair();
+      timestamp = ts.getRepaired();
+      value = ts.getRepairedValue();
+    }
     switch (rowWindow.getDataType(0)) {
       case DOUBLE:
         for (int i = 0; i < timestamp.length; i++) {
@@ -120,12 +116,6 @@ public class UDTFTimestampRepair implements UDTF {
           collector.putLong(timestamp[i], (long) value[i]);
         }
         break;
-      case DATE:
-      case TIMESTAMP:
-      case BLOB:
-      case BOOLEAN:
-      case TEXT:
-      case STRING:
       default:
         throw new UDFException("");
     }
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimesegmentRepair.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimesegmentRepair.java
new file mode 100644
index 00000000000..51b48aad1f2
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimesegmentRepair.java
@@ -0,0 +1,249 @@
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Repairs the timestamps of a series segment by segment: each segment is aligned to a regular
+ * grid whose spacing is one of the most frequent intervals between consecutive input timestamps.
+ *
+ * <p>Usage: construct from a row iterator, call {@link #exactRepair()}, then read the result
+ * with {@link #getRepaired()} and {@link #getRepairedValue()}.
+ */
+public class TimesegmentRepair {
+  /** Number of input points; equals {@code time.length}. */
+  protected int n;
+
+  /** Input timestamps in iteration order. */
+  protected long[] time;
+
+  /** Input values as doubles; non-finite values are stored as NaN. */
+  protected double[] original;
+
+  /** Repaired timestamps; populated by {@link #exactRepair()}. */
+  protected long[] repaired;
+
+  /** Values matching {@link #repaired}; NaN marks a grid point that had no input point. */
+  protected double[] repairedValue;
+
+  /** Score penalty for advancing the grid without consuming a point (an inserted point). */
+  protected double lmdA = 100000.0;
+
+  /** Score penalty for consuming a point without advancing the grid (a deleted point). */
+  protected double lmdD = 100000.0;
+
+  /** Score awarded when a point falls exactly on its grid position. */
+  protected double mate = 100000.0;
+
+  /** Number of additional candidate intervals tried when the median and mode interval differ. */
+  protected int num_interval;
+
+  /**
+   * Reads the whole iterator into memory.
+   *
+   * @param dataIterator rows of the series to repair
+   * @param num_interval number of extra candidate intervals to evaluate
+   * @throws Exception if a row cannot be read or converted to a double
+   */
+  public TimesegmentRepair(RowIterator dataIterator, int num_interval) throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      double v = Util.getValueAsDouble(row);
+      timeList.add(row.getTime());
+      // Infinite values are normalised to NaN so that only one "missing" marker exists.
+      originList.add(Double.isFinite(v) ? v : Double.NaN);
+    }
+    this.num_interval = num_interval;
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+  }
+
+  /**
+   * Counts how often each value occurs.
+   *
+   * @param value the values to count
+   * @return (value, count) entries ordered by descending count
+   */
+  public List<Map.Entry<Double, Long>> modeIntervalGranularity(List<Double> value) {
+    Map<Double, Long> counter =
+        value.stream().collect(Collectors.groupingBy(e -> e, Collectors.counting()));
+    return counter.entrySet().stream()
+        .sorted(Map.Entry.<Double, Long>comparingByValue().reversed())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Scores the placement of point {@code j} on grid position {@code i} of a segment that starts
+   * at point {@code s0}.
+   *
+   * @param i grid position, counted from the segment start
+   * @param j index of the input point
+   * @param interval grid spacing
+   * @param s0 index of the point that starts the segment
+   * @return {@link #mate} for an exact hit, otherwise the negated distance in units of
+   *     {@code interval}
+   */
+  public double move(int i, int j, double interval, int s0) {
+    double tLen = time[j] - time[s0];
+    double sLen = i * interval;
+    double m = Math.abs(tLen - sLen) / interval;
+    return (m == 0) ? mate : -1 * m;
+  }
+
+  /**
+   * Builds the dynamic-programming tables for one candidate grid spacing. Rows are grid
+   * positions, columns are input points.
+   *
+   * @param epsT candidate grid spacing
+   * @param k a gap between consecutive points larger than {@code k * epsT} forces a new segment
+   * @return a map with the keys {@code "dp"} (scores, {@code double[][]}), {@code "st"} (index
+   *     of the point that starts the segment each cell belongs to, {@code int[][]}),
+   *     {@code "step"} (move chosen per cell: 0 = match, 1 = insert, 2 = delete,
+   *     {@code int[][]}) and {@code "sNum"} (number of rows, {@code Integer})
+   */
+  public Map<String, Object> scoreMatrix(double epsT, int k) {
+    int sNum = (int) Math.round((time[time.length - 1] - time[0]) / epsT + 1);
+    double[][] dp = new double[sNum][time.length];
+    int[][] st = new int[sNum][time.length];
+    int[][] step = new int[sNum][time.length];
+    // Lookup table instead of a list: membership is tested once per table cell.
+    boolean[] segmentBreak = new boolean[time.length];
+    for (double[] row : dp) {
+      Arrays.fill(row, -100);
+    }
+    // Row 0: every point may start a segment of its own.
+    for (int j = 0; j < time.length; j++) {
+      dp[0][j] = mate;
+      st[0][j] = j;
+      step[0][j] = 2;
+      if (j != 0 && Math.abs(time[j] - time[j - 1]) > (k * epsT)) {
+        segmentBreak[j] = true;
+      }
+    }
+    for (int i = 1; i < sNum; i++) {
+      for (int j = 0; j < time.length; j++) {
+        if (j == 0) {
+          dp[i][0] = Math.round(dp[i - 1][0] - lmdA);
+          step[i][0] = 1;
+          continue;
+        }
+        if (segmentBreak[j]) {
+          // A forced break can only be reached by inserting grid points above it.
+          dp[i][j] = Math.round(dp[i - 1][j] - lmdA);
+          st[i][j] = st[i - 1][j];
+        } else {
+          // Segment start inherited from the predecessor cell of each move (match, insert, delete).
+          int[] startOfMove = {st[i - 1][j - 1], st[i - 1][j], st[i][j - 1]};
+          double m = Math.round(move(i, j, epsT, st[i - 1][j - 1]));
+          double a = Math.round(dp[i - 1][j] - lmdA);
+          double d = Math.round(dp[i][j - 1] - lmdD);
+          double[] arr = {Math.round((dp[i - 1][j - 1] + m)), a, d};
+          dp[i][j] = Math.round(Math.max(arr[0], Math.max(arr[1], arr[2])));
+          // On ties the lowest move index wins (match before insert before delete).
+          for (int kIndex = 0; kIndex < arr.length; kIndex++) {
+            if (arr[kIndex] == dp[i][j]) {
+              step[i][j] = kIndex;
+              break;
+            }
+          }
+          st[i][j] = startOfMove[step[i][j]];
+        }
+      }
+    }
+    Map<String, Object> result = new HashMap<>();
+    result.put("dp", dp);
+    result.put("st", st);
+    result.put("sNum", sNum);
+    result.put("step", step);
+    return result;
+  }
+
+  /** Publishes the input unchanged as the repair result. */
+  private void noRepair() {
+    // The result arrays are not allocated anywhere else on this path.
+    repaired = Arrays.copyOf(time, time.length);
+    repairedValue = Arrays.copyOf(original, original.length);
+  }
+
+  /**
+   * Runs the repair and stores the result for {@link #getRepaired()} and
+   * {@link #getRepairedValue()}. Inputs with at most two points, or without any usable candidate
+   * interval, are returned unchanged.
+   */
+  public void exactRepair() {
+    if (time.length <= 2) {
+      noRepair();
+      return;
+    }
+    List<Double> epsList = new ArrayList<>();
+    for (int i = 1; i < time.length; i++) {
+      // Plain widening conversion: Math.round(long) would resolve to Math.round(float) and
+      // lose precision for large differences.
+      epsList.add((double) (time[i] - time[i - 1]));
+    }
+    List<Map.Entry<Double, Long>> interval = modeIntervalGranularity(epsList);
+    double modeInterval = interval.get(0).getKey();
+    lmdA = modeInterval;
+    lmdD = modeInterval;
+    mate = modeInterval * 2;
+    double median =
+        epsList.stream().mapToDouble(Double::doubleValue).sorted().toArray()[epsList.size() / 2];
+    int tNum = (median == modeInterval) ? 1 : num_interval;
+    // There may be fewer distinct intervals than requested candidates.
+    int candidates = Math.min(tNum + 1, interval.size());
+    List<List<Double>> intervalList = new ArrayList<>();
+    List<double[]> scoreRows = new ArrayList<>();
+    List<int[]> startRows = new ArrayList<>();
+    List<int[]> stepRows = new ArrayList<>();
+    for (int j = 0; j < candidates; j++) {
+      double epsT = interval.get(j).getKey();
+      if (epsT <= 0) {
+        // A non-positive spacing cannot define a grid.
+        continue;
+      }
+      Map<String, Object> result = scoreMatrix(epsT, 20);
+      double[][] mt = (double[][]) result.get("dp");
+      int[][] st = (int[][]) result.get("st");
+      int m = (int) result.get("sNum");
+      int[][] step = (int[][]) result.get("step");
+      // Row r of this candidate describes a segment of r + 1 grid points with spacing epsT.
+      for (int k = 1; k <= m; k++) {
+        intervalList.add(Arrays.asList(epsT, (double) k));
+      }
+      scoreRows.addAll(Arrays.asList(mt));
+      startRows.addAll(Arrays.asList(st));
+      stepRows.addAll(Arrays.asList(step));
+    }
+    if (scoreRows.isEmpty()) {
+      noRepair();
+      return;
+    }
+    section(
+        scoreRows.toArray(new double[0][]),
+        startRows.toArray(new int[0][]),
+        stepRows.toArray(new int[0][]),
+        intervalList);
+  }
+
+  /**
+   * Traces the stacked score tables back from the last input point, splitting the series into
+   * segments, and writes the repaired timestamps and values.
+   *
+   * @param matrix score rows of all candidate intervals, stacked
+   * @param start segment-start index per cell, same layout as {@code matrix}
+   * @param step chosen move per cell (0 = match, 1 = insert, 2 = delete), same layout
+   * @param intervalList per row of {@code matrix}: the grid spacing and the number of grid points
+   */
+  public void section(
+      double[][] matrix, int[][] start, int[][] step, List<List<Double>> intervalList) {
+    List<Integer> s0e = new ArrayList<>();
+    List<Long> epsTe = new ArrayList<>();
+    List<List<Integer>> sp = new ArrayList<>();
+    List<Integer> me = new ArrayList<>();
+    int j = time.length - 1;
+    // Collect the segments from the end of the series to its beginning.
+    while (j >= 0) {
+      double maxValue = Double.NEGATIVE_INFINITY;
+      int maxRowIndex = -1;
+      for (int rowIndex = 0; rowIndex < matrix.length; rowIndex++) {
+        if (matrix[rowIndex][j] > maxValue) {
+          maxValue = matrix[rowIndex][j];
+          maxRowIndex = rowIndex;
+        }
+      }
+      s0e.add(start[maxRowIndex][j]);
+      epsTe.add(intervalList.get(maxRowIndex).get(0).longValue());
+      me.add(intervalList.get(maxRowIndex).get(1).intValue());
+      sp.add(Arrays.asList(maxRowIndex, j));
+      j = start[maxRowIndex][j] - 1;
+    }
+    int k = me.stream().mapToInt(Integer::intValue).sum();
+    repaired = new long[k];
+    repairedValue = new double[k];
+    // Segments were collected back to front, so the output is filled from its end.
+    for (int i = 0; i < s0e.size(); i++) {
+      int a = sp.get(i).get(0);
+      int b = sp.get(i).get(1);
+      for (j = (me.get(i) - 1); j >= 0; j--) {
+        // Long arithmetic: the offset can exceed the int range.
+        long ps = time[s0e.get(i)] + j * epsTe.get(i);
+        if (j == 0) {
+          repaired[k - 1] = ps;
+          repairedValue[k - 1] = original[b];
+          k--;
+          continue;
+        }
+        if (step[a][b] == 0) {
+          repaired[k - 1] = ps;
+          repairedValue[k - 1] = original[b];
+          k--;
+          b--;
+          a--;
+        } else if (step[a][b] == 1) {
+          // add points
+          repaired[k - 1] = ps;
+          repairedValue[k - 1] = Double.NaN;
+          k--;
+          a--;
+        } else {
+          // delete points: skip the input point and stay on the same grid position
+          b--;
+          j++;
+        }
+      }
+    }
+  }
+
+  /**
+   * @return the repaired values, or {@code null} before {@link #exactRepair()} has run
+   */
+  public double[] getRepairedValue() {
+    return repairedValue;
+  }
+
+  /**
+   * @return the repaired timestamps, or {@code null} before {@link #exactRepair()} has run
+   */
+  public long[] getRepaired() {
+    return repaired;
+  }
+}


Reply via email to