This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c1df2bf8a65 Pipe: Avoid writing out-of-order data in tsfile when `'sink.format' = 'file'`  (#12810)
c1df2bf8a65 is described below

commit c1df2bf8a6579c702b4fd2e0a618d3499978a870
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 26 18:45:38 2024 +0800

    Pipe: Avoid writing out-of-order data in tsfile when `'sink.format' = 'file'`  (#12810)
---
 .../batch/PipeTabletEventTsFileBatch.java          | 159 ++++++++++-----
 .../request/PipeTransferTabletRawReq.java          | 133 +------------
 .../pipe/connector/util/PipeTabletEventSorter.java | 202 +++++++++++++++++++
 .../pipe/connector/PipeTabletEventSorterTest.java  | 217 +++++++++++++++++++++
 4 files changed, 530 insertions(+), 181 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 6abca7b3183..4864a124668 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -48,6 +49,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -165,7 +167,12 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
   }
 
   private void bufferTablet(
-      final String pipeName, long creationTime, Tablet tablet, boolean 
isAligned) {
+      final String pipeName,
+      final long creationTime,
+      final Tablet tablet,
+      final boolean isAligned) {
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
     totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
 
     pipeName2WeightMap.compute(
@@ -208,12 +215,19 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
           Comparator.comparingLong(tablet -> tablet.timestamps[0]));
     }
 
+    // Sort the devices by device id
+    List<String> devices = new ArrayList<>(device2Tablets.keySet());
+    devices.sort(Comparator.naturalOrder());
+
     // Replace ArrayList with LinkedList to improve performance
-    final Map<String, LinkedList<Tablet>> device2TabletsLinkedList = new 
HashMap<>();
-    for (final Map.Entry<String, List<Tablet>> entry : 
device2Tablets.entrySet()) {
-      device2TabletsLinkedList.put(entry.getKey(), new 
LinkedList<>(entry.getValue()));
+    final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList =
+        new LinkedHashMap<>();
+    for (final String device : devices) {
+      device2TabletsLinkedList.put(device, new 
LinkedList<>(device2Tablets.get(device)));
     }
-    // Clear the original device2Tablets to release memory
+
+    // Help GC
+    devices.clear();
     device2Tablets.clear();
 
     // Write the tablets to the tsfile device by device, and the tablets
@@ -241,56 +255,44 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
                         + TsFileConstant.TSFILE_SUFFIX));
       }
 
-      final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
-          device2TabletsLinkedList.entrySet().iterator();
-
-      while (iterator.hasNext()) {
-        final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
-        final String deviceId = entry.getKey();
-        final LinkedList<Tablet> tablets = entry.getValue();
-
-        final List<Tablet> tabletsToWrite = new ArrayList<>();
-
-        Tablet lastTablet = null;
-        while (!tablets.isEmpty()) {
-          final Tablet tablet = tablets.peekFirst();
-          if (Objects.isNull(lastTablet)
-              // lastTablet.rowSize is not 0
-              || lastTablet.timestamps[lastTablet.rowSize - 1] < 
tablet.timestamps[0]) {
-            tabletsToWrite.add(tablet);
-            lastTablet = tablet;
-            tablets.pollFirst();
-          } else {
-            break;
-          }
-        }
+      try {
+        tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList, 
device2Aligned);
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Batch id = {}: Failed to write tablets into tsfile, because {}",
+            currentBatchId.get(),
+            e.getMessage(),
+            e);
 
-        if (tablets.isEmpty()) {
-          iterator.remove();
+        try {
+          fileWriter.close();
+        } catch (final Exception closeException) {
+          LOGGER.warn(
+              "Batch id = {}: Failed to close the tsfile {} after failed to 
write tablets into, because {}",
+              currentBatchId.get(),
+              fileWriter.getIOWriter().getFile().getPath(),
+              closeException.getMessage(),
+              closeException);
+        } finally {
+          // Add current writing file to the list and delete the file
+          sealedFiles.add(fileWriter.getIOWriter().getFile());
         }
 
-        final boolean isAligned = device2Aligned.get(deviceId);
-        for (final Tablet tablet : tabletsToWrite) {
-          if (isAligned) {
-            try {
-              fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId), 
tablet.getSchemas());
-            } catch (final WriteProcessException ignore) {
-              // Do nothing if the timeSeries has been registered
-            }
+        for (final File sealedFile : sealedFiles) {
+          final boolean deleteSuccess = FileUtils.deleteQuietly(sealedFile);
+          LOGGER.warn(
+              "Batch id = {}: {} delete the tsfile {} after failed to write 
tablets into {}. {}",
+              currentBatchId.get(),
+              deleteSuccess ? "Successfully" : "Failed to",
+              sealedFile.getPath(),
+              fileWriter.getIOWriter().getFile().getPath(),
+              deleteSuccess ? "" : "Maybe the tsfile needs to be deleted 
manually.");
+        }
+        sealedFiles.clear();
 
-            fileWriter.writeAligned(tablet);
-          } else {
-            for (final MeasurementSchema schema : tablet.getSchemas()) {
-              try {
-                fileWriter.registerTimeseries(new Path(tablet.deviceId), 
schema);
-              } catch (final WriteProcessException ignore) {
-                // Do nothing if the timeSeries has been registered
-              }
-            }
+        fileWriter = null;
 
-            fileWriter.write(tablet);
-          }
-        }
+        throw e;
       }
 
       fileWriter.close();
@@ -308,6 +310,63 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
     return sealedFiles;
   }
 
+  private void tryBestToWriteTabletsIntoOneFile(
+      LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList,
+      Map<String, Boolean> device2Aligned)
+      throws IOException, WriteProcessException {
+    final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
+        device2TabletsLinkedList.entrySet().iterator();
+
+    while (iterator.hasNext()) {
+      final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
+      final String deviceId = entry.getKey();
+      final LinkedList<Tablet> tablets = entry.getValue();
+
+      final List<Tablet> tabletsToWrite = new ArrayList<>();
+
+      Tablet lastTablet = null;
+      while (!tablets.isEmpty()) {
+        final Tablet tablet = tablets.peekFirst();
+        if (Objects.isNull(lastTablet)
+            // lastTablet.rowSize is not 0
+            || lastTablet.timestamps[lastTablet.rowSize - 1] < 
tablet.timestamps[0]) {
+          tabletsToWrite.add(tablet);
+          lastTablet = tablet;
+          tablets.pollFirst();
+        } else {
+          break;
+        }
+      }
+
+      if (tablets.isEmpty()) {
+        iterator.remove();
+      }
+
+      final boolean isAligned = device2Aligned.get(deviceId);
+      for (final Tablet tablet : tabletsToWrite) {
+        if (isAligned) {
+          try {
+            fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId), 
tablet.getSchemas());
+          } catch (final WriteProcessException ignore) {
+            // Do nothing if the timeSeries has been registered
+          }
+
+          fileWriter.writeAligned(tablet);
+        } else {
+          for (final MeasurementSchema schema : tablet.getSchemas()) {
+            try {
+              fileWriter.registerTimeseries(new Path(tablet.deviceId), schema);
+            } catch (final WriteProcessException ignore) {
+              // Do nothing if the timeSeries has been registered
+            }
+          }
+
+          fileWriter.write(tablet);
+        }
+      }
+    }
+  }
+
   @Override
   protected long getMaxBatchSizeInBytes() {
     return maxSizeInBytes;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 33a98d84e42..ed4ca323fbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -23,18 +23,15 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
 import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.session.util.SessionUtils;
 
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
@@ -43,9 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.time.LocalDate;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Objects;
 
 public class PipeTransferTabletRawReq extends TPipeTransferReq {
@@ -64,9 +58,7 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
   }
 
   public InsertTabletStatement constructStatement() {
-    if (!checkSorted(tablet)) {
-      sortTablet(tablet);
-    }
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
     try {
       final TSInsertTabletReq request = new TSInsertTabletReq();
@@ -91,127 +83,6 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
     }
   }
 
-  public static boolean checkSorted(final Tablet tablet) {
-    for (int i = 1; i < tablet.rowSize; i++) {
-      if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public static void sortTablet(final Tablet tablet) {
-    /*
-     * following part of code sort the batch data by time,
-     * so we can insert continuous data in value list to get a better 
performance
-     */
-    // sort to get index, and use index to sort value list
-    final Integer[] index = new Integer[tablet.rowSize];
-    for (int i = 0; i < tablet.rowSize; i++) {
-      index[i] = i;
-    }
-    Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
-    Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
-    int columnIndex = 0;
-    for (int i = 0; i < tablet.getSchemas().size(); i++) {
-      final IMeasurementSchema schema = tablet.getSchemas().get(i);
-      if (schema != null) {
-        tablet.values[columnIndex] = sortList(tablet.values[columnIndex], 
schema.getType(), index);
-        if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
-          tablet.bitMaps[columnIndex] = 
sortBitMap(tablet.bitMaps[columnIndex], index);
-        }
-        columnIndex++;
-      }
-    }
-  }
-
-  /**
-   * Sort value list by index.
-   *
-   * @param valueList value list
-   * @param dataType data type
-   * @param index index
-   * @return sorted list
-   * @throws UnSupportedDataTypeException if dataType is illegal
-   */
-  private static Object sortList(
-      final Object valueList, final TSDataType dataType, final Integer[] 
index) {
-    switch (dataType) {
-      case BOOLEAN:
-        final boolean[] boolValues = (boolean[]) valueList;
-        final boolean[] sortedValues = new boolean[boolValues.length];
-        for (int i = 0; i < index.length; i++) {
-          sortedValues[i] = boolValues[index[i]];
-        }
-        return sortedValues;
-      case INT32:
-        final int[] intValues = (int[]) valueList;
-        final int[] sortedIntValues = new int[intValues.length];
-        for (int i = 0; i < index.length; i++) {
-          sortedIntValues[i] = intValues[index[i]];
-        }
-        return sortedIntValues;
-      case DATE:
-        final LocalDate[] dateValues = (LocalDate[]) valueList;
-        final LocalDate[] sortedDateValues = new LocalDate[dateValues.length];
-        for (int i = 0; i < index.length; i++) {
-          sortedDateValues[i] = dateValues[index[i]];
-        }
-        return sortedDateValues;
-      case INT64:
-      case TIMESTAMP:
-        final long[] longValues = (long[]) valueList;
-        final long[] sortedLongValues = new long[longValues.length];
-        for (int i = 0; i < index.length; i++) {
-          sortedLongValues[i] = longValues[index[i]];
-        }
-        return sortedLongValues;
-      case FLOAT:
-        final float[] floatValues = (float[]) valueList;
-        final float[] sortedFloatValues = new float[floatValues.length];
-        for (int i = 0; i < index.length; i++) {
-          sortedFloatValues[i] = floatValues[index[i]];
-        }
-        return sortedFloatValues;
-      case DOUBLE:
-        final double[] doubleValues = (double[]) valueList;
-        final double[] sortedDoubleValues = new double[doubleValues.length];
-        for (int i = 0; i < index.length; i++) {
-          sortedDoubleValues[i] = doubleValues[index[i]];
-        }
-        return sortedDoubleValues;
-      case TEXT:
-      case BLOB:
-      case STRING:
-        final Binary[] binaryValues = (Binary[]) valueList;
-        final Binary[] sortedBinaryValues = new Binary[binaryValues.length];
-        for (int i = 0; i < index.length; i++) {
-          sortedBinaryValues[i] = binaryValues[index[i]];
-        }
-        return sortedBinaryValues;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format("Data type %s is not supported.", dataType));
-    }
-  }
-
-  /**
-   * Sort BitMap by index.
-   *
-   * @param bitMap BitMap to be sorted
-   * @param index index
-   * @return sorted bitMap
-   */
-  private static BitMap sortBitMap(final BitMap bitMap, final Integer[] index) 
{
-    final BitMap sortedBitMap = new BitMap(bitMap.getSize());
-    for (int i = 0; i < index.length; i++) {
-      if (bitMap.isMarked(index[i])) {
-        sortedBitMap.mark(i);
-      }
-    }
-    return sortedBitMap;
-  }
-
   /////////////////////////////// WriteBack & Batch 
///////////////////////////////
 
   public static PipeTransferTabletRawReq toTPipeTransferRawReq(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
new file mode 100644
index 00000000000..2a5e8769b59
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class PipeTabletEventSorter {
+
+  private final Tablet tablet;
+
+  private boolean isSorted = true;
+  private boolean isDeduplicated = true;
+
+  private Integer[] index;
+  private int deduplicatedSize;
+
+  public PipeTabletEventSorter(final Tablet tablet) {
+    this.tablet = tablet;
+    deduplicatedSize = tablet == null ? 0 : tablet.rowSize;
+  }
+
+  public void deduplicateAndSortTimestampsIfNecessary() {
+    if (tablet == null || tablet.rowSize == 0) {
+      return;
+    }
+
+    for (int i = 1, size = tablet.rowSize; i < size; ++i) {
+      final long currentTimestamp = tablet.timestamps[i];
+      final long previousTimestamp = tablet.timestamps[i - 1];
+
+      if (currentTimestamp < previousTimestamp) {
+        isSorted = false;
+      }
+      if (currentTimestamp == previousTimestamp) {
+        isDeduplicated = false;
+      }
+
+      if (!isSorted && !isDeduplicated) {
+        break;
+      }
+    }
+
+    if (isSorted && isDeduplicated) {
+      return;
+    }
+
+    index = new Integer[tablet.rowSize];
+    for (int i = 0, size = tablet.rowSize; i < size; i++) {
+      index[i] = i;
+    }
+
+    if (!isSorted) {
+      sortTimestamps();
+
+      // Do deduplicate anyway.
+      // isDeduplicated may be false positive when isSorted is false.
+      deduplicateTimestamps();
+      isDeduplicated = true;
+    }
+
+    if (!isDeduplicated) {
+      deduplicateTimestamps();
+    }
+
+    sortAndDeduplicateValuesAndBitMaps();
+  }
+
+  private void sortTimestamps() {
+    Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i]));
+    Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
+  }
+
+  private void deduplicateTimestamps() {
+    deduplicatedSize = 1;
+    for (int i = 1, size = tablet.rowSize; i < size; i++) {
+      if (tablet.timestamps[i] != tablet.timestamps[i - 1]) {
+        index[deduplicatedSize] = index[i];
+        tablet.timestamps[deduplicatedSize] = tablet.timestamps[i];
+
+        ++deduplicatedSize;
+      }
+    }
+    tablet.rowSize = deduplicatedSize;
+  }
+
+  private void sortAndDeduplicateValuesAndBitMaps() {
+    int columnIndex = 0;
+    for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
+      final IMeasurementSchema schema = tablet.getSchemas().get(i);
+      if (schema != null) {
+        tablet.values[columnIndex] =
+            reorderValueList(deduplicatedSize, tablet.values[columnIndex], 
schema.getType(), index);
+        if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
+          tablet.bitMaps[columnIndex] =
+              reorderBitMap(deduplicatedSize, tablet.bitMaps[columnIndex], 
index);
+        }
+        columnIndex++;
+      }
+    }
+  }
+
+  private static Object reorderValueList(
+      int deduplicatedSize,
+      final Object valueList,
+      final TSDataType dataType,
+      final Integer[] index) {
+    switch (dataType) {
+      case BOOLEAN:
+        final boolean[] boolValues = (boolean[]) valueList;
+        final boolean[] deduplicatedBoolValues = new 
boolean[boolValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedBoolValues[i] = boolValues[index[i]];
+        }
+        return deduplicatedBoolValues;
+      case INT32:
+        final int[] intValues = (int[]) valueList;
+        final int[] deduplicatedIntValues = new int[intValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedIntValues[i] = intValues[index[i]];
+        }
+        return deduplicatedIntValues;
+      case DATE:
+        final LocalDate[] dateValues = (LocalDate[]) valueList;
+        final LocalDate[] deduplicatedDateValues = new 
LocalDate[dateValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedDateValues[i] = dateValues[index[i]];
+        }
+        return deduplicatedDateValues;
+      case INT64:
+      case TIMESTAMP:
+        final long[] longValues = (long[]) valueList;
+        final long[] deduplicatedLongValues = new long[longValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedLongValues[i] = longValues[index[i]];
+        }
+        return deduplicatedLongValues;
+      case FLOAT:
+        final float[] floatValues = (float[]) valueList;
+        final float[] deduplicatedFloatValues = new float[floatValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedFloatValues[i] = floatValues[index[i]];
+        }
+        return deduplicatedFloatValues;
+      case DOUBLE:
+        final double[] doubleValues = (double[]) valueList;
+        final double[] deduplicatedDoubleValues = new 
double[doubleValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedDoubleValues[i] = doubleValues[index[i]];
+        }
+        return deduplicatedDoubleValues;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        final Binary[] binaryValues = (Binary[]) valueList;
+        final Binary[] deduplicatedBinaryValues = new 
Binary[binaryValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedBinaryValues[i] = binaryValues[index[i]];
+        }
+        return deduplicatedBinaryValues;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+  }
+
+  private static BitMap reorderBitMap(
+      int deduplicatedSize, final BitMap bitMap, final Integer[] index) {
+    final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize());
+    for (int i = 0; i < deduplicatedSize; i++) {
+      if (bitMap.isMarked(index[i])) {
+        deduplicatedBitMap.mark(i);
+      }
+    }
+    return deduplicatedBitMap;
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
new file mode 100644
index 00000000000..e58bcf1c294
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector;
+
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class PipeTabletEventSorterTest {
+
+  private static boolean checkSorted(final Tablet tablet) {
+    for (int i = 1; i < tablet.rowSize; i++) {
+      if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Test
+  public void testDeduplicateAndSort() {
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp + i);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp + i);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp - i);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp - i);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp);
+      }
+    }
+
+    Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      indices.add((int) tablet.timestamps[i]);
+    }
+
+    Assert.assertFalse(checkSorted(tablet));
+
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(checkSorted(tablet));
+
+    Assert.assertEquals(indices.size(), tablet.rowSize);
+
+    final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, 
tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+  }
+
+  @Test
+  public void testDeduplicate() {
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet = new Tablet("root.sg.device", schemaList, 10);
+
+    long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp);
+      }
+    }
+
+    Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      indices.add((int) tablet.timestamps[i]);
+    }
+
+    Assert.assertTrue(checkSorted(tablet));
+
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(checkSorted(tablet));
+
+    Assert.assertEquals(indices.size(), tablet.rowSize);
+
+    final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, 
tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+  }
+
+  @Test
+  public void testSort() {
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    for (long i = 0; i < 10; i++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, (long) rowIndex + 2);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) 
rowIndex + 2);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, (long) rowIndex);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) 
rowIndex);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, (long) rowIndex - 2);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) 
rowIndex - 2);
+      }
+    }
+
+    Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      indices.add((int) tablet.timestamps[i]);
+    }
+
+    Assert.assertFalse(checkSorted(tablet));
+
+    long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, 
tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] != timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] != ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(checkSorted(tablet));
+
+    Assert.assertEquals(indices.size(), tablet.rowSize);
+
+    timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+  }
+}

Reply via email to