This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fa3eb6395c Pipe: Fixed the bug that empty tsfile may not be marked 
when already closed & Improved exception.conflict.resolve-strategy parsing & 
Improved config/schema region snapshot listening logger & Refactor (#12265)
0fa3eb6395c is described below

commit 0fa3eb6395c11e2e233534c6523b6fbcf67b4e2b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 2 17:54:53 2024 +0800

    Pipe: Fixed the bug that empty tsfile may not be marked when already closed 
& Improved exception.conflict.resolve-strategy parsing & Improved config/schema 
region snapshot listening logger & Refactor (#12265)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../parameter/PipeParameterValidator.java          |  22 +--
 .../statemachine/ConfigRegionStateMachine.java     |  18 ++-
 .../schemaregion/SchemaRegionStateMachine.java     |   8 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  42 ++++--
 .../realtime/PipeRealtimeDataRegionExtractor.java  |   2 +-
 .../visitor/PipeStatementExceptionVisitor.java     |  21 +++
 .../dataregion/memtable/TsFileProcessor.java       | 163 ++++++++++-----------
 .../pipe/connector/PipeReceiverStatusHandler.java  |  10 +-
 .../pipe/connector/protocol/IoTDBConnector.java    |  62 +++++---
 9 files changed, 195 insertions(+), 153 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index a7f6401c64e..01e9507f7fe 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -28,7 +28,7 @@ public class PipeParameterValidator {
 
   private final PipeParameters parameters;
 
-  public PipeParameterValidator(PipeParameters parameters) {
+  public PipeParameterValidator(final PipeParameters parameters) {
     this.parameters = parameters;
   }
 
@@ -43,7 +43,7 @@ public class PipeParameterValidator {
    * @param key key of the attribute
    * @throws PipeAttributeNotProvidedException if the attribute is not provided
    */
-  public PipeParameterValidator validateRequiredAttribute(String key)
+  public PipeParameterValidator validateRequiredAttribute(final String key)
       throws PipeAttributeNotProvidedException {
     if (!parameters.hasAttribute(key)) {
       throw new PipeAttributeNotProvidedException(key);
@@ -52,7 +52,7 @@ public class PipeParameterValidator {
   }
 
   public PipeParameterValidator validateAttributeValueRange(
-      String key, boolean canBeOptional, String... optionalValues)
+      final String key, final boolean canBeOptional, final String... 
optionalValues)
       throws PipeAttributeNotProvidedException {
     if (!parameters.hasAttribute(key)) {
       if (!canBeOptional) {
@@ -81,9 +81,9 @@ public class PipeParameterValidator {
    * @throws PipeParameterNotValidException if the given argument is not valid
    */
   public PipeParameterValidator validate(
-      PipeParameterValidator.SingleObjectValidationRule validationRule,
-      String messageToThrow,
-      Object argument)
+      final PipeParameterValidator.SingleObjectValidationRule validationRule,
+      final String messageToThrow,
+      final Object argument)
       throws PipeParameterNotValidException {
     if (!validationRule.validate(argument)) {
       throw new PipeParameterNotValidException(messageToThrow);
@@ -93,7 +93,7 @@ public class PipeParameterValidator {
 
   public interface SingleObjectValidationRule {
 
-    boolean validate(Object arg);
+    boolean validate(final Object arg);
   }
 
   /**
@@ -105,9 +105,9 @@ public class PipeParameterValidator {
    * @throws PipeParameterNotValidException if the given arguments are not 
valid
    */
   public PipeParameterValidator validate(
-      PipeParameterValidator.MultipleObjectsValidationRule validationRule,
-      String messageToThrow,
-      Object... arguments)
+      final PipeParameterValidator.MultipleObjectsValidationRule 
validationRule,
+      final String messageToThrow,
+      final Object... arguments)
       throws PipeParameterNotValidException {
     if (!validationRule.validate(arguments)) {
       throw new PipeParameterNotValidException(messageToThrow);
@@ -117,6 +117,6 @@ public class PipeParameterValidator {
 
   public interface MultipleObjectsValidationRule {
 
-    boolean validate(Object... args);
+    boolean validate(final Object... args);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index aca6ade457a..039624e49ce 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -205,9 +205,11 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
             .tryListenToSnapshots(ConfignodeSnapshotParser.getSnapshots());
         return true;
       } catch (IOException e) {
-        LOGGER.error(
-            "Config Region Listening Queue Listen to snapshot failed, the 
historical data may not be transferred.",
-            e);
+        if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
+          LOGGER.warn(
+              "Config Region Listening Queue Listen to snapshot failed, the 
historical data may not be transferred.",
+              e);
+        }
       }
     }
     return false;
@@ -223,9 +225,11 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
           .listener()
           .tryListenToSnapshots(ConfignodeSnapshotParser.getSnapshots());
     } catch (IOException e) {
-      LOGGER.error(
-          "Config Region Listening Queue Listen to snapshot failed when 
startup, snapshot will be tried again when starting schema transferring pipes",
-          e);
+      if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
+        LOGGER.warn(
+            "Config Region Listening Queue Listen to snapshot failed when 
startup, snapshot will be tried again when starting schema transferring pipes",
+            e);
+      }
     }
   }
 
@@ -412,7 +416,7 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
         startIndex = endIndex;
         while (logReader.hasNext()) {
           endIndex++;
-          // read and re-serialize the PhysicalPlan
+          // Read and re-serialize the PhysicalPlan
           ConfigPhysicalPlan nextPlan = logReader.next();
           try {
             TSStatus status = executor.executeNonQueryPlan(nextPlan);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
index f78b835d04b..3044c08be8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
@@ -143,9 +143,11 @@ public class SchemaRegionStateMachine extends 
BaseStateMachine {
     SchemaRegionListeningQueue listener =
         PipeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId());
     if (Objects.isNull(snapshotPaths) || 
Objects.isNull(snapshotPaths.getLeft())) {
-      logger.error(
-          "Schema Region Listening Queue Listen to snapshot failed, the 
historical data may not be transferred. snapshotPaths:{}",
-          snapshotPaths);
+      if (listener.isOpened()) {
+        logger.warn(
+            "Schema Region Listening Queue Listen to snapshot failed, the 
historical data may not be transferred. snapshotPaths:{}",
+            snapshotPaths);
+      }
       return;
     }
     listener.tryListenToSnapshot(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 5bf8d979080..8e3fa5bd1f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -44,8 +44,6 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
 
-  private boolean isTsFileFormatValid = true;
-
   private final TsFileResource resource;
   private File tsFile;
 
@@ -97,27 +95,42 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     this.isGeneratedByPipe = isGeneratedByPipe;
 
     isClosed = new AtomicBoolean(resource.isClosed());
-    // register close listener if TsFile is not closed
+    // Register close listener if TsFile is not closed
     if (!isClosed.get()) {
       final TsFileProcessor processor = resource.getProcessor();
       if (processor != null) {
         processor.addCloseFileListener(
             o -> {
               synchronized (isClosed) {
-                isTsFileFormatValid = o.isTsFileFormatValidForPipe();
                 isClosed.set(true);
                 isClosed.notifyAll();
               }
             });
       }
     }
-    // check again after register close listener in case TsFile is closed 
during the process
+    // Check again after register close listener in case TsFile is closed 
during the process
+    // TsFile flushing steps:
+    // 1. Flush tsFile
+    // 2. First listener (Set resource status "closed" -> Set processor == 
null -> processor == null
+    // is seen)
+    // 3. Other listeners (Set "closed" status for events)
+    // Then we can imply that:
+    // 1. If the listener cannot be executed because all listeners passed, 
then resources status is
+    // set "closed" and can be set here
+    // 2. If the listener cannot be executed because processor == null is 
seen, then resources
+    // status is set "closed" and can be set here
+    // Then we know:
+    // 1. The status in the event can be closed eventually.
+    // 2. If the status is "closed", then the resource status is "closed".
+    // Then we know:
+    // If the status is "closed", then the resource status is "closed", the 
tsFile won't be altered
+    // and can be sent.
     isClosed.set(resource.isClosed());
   }
 
   /**
-   * @return {@code false} if this file can't be sent by pipe due to format 
violations or is empty.
-   *     {@code true} otherwise.
+   * @return {@code false} if this file can't be sent by pipe because it is 
empty. {@code true}
+   *     otherwise.
    */
   public boolean waitForTsFileClose() throws InterruptedException {
     if (!isClosed.get()) {
@@ -127,7 +140,10 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
         }
       }
     }
-    return isTsFileFormatValid;
+    // From illustrations above we know If the status is "closed", then the 
tsFile is flushed
+    // And here we guarantee that the isEmpty() is set before flushing if 
tsFile is empty
+    // Then we know: "isClosed" --> tsFile flushed --> (isEmpty() <--> tsFile 
is empty)
+    return !resource.isEmpty();
   }
 
   public File getTsFile() {
@@ -321,14 +337,8 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   @Override
   public String toString() {
     return String.format(
-            "PipeTsFileInsertionEvent{isTsFileFormatValid=%s, resource=%s, 
tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s, dataContainer=%s}",
-            isTsFileFormatValid,
-            resource,
-            tsFile,
-            isLoaded,
-            isGeneratedByPipe,
-            isClosed.get(),
-            dataContainer)
+            "PipeTsFileInsertionEvent{resource=%s, tsFile=%s, isLoaded=%s, 
isGeneratedByPipe=%s, isClosed=%s, dataContainer=%s}",
+            resource, tsFile, isLoaded, isGeneratedByPipe, isClosed.get(), 
dataContainer)
         + " - "
         + super.toString();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 0f2492fa4ac..a10a5f7a1ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -242,7 +242,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       if (isDataRegionTimePartitionCoveredByTimeRange()) {
         event.skipParsingTime();
       } else {
-        // Since we only record the upper and lower bounds that time partition 
have ever reached, if
+        // Since we only record the upper and lower bounds that time partition 
has ever reached, if
         // the time partition cannot be covered by the time range during 
query, it will not be
         // possible later.
         disableSkippingTimeParse = true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index c99b2fb41dc..1ae7d106dc5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -27,7 +27,10 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
@@ -69,6 +72,24 @@ public class PipeStatementExceptionVisitor extends 
StatementVisitor<TSStatus, Ex
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
+  @Override
+  public TSStatus visitCreateMultiTimeseries(
+      CreateMultiTimeSeriesStatement statement, Exception context) {
+    return visitGeneralCreateTimeSeries(statement, context);
+  }
+
+  @Override
+  public TSStatus visitInternalCreateTimeseries(
+      InternalCreateTimeSeriesStatement statement, Exception context) {
+    return visitGeneralCreateTimeSeries(statement, context);
+  }
+
+  @Override
+  public TSStatus visitInternalCreateMultiTimeSeries(
+      InternalCreateMultiTimeSeriesStatement statement, Exception context) {
+    return visitGeneralCreateTimeSeries(statement, context);
+  }
+
   private TSStatus visitGeneralCreateTimeSeries(Statement statement, Exception 
context) {
     if (context instanceof SemanticException) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 71d93647a0d..d6b753705d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -99,40 +100,40 @@ import static 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORK
 @SuppressWarnings("java:S1135") // ignore todos
 public class TsFileProcessor {
 
-  /** logger fot this class. */
+  /** Logger fot this class. */
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileProcessor.class);
 
-  /** storgae group name of this tsfile. */
+  /** Storage group name of this tsfile. */
   private final String storageGroupName;
 
   /** IoTDB config. */
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  /** database info for mem control. */
+  /** Database info for mem control. */
   private final DataRegionInfo dataRegionInfo;
-  /** tsfile processor info for mem control. */
+  /** Tsfile processor info for mem control. */
   private TsFileProcessorInfo tsFileProcessorInfo;
 
-  /** sync this object in read() and asyncTryToFlush(). */
+  /** Sync this object in read() and asyncTryToFlush(). */
   private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new 
ConcurrentLinkedDeque<>();
 
-  /** modification to memtable mapping. */
+  /** Modification to memtable mapping. */
   private final List<Pair<Modification, IMemTable>> modsToMemtable = new 
ArrayList<>();
 
-  /** writer for restore tsfile and flushing. */
+  /** Writer for restore tsfile and flushing. */
   private RestorableTsFileIOWriter writer;
 
-  /** tsfile resource for index this tsfile. */
+  /** Tsfile resource for index this tsfile. */
   private final TsFileResource tsFileResource;
 
-  /** time range index to indicate this processor belongs to which time range 
*/
+  /** Time range index to indicate this processor belongs to which time range 
*/
   private long timeRangeId;
   /**
    * Whether the processor is in the queue of the FlushManager or being 
flushed by a flush thread.
    */
   private volatile boolean managedByFlushManager;
 
-  /** a lock to mutual exclude read and read */
+  /** A lock to mutual exclude read and read */
   private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
   /**
    * It is set by the StorageGroupProcessor and checked by flush threads. (If 
shouldClose == true
@@ -140,38 +141,32 @@ public class TsFileProcessor {
    */
   private volatile boolean shouldClose;
 
-  /** working memtable. */
+  /** Working memtable. */
   private IMemTable workMemTable;
 
-  /** last flush time to flush the working memtable. */
+  /** Last flush time to flush the working memtable. */
   private long lastWorkMemtableFlushTime;
 
-  /** this callback is called before the workMemtable is added into the 
flushingMemTables. */
+  /** This callback is called before the workMemtable is added into the 
flushingMemTables. */
   private final DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback;
 
-  /** wal node. */
+  /** Wal node. */
   private final IWALNode walNode;
 
-  /** whether it's a sequence file or not. */
+  /** Whether it's a sequence file or not. */
   private final boolean sequence;
 
-  /** total memtable size for mem control. */
+  /** Total memtable size for mem control. */
   private long totalMemTableSize;
 
   private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get 
flushQueryLock write lock";
   private static final String FLUSH_QUERY_WRITE_RELEASE =
       "{}: {} get flushQueryLock write lock released";
 
-  /**
-   * Whether this file keeps TsFile format. If the file violates TsFile 
format, then it shouldn't be
-   * captured by pipe engine.
-   */
-  private boolean isTsFileFormatValidForPipe = true;
-
-  /** close file listener. */
+  /** Close file listener. */
   private final List<CloseFileListener> closeFileListeners = new 
CopyOnWriteArrayList<>();
 
-  /** flush file listener. */
+  /** Flush file listener. */
   private final List<FlushListener> flushListeners = new ArrayList<>();
 
   private final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
@@ -233,7 +228,7 @@ public class TsFileProcessor {
   }
 
   /**
-   * insert data in an InsertRowNode into the workingMemtable.
+   * Insert data in an InsertRowNode into the workingMemtable.
    *
    * @param insertRowNode physical plan of insertion
    */
@@ -249,7 +244,7 @@ public class TsFileProcessor {
           
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
     }
 
-    long[] memIncrements = null;
+    long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
     if (insertRowNode.isAligned()) {
@@ -304,16 +299,16 @@ public class TsFileProcessor {
       workMemTable.insert(insertRowNode);
     }
 
-    // update start time of this memtable
+    // Update start time of this memtable
     tsFileResource.updateStartTime(insertRowNode.getDeviceID(), 
insertRowNode.getTime());
-    // for sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
-    // for unsequence tsfile, we have to update the endTime for each insertion.
+    // For sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
+    // For unsequence tsfile, we have to update the endTime for each insertion.
     if (!sequence) {
       tsFileResource.updateEndTime(insertRowNode.getDeviceID(), 
insertRowNode.getTime());
     }
 
     tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
-    // recordScheduleMemTableCost
+    // RecordScheduleMemTableCost
     costsForMetrics[3] += System.nanoTime() - startTime;
   }
 
@@ -327,7 +322,7 @@ public class TsFileProcessor {
   }
 
   /**
-   * insert batch data of insertTabletPlan into the workingMemtable. The rows 
to be inserted are in
+   * Insert batch data of insertTabletPlan into the workingMemtable. The rows 
to be inserted are in
    * the range [start, end). Null value in each column values will be replaced 
by the subsequent
    * non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
    *
@@ -348,7 +343,7 @@ public class TsFileProcessor {
           
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
     }
 
-    long[] memIncrements = null;
+    long[] memIncrements;
     try {
       long startTime = System.nanoTime();
       if (insertTabletNode.isAligned()) {
@@ -426,8 +421,8 @@ public class TsFileProcessor {
 
     tsFileResource.updateStartTime(
         insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[start]);
-    // for sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
-    // for unsequence tsfile, we have to update the endTime for each insertion.
+    // For sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
+    // For unsequence tsfile, we have to update the endTime for each insertion.
     if (!sequence) {
       tsFileResource.updateEndTime(
           insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[end - 
1]);
@@ -438,17 +433,17 @@ public class TsFileProcessor {
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() 
- startTime);
   }
 
-  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfo(
       IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values)
       throws WriteProcessException {
-    // memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
+    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
 
     for (int i = 0; i < dataTypes.length; i++) {
-      // skip failed Measurements
+      // Skip failed Measurements
       if (dataTypes[i] == null || measurements[i] == null) {
         continue;
       }
@@ -477,20 +472,20 @@ public class TsFileProcessor {
   private long[] checkAlignedMemCostAndAddToTspInfo(
       IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values)
       throws WriteProcessException {
-    // memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
+    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
 
     if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)) {
-      // for new device of this mem table
+      // For new device of this mem table
       // ChunkMetadataIncrement
       chunkMetadataIncrement +=
           ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
               * dataTypes.length;
       memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
       for (int i = 0; i < dataTypes.length; i++) {
-        // skip failed Measurements
+        // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
@@ -500,18 +495,18 @@ public class TsFileProcessor {
         }
       }
     } else {
-      // for existed device of this mem table
+      // For existed device of this mem table
       AlignedWritableMemChunk alignedMemChunk =
           ((AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId))
               .getAlignedMemChunk();
       List<TSDataType> dataTypesInTVList = new ArrayList<>();
       for (int i = 0; i < dataTypes.length; i++) {
-        // skip failed Measurements
+        // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
 
-        // extending the column of aligned mem chunk
+        // Extending the column of aligned mem chunk
         if (!alignedMemChunk.containsMeasurement(measurements[i])) {
           memTableIncrement +=
               (alignedMemChunk.alignedListSize() / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -523,7 +518,7 @@ public class TsFileProcessor {
           textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
         }
       }
-      // here currentChunkPointNum >= 1
+      // Here currentChunkPointNum >= 1
       if ((alignedMemChunk.alignedListSize() % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
         dataTypesInTVList.addAll(((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
         memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
@@ -547,7 +542,7 @@ public class TsFileProcessor {
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
     for (int i = 0; i < dataTypes.length; i++) {
-      // skip failed Measurements
+      // Skip failed Measurements
       if (dataTypes[i] == null || columns[i] == null || measurements[i] == 
null) {
         continue;
       }
@@ -662,7 +657,7 @@ public class TsFileProcessor {
         if (dataType == null || column == null || measurement == null) {
           continue;
         }
-        // extending the column of aligned mem chunk
+        // Extending the column of aligned mem chunk
         if (!alignedMemChunk.containsMeasurement(measurementIds[i])) {
           memIncrements[0] +=
               (alignedMemChunk.alignedListSize() / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -753,7 +748,7 @@ public class TsFileProcessor {
               deletion.getPath(), device, deletion.getStartTime(), 
deletion.getEndTime());
         }
       }
-      // flushing memTables are immutable, only record this deletion in these 
memTables for read
+      // Flushing memTables are immutable, only record this deletion in these 
memTables for read
       if (!flushingMemTables.isEmpty()) {
         modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast()));
       }
@@ -961,7 +956,7 @@ public class TsFileProcessor {
     }
   }
 
-  /** put the working memtable into flushing list and set the working memtable 
to null */
+  /** Put the working memtable into flushing list and set the working memtable 
to null */
   public void asyncFlush() {
     flushQueryLock.writeLock().lock();
     if (logger.isDebugEnabled()) {
@@ -991,21 +986,21 @@ public class TsFileProcessor {
   }
 
   /**
-   * this method calls updateLatestFlushTimeCallback and move the given 
memtable into the flushing
+   * This method calls updateLatestFlushTimeCallback and move the given 
memtable into the flushing
    * queue, set the current working memtable as null and then register the 
tsfileProcessor into the
    * flushManager again.
    */
   private Future<?> addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws 
IOException {
-    Map<IDeviceID, Long> lastTimeForEachDevice = new HashMap<>();
-    if (sequence) {
-      lastTimeForEachDevice = tobeFlushed.getMaxTime();
-      // If some devices have been removed in MemTable, the number of device 
in MemTable and
-      // tsFileResource will not be the same. And the endTime of these devices 
in resource will be
-      // Long.minValue.
-      // In the case, we need to delete the removed devices in tsFileResource.
-      if (lastTimeForEachDevice.size() != tsFileResource.getDevices().size()) {
-        
tsFileResource.deleteRemovedDeviceAndUpdateEndTime(lastTimeForEachDevice);
-      } else {
+    final Map<IDeviceID, Long> lastTimeForEachDevice = 
tobeFlushed.getMaxTime();
+
+    // If some devices have been removed in MemTable, the number of device in 
MemTable and
+    // tsFileResource will not be the same. And the endTime of these devices 
in resource will be
+    // Long.minValue.
+    // In the case, we need to delete the removed devices in tsFileResource.
+    if (lastTimeForEachDevice.size() != tsFileResource.getDevices().size()) {
+      
tsFileResource.deleteRemovedDeviceAndUpdateEndTime(lastTimeForEachDevice);
+    } else {
+      if (sequence) {
         tsFileResource.updateEndTime(lastTimeForEachDevice);
       }
     }
@@ -1039,7 +1034,7 @@ public class TsFileProcessor {
     return FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
-  /** put back the memtable to MemTablePool and make metadata in writer 
visible */
+  /** Put back the memtable to MemTablePool and make metadata in writer 
visible */
   private void releaseFlushedMemTable(IMemTable memTable) {
     flushQueryLock.writeLock().lock();
     if (logger.isDebugEnabled()) {
@@ -1064,7 +1059,7 @@ public class TsFileProcessor {
       }
       memTable.release();
       MemTableManager.getInstance().decreaseMemtableNumber();
-      // reset the mem cost in StorageGroupProcessorInfo
+      // Reset the mem cost in StorageGroupProcessorInfo
       dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -1074,7 +1069,7 @@ public class TsFileProcessor {
             tsFileResource.getTsFile().getName(),
             flushingMemTables.size());
       }
-      // report to System
+      // Report to System
       SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
       SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
       if (logger.isDebugEnabled()) {
@@ -1120,7 +1115,7 @@ public class TsFileProcessor {
   public void flushOneMemTable() {
     IMemTable memTableToFlush = flushingMemTables.getFirst();
 
-    // signal memtable only may appear when calling asyncClose()
+    // Signal memtable only may appear when calling asyncClose()
     if (!memTableToFlush.isSignalMemTable()) {
       if (memTableToFlush.isEmpty()) {
         logger.info(
@@ -1167,16 +1162,16 @@ public class TsFileProcessor {
                   tsFileResource.getTsFile().getAbsolutePath(),
                   e1);
             }
-            // release resource
+            // Release resource
             try {
               syncReleaseFlushedMemTable(memTableToFlush);
-              // make sure no read will search this file
+              // Make sure no read will search this file
               tsFileResource.setTimeIndex(config.getTimeIndexLevel().getTimeIndex());
-              // this callback method will register this empty tsfile into TsFileManager
+              // This callback method will register this empty tsfile into TsFileManager
               for (CloseFileListener closeFileListener : closeFileListeners) {
                 closeFileListener.onClosed(this);
               }
-              // close writer
+              // Close writer
               writer.close();
               writer = null;
               synchronized (flushingMemTables) {
@@ -1229,7 +1224,7 @@ public class TsFileProcessor {
           memTableToFlush.isSignalMemTable());
     }
 
-    // for sync flush
+    // For sync flush
     syncReleaseFlushedMemTable(memTableToFlush);
     try {
       writer.getTsFileOutput().force();
@@ -1237,11 +1232,11 @@ public class TsFileProcessor {
       logger.error("fsync memTable data to disk error,", e);
     }
 
-    // call flushed listener after memtable is released safely
+    // Call flushed listener after memtable is released safely
     for (FlushListener flushListener : flushListeners) {
       flushListener.onMemTableFlushed(memTableToFlush);
     }
-    // retry to avoid unnecessary read-only mode
+    // Retry to avoid unnecessary read-only mode
     int retryCnt = 0;
     while (shouldClose && flushingMemTables.isEmpty() && writer != null) {
       try {
@@ -1267,7 +1262,7 @@ public class TsFileProcessor {
             storageGroupName,
             tsFileResource.getTsFile().getAbsolutePath(),
             e);
-        // truncate broken metadata
+        // Truncate broken metadata
         try {
           writer.reset();
         } catch (IOException e1) {
@@ -1277,7 +1272,7 @@ public class TsFileProcessor {
               tsFileResource.getTsFile().getAbsolutePath(),
               e1);
         }
-        // retry or set read-only
+        // Retry or set read-only
         if (retryCnt < 3) {
           logger.warn(
               "{} meet error when flush FileMetadata to {}, retry it again",
@@ -1296,7 +1291,7 @@ public class TsFileProcessor {
           break;
         }
       }
-      // for sync close
+      // For sync close
       if (logger.isDebugEnabled()) {
         logger.debug(
             "{}: {} try to get flushingMemtables lock.",
@@ -1341,8 +1336,8 @@ public class TsFileProcessor {
     if (logger.isDebugEnabled()) {
       logger.debug("Ended file {}", tsFileResource);
     }
-    // remove this processor from Closing list in StorageGroupProcessor,
-    // mark the TsFileResource closed, no need writer anymore
+    // Remove this processor from Closing list in StorageGroupProcessor,
+    // Mark the TsFileResource closed, no need writer anymore
     for (CloseFileListener closeFileListener : closeFileListeners) {
       closeFileListener.onClosed(this);
     }
@@ -1357,10 +1352,9 @@ public class TsFileProcessor {
   private void endEmptyFile() throws TsFileProcessorException, IOException {
     logger.info("Start to end empty file {}", tsFileResource);
 
-    // remove this processor from Closing list in DataRegion,
-    // mark the TsFileResource closed, no need writer anymore
+    // Remove this processor from Closing list in DataRegion,
+    // Mark the TsFileResource closed, no need writer anymore
     writer.close();
-    isTsFileFormatValidForPipe = false; // empty file, no need to be captured by pipe
     for (CloseFileListener closeFileListener : closeFileListeners) {
       closeFileListener.onClosed(this);
     }
@@ -1374,11 +1368,6 @@ public class TsFileProcessor {
     writer = null;
   }
 
-  /** Only useful after TsFile is closed. */
-  public boolean isTsFileFormatValidForPipe() {
-    return isTsFileFormatValidForPipe;
-  }
-
   public boolean isManagedByFlushManager() {
     return managedByFlushManager;
   }
@@ -1387,10 +1376,10 @@ public class TsFileProcessor {
     this.managedByFlushManager = managedByFlushManager;
   }
 
-  /** close this tsfile */
+  /** Close this tsfile */
   public void close() throws TsFileProcessorException {
     try {
-      // when closing resource file, its corresponding mod file is also closed.
+      // When closing resource file, its corresponding mod file is also closed.
       tsFileResource.close();
     } catch (IOException e) {
       throw new TsFileProcessorException(e);
@@ -1410,7 +1399,7 @@ public class TsFileProcessor {
   }
 
   /**
-   * get the chunk(s) in the memtable (one from work memtable and the other ones in flushing
+   * Get the chunk(s) in the memtable (one from work memtable and the other ones in flushing
    * memtables and then compact them into one TimeValuePairSorter). Then get the related
    * ChunkMetadata of data on disk.
    *
@@ -1498,7 +1487,7 @@ public class TsFileProcessor {
     this.timeRangeId = timeRangeId;
   }
 
-  /** release resource of a memtable */
+  /** Release resource of a memtable */
   public void putMemTableBackAndClose() throws TsFileProcessorException {
     if (workMemTable != null) {
       workMemTable.release();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java
index 0cb9af17c81..287fa49a996 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java
@@ -56,11 +56,11 @@ public class PipeReceiverStatusHandler {
   private final AtomicReference<String> exceptionRecordedMessage = new AtomicReference<>("");
 
   public PipeReceiverStatusHandler(
-      boolean isRetryAllowedWhenConflictOccurs,
-      long retryMaxSecondsWhenConflictOccurs,
-      boolean shouldRecordIgnoredDataWhenConflictOccurs,
-      long retryMaxSecondsWhenOtherExceptionsOccur,
-      boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur) {
+      final boolean isRetryAllowedWhenConflictOccurs,
+      final long retryMaxSecondsWhenConflictOccurs,
+      final boolean shouldRecordIgnoredDataWhenConflictOccurs,
+      final long retryMaxSecondsWhenOtherExceptionsOccur,
+      final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur) {
     this.isRetryAllowedWhenConflictOccurs = isRetryAllowedWhenConflictOccurs;
     this.retryMaxMillisWhenConflictOccurs =
         retryMaxSecondsWhenConflictOccurs < 0
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index b2fdff35385..b2342966c20 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -84,28 +84,43 @@ public abstract class IoTDBConnector implements PipeConnector {
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     final PipeParameters parameters = validator.getParameters();
-    validator.validate(
-        args ->
-            (boolean) args[0]
-                || (((boolean) args[1] || (boolean) args[2]) && (boolean) args[3])
-                || (boolean) args[4]
-                || (((boolean) args[5] || (boolean) args[6]) && (boolean) args[7]),
-        String.format(
-            "One of %s, %s:%s, %s, %s:%s must be specified",
-            CONNECTOR_IOTDB_NODE_URLS_KEY,
-            CONNECTOR_IOTDB_HOST_KEY,
-            CONNECTOR_IOTDB_PORT_KEY,
-            SINK_IOTDB_NODE_URLS_KEY,
-            SINK_IOTDB_HOST_KEY,
-            SINK_IOTDB_PORT_KEY),
-        parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
-        parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
-        parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
-        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
-        parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
-        parameters.hasAttribute(SINK_IOTDB_IP_KEY),
-        parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
-        parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
+    validator
+        .validate(
+            args ->
+                (boolean) args[0]
+                    || (((boolean) args[1] || (boolean) args[2]) && (boolean) args[3])
+                    || (boolean) args[4]
+                    || (((boolean) args[5] || (boolean) args[6]) && (boolean) args[7]),
+            String.format(
+                "One of %s, %s:%s, %s, %s:%s must be specified",
+                CONNECTOR_IOTDB_NODE_URLS_KEY,
+                CONNECTOR_IOTDB_HOST_KEY,
+                CONNECTOR_IOTDB_PORT_KEY,
+                SINK_IOTDB_NODE_URLS_KEY,
+                SINK_IOTDB_HOST_KEY,
+                SINK_IOTDB_PORT_KEY),
+            parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
+            parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+            parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
+            parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+            parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
+            parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+            parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
+            parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
+        .validate(
+            arg -> arg.equals("retry") || arg.equals("ignore"),
+            String.format(
+                "The value of key %s or %s must be either 'retry' or 'ignore'.",
+                CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+                SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+            parameters
+                .getStringOrDefault(
+                    Arrays.asList(
+                        CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+                        SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+                    CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
+                .trim()
+                .toLowerCase());
   }
 
   @Override
@@ -129,7 +144,8 @@ public abstract class IoTDBConnector implements PipeConnector {
                         CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
                         SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
                     CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
-                .equals("retry"),
+                .trim()
+                .equalsIgnoreCase("retry"),
             parameters.getLongOrDefault(
                 Arrays.asList(
                     CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY,


Reply via email to