This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0fa3eb6395c Pipe: Fixed the bug that empty tsfile may not be marked
when already closed & Improved exception.conflict.resolve-strategy parsing &
Improved config/schema region snapshot listening logger & Refactor (#12265)
0fa3eb6395c is described below
commit 0fa3eb6395c11e2e233534c6523b6fbcf67b4e2b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 2 17:54:53 2024 +0800
Pipe: Fixed the bug that empty tsfile may not be marked when already closed
& Improved exception.conflict.resolve-strategy parsing & Improved config/schema
region snapshot listening logger & Refactor (#12265)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../parameter/PipeParameterValidator.java | 22 +--
.../statemachine/ConfigRegionStateMachine.java | 18 ++-
.../schemaregion/SchemaRegionStateMachine.java | 8 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 42 ++++--
.../realtime/PipeRealtimeDataRegionExtractor.java | 2 +-
.../visitor/PipeStatementExceptionVisitor.java | 21 +++
.../dataregion/memtable/TsFileProcessor.java | 163 ++++++++++-----------
.../pipe/connector/PipeReceiverStatusHandler.java | 10 +-
.../pipe/connector/protocol/IoTDBConnector.java | 62 +++++---
9 files changed, 195 insertions(+), 153 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index a7f6401c64e..01e9507f7fe 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -28,7 +28,7 @@ public class PipeParameterValidator {
private final PipeParameters parameters;
- public PipeParameterValidator(PipeParameters parameters) {
+ public PipeParameterValidator(final PipeParameters parameters) {
this.parameters = parameters;
}
@@ -43,7 +43,7 @@ public class PipeParameterValidator {
* @param key key of the attribute
* @throws PipeAttributeNotProvidedException if the attribute is not provided
*/
- public PipeParameterValidator validateRequiredAttribute(String key)
+ public PipeParameterValidator validateRequiredAttribute(final String key)
throws PipeAttributeNotProvidedException {
if (!parameters.hasAttribute(key)) {
throw new PipeAttributeNotProvidedException(key);
@@ -52,7 +52,7 @@ public class PipeParameterValidator {
}
public PipeParameterValidator validateAttributeValueRange(
- String key, boolean canBeOptional, String... optionalValues)
+ final String key, final boolean canBeOptional, final String...
optionalValues)
throws PipeAttributeNotProvidedException {
if (!parameters.hasAttribute(key)) {
if (!canBeOptional) {
@@ -81,9 +81,9 @@ public class PipeParameterValidator {
* @throws PipeParameterNotValidException if the given argument is not valid
*/
public PipeParameterValidator validate(
- PipeParameterValidator.SingleObjectValidationRule validationRule,
- String messageToThrow,
- Object argument)
+ final PipeParameterValidator.SingleObjectValidationRule validationRule,
+ final String messageToThrow,
+ final Object argument)
throws PipeParameterNotValidException {
if (!validationRule.validate(argument)) {
throw new PipeParameterNotValidException(messageToThrow);
@@ -93,7 +93,7 @@ public class PipeParameterValidator {
public interface SingleObjectValidationRule {
- boolean validate(Object arg);
+ boolean validate(final Object arg);
}
/**
@@ -105,9 +105,9 @@ public class PipeParameterValidator {
* @throws PipeParameterNotValidException if the given arguments are not
valid
*/
public PipeParameterValidator validate(
- PipeParameterValidator.MultipleObjectsValidationRule validationRule,
- String messageToThrow,
- Object... arguments)
+ final PipeParameterValidator.MultipleObjectsValidationRule
validationRule,
+ final String messageToThrow,
+ final Object... arguments)
throws PipeParameterNotValidException {
if (!validationRule.validate(arguments)) {
throw new PipeParameterNotValidException(messageToThrow);
@@ -117,6 +117,6 @@ public class PipeParameterValidator {
public interface MultipleObjectsValidationRule {
- boolean validate(Object... args);
+ boolean validate(final Object... args);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index aca6ade457a..039624e49ce 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -205,9 +205,11 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
.tryListenToSnapshots(ConfignodeSnapshotParser.getSnapshots());
return true;
} catch (IOException e) {
- LOGGER.error(
- "Config Region Listening Queue Listen to snapshot failed, the
historical data may not be transferred.",
- e);
+ if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
+ LOGGER.warn(
+ "Config Region Listening Queue Listen to snapshot failed, the
historical data may not be transferred.",
+ e);
+ }
}
}
return false;
@@ -223,9 +225,11 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
.listener()
.tryListenToSnapshots(ConfignodeSnapshotParser.getSnapshots());
} catch (IOException e) {
- LOGGER.error(
- "Config Region Listening Queue Listen to snapshot failed when
startup, snapshot will be tried again when starting schema transferring pipes",
- e);
+ if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
+ LOGGER.warn(
+ "Config Region Listening Queue Listen to snapshot failed when
startup, snapshot will be tried again when starting schema transferring pipes",
+ e);
+ }
}
}
@@ -412,7 +416,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
startIndex = endIndex;
while (logReader.hasNext()) {
endIndex++;
- // read and re-serialize the PhysicalPlan
+ // Read and re-serialize the PhysicalPlan
ConfigPhysicalPlan nextPlan = logReader.next();
try {
TSStatus status = executor.executeNonQueryPlan(nextPlan);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
index f78b835d04b..3044c08be8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
@@ -143,9 +143,11 @@ public class SchemaRegionStateMachine extends
BaseStateMachine {
SchemaRegionListeningQueue listener =
PipeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId());
if (Objects.isNull(snapshotPaths) ||
Objects.isNull(snapshotPaths.getLeft())) {
- logger.error(
- "Schema Region Listening Queue Listen to snapshot failed, the
historical data may not be transferred. snapshotPaths:{}",
- snapshotPaths);
+ if (listener.isOpened()) {
+ logger.warn(
+ "Schema Region Listening Queue Listen to snapshot failed, the
historical data may not be transferred. snapshotPaths:{}",
+ snapshotPaths);
+ }
return;
}
listener.tryListenToSnapshot(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 5bf8d979080..8e3fa5bd1f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -44,8 +44,6 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
- private boolean isTsFileFormatValid = true;
-
private final TsFileResource resource;
private File tsFile;
@@ -97,27 +95,42 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
this.isGeneratedByPipe = isGeneratedByPipe;
isClosed = new AtomicBoolean(resource.isClosed());
- // register close listener if TsFile is not closed
+ // Register close listener if TsFile is not closed
if (!isClosed.get()) {
final TsFileProcessor processor = resource.getProcessor();
if (processor != null) {
processor.addCloseFileListener(
o -> {
synchronized (isClosed) {
- isTsFileFormatValid = o.isTsFileFormatValidForPipe();
isClosed.set(true);
isClosed.notifyAll();
}
});
}
}
- // check again after register close listener in case TsFile is closed
during the process
+ // Check again after register close listener in case TsFile is closed
during the process
+ // TsFile flushing steps:
+ // 1. Flush tsFile
+ // 2. First listener (Set resource status "closed" -> Set processor ==
null -> processor == null
+ // is seen)
+ // 3. Other listeners (Set "closed" status for events)
+ // Then we can imply that:
+ // 1. If the listener cannot be executed because all listeners passed,
then resources status is
+ // set "closed" and can be set here
+ // 2. If the listener cannot be executed because processor == null is
seen, then resources
+ // status is set "closed" and can be set here
+ // Then we know:
+ // 1. The status in the event can be closed eventually.
+ // 2. If the status is "closed", then the resource status is "closed".
+ // Then we know:
+ // If the status is "closed", then the resource status is "closed", the
tsFile won't be altered
+ // and can be sent.
isClosed.set(resource.isClosed());
}
/**
- * @return {@code false} if this file can't be sent by pipe due to format
violations or is empty.
- * {@code true} otherwise.
+ * @return {@code false} if this file can't be sent by pipe because it is
empty. {@code true}
+ * otherwise.
*/
public boolean waitForTsFileClose() throws InterruptedException {
if (!isClosed.get()) {
@@ -127,7 +140,10 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
}
}
- return isTsFileFormatValid;
+ // From the illustrations above we know that if the status is "closed", then
the tsFile is flushed
+ // And here we guarantee that the isEmpty() is set before flushing if
tsFile is empty
+ // Then we know: "isClosed" --> tsFile flushed --> (isEmpty() <--> tsFile
is empty)
+ return !resource.isEmpty();
}
public File getTsFile() {
@@ -321,14 +337,8 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public String toString() {
return String.format(
- "PipeTsFileInsertionEvent{isTsFileFormatValid=%s, resource=%s,
tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s, dataContainer=%s}",
- isTsFileFormatValid,
- resource,
- tsFile,
- isLoaded,
- isGeneratedByPipe,
- isClosed.get(),
- dataContainer)
+ "PipeTsFileInsertionEvent{resource=%s, tsFile=%s, isLoaded=%s,
isGeneratedByPipe=%s, isClosed=%s, dataContainer=%s}",
+ resource, tsFile, isLoaded, isGeneratedByPipe, isClosed.get(),
dataContainer)
+ " - "
+ super.toString();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 0f2492fa4ac..a10a5f7a1ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -242,7 +242,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
if (isDataRegionTimePartitionCoveredByTimeRange()) {
event.skipParsingTime();
} else {
- // Since we only record the upper and lower bounds that time partition
have ever reached, if
+ // Since we only record the upper and lower bounds that time partition
has ever reached, if
// the time partition cannot be covered by the time range during
query, it will not be
// possible later.
disableSkippingTimeParse = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index c99b2fb41dc..1ae7d106dc5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -27,7 +27,10 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
@@ -69,6 +72,24 @@ public class PipeStatementExceptionVisitor extends
StatementVisitor<TSStatus, Ex
return visitGeneralCreateTimeSeries(statement, context);
}
+ @Override
+ public TSStatus visitCreateMultiTimeseries(
+ CreateMultiTimeSeriesStatement statement, Exception context) {
+ return visitGeneralCreateTimeSeries(statement, context);
+ }
+
+ @Override
+ public TSStatus visitInternalCreateTimeseries(
+ InternalCreateTimeSeriesStatement statement, Exception context) {
+ return visitGeneralCreateTimeSeries(statement, context);
+ }
+
+ @Override
+ public TSStatus visitInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesStatement statement, Exception context) {
+ return visitGeneralCreateTimeSeries(statement, context);
+ }
+
private TSStatus visitGeneralCreateTimeSeries(Statement statement, Exception
context) {
if (context instanceof SemanticException) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 71d93647a0d..d6b753705d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -99,40 +100,40 @@ import static
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORK
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileProcessor {
- /** logger fot this class. */
+ /** Logger for this class. */
private static final Logger logger =
LoggerFactory.getLogger(TsFileProcessor.class);
- /** storgae group name of this tsfile. */
+ /** Storage group name of this tsfile. */
private final String storageGroupName;
/** IoTDB config. */
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- /** database info for mem control. */
+ /** Database info for mem control. */
private final DataRegionInfo dataRegionInfo;
- /** tsfile processor info for mem control. */
+ /** Tsfile processor info for mem control. */
private TsFileProcessorInfo tsFileProcessorInfo;
- /** sync this object in read() and asyncTryToFlush(). */
+ /** Sync this object in read() and asyncTryToFlush(). */
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new
ConcurrentLinkedDeque<>();
- /** modification to memtable mapping. */
+ /** Modification to memtable mapping. */
private final List<Pair<Modification, IMemTable>> modsToMemtable = new
ArrayList<>();
- /** writer for restore tsfile and flushing. */
+ /** Writer for restore tsfile and flushing. */
private RestorableTsFileIOWriter writer;
- /** tsfile resource for index this tsfile. */
+ /** Tsfile resource for index this tsfile. */
private final TsFileResource tsFileResource;
- /** time range index to indicate this processor belongs to which time range
*/
+ /** Time range index to indicate this processor belongs to which time range
*/
private long timeRangeId;
/**
* Whether the processor is in the queue of the FlushManager or being
flushed by a flush thread.
*/
private volatile boolean managedByFlushManager;
- /** a lock to mutual exclude read and read */
+ /** A lock to mutually exclude flush and query. */
private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
/**
* It is set by the StorageGroupProcessor and checked by flush threads. (If
shouldClose == true
@@ -140,38 +141,32 @@ public class TsFileProcessor {
*/
private volatile boolean shouldClose;
- /** working memtable. */
+ /** Working memtable. */
private IMemTable workMemTable;
- /** last flush time to flush the working memtable. */
+ /** Last flush time to flush the working memtable. */
private long lastWorkMemtableFlushTime;
- /** this callback is called before the workMemtable is added into the
flushingMemTables. */
+ /** This callback is called before the workMemtable is added into the
flushingMemTables. */
private final DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback;
- /** wal node. */
+ /** Wal node. */
private final IWALNode walNode;
- /** whether it's a sequence file or not. */
+ /** Whether it's a sequence file or not. */
private final boolean sequence;
- /** total memtable size for mem control. */
+ /** Total memtable size for mem control. */
private long totalMemTableSize;
private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get
flushQueryLock write lock";
private static final String FLUSH_QUERY_WRITE_RELEASE =
"{}: {} get flushQueryLock write lock released";
- /**
- * Whether this file keeps TsFile format. If the file violates TsFile
format, then it shouldn't be
- * captured by pipe engine.
- */
- private boolean isTsFileFormatValidForPipe = true;
-
- /** close file listener. */
+ /** Close file listener. */
private final List<CloseFileListener> closeFileListeners = new
CopyOnWriteArrayList<>();
- /** flush file listener. */
+ /** Flush file listener. */
private final List<FlushListener> flushListeners = new ArrayList<>();
private final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
@@ -233,7 +228,7 @@ public class TsFileProcessor {
}
/**
- * insert data in an InsertRowNode into the workingMemtable.
+ * Insert data in an InsertRowNode into the workingMemtable.
*
* @param insertRowNode physical plan of insertion
*/
@@ -249,7 +244,7 @@ public class TsFileProcessor {
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
}
- long[] memIncrements = null;
+ long[] memIncrements;
long memControlStartTime = System.nanoTime();
if (insertRowNode.isAligned()) {
@@ -304,16 +299,16 @@ public class TsFileProcessor {
workMemTable.insert(insertRowNode);
}
- // update start time of this memtable
+ // Update start time of this memtable
tsFileResource.updateStartTime(insertRowNode.getDeviceID(),
insertRowNode.getTime());
- // for sequence tsfile, we update the endTime only when the file is
prepared to be closed.
- // for unsequence tsfile, we have to update the endTime for each insertion.
+ // For sequence tsfile, we update the endTime only when the file is
prepared to be closed.
+ // For unsequence tsfile, we have to update the endTime for each insertion.
if (!sequence) {
tsFileResource.updateEndTime(insertRowNode.getDeviceID(),
insertRowNode.getTime());
}
tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
- // recordScheduleMemTableCost
+ // recordScheduleMemTableCost
costsForMetrics[3] += System.nanoTime() - startTime;
}
@@ -327,7 +322,7 @@ public class TsFileProcessor {
}
/**
- * insert batch data of insertTabletPlan into the workingMemtable. The rows
to be inserted are in
+ * Insert batch data of insertTabletPlan into the workingMemtable. The rows
to be inserted are in
* the range [start, end). Null value in each column values will be replaced
by the subsequent
* non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
*
@@ -348,7 +343,7 @@ public class TsFileProcessor {
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
}
- long[] memIncrements = null;
+ long[] memIncrements;
try {
long startTime = System.nanoTime();
if (insertTabletNode.isAligned()) {
@@ -426,8 +421,8 @@ public class TsFileProcessor {
tsFileResource.updateStartTime(
insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[start]);
- // for sequence tsfile, we update the endTime only when the file is
prepared to be closed.
- // for unsequence tsfile, we have to update the endTime for each insertion.
+ // For sequence tsfile, we update the endTime only when the file is
prepared to be closed.
+ // For unsequence tsfile, we have to update the endTime for each insertion.
if (!sequence) {
tsFileResource.updateEndTime(
insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[end -
1]);
@@ -438,17 +433,17 @@ public class TsFileProcessor {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime()
- startTime);
}
- @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+ @SuppressWarnings("squid:S3776") // High Cognitive Complexity
private long[] checkMemCostAndAddToTspInfo(
IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes,
Object[] values)
throws WriteProcessException {
- // memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
+ // Memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
for (int i = 0; i < dataTypes.length; i++) {
- // skip failed Measurements
+ // Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
@@ -477,20 +472,20 @@ public class TsFileProcessor {
private long[] checkAlignedMemCostAndAddToTspInfo(
IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes,
Object[] values)
throws WriteProcessException {
- // memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
+ // Memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
if (workMemTable.checkIfChunkDoesNotExist(deviceId,
AlignedPath.VECTOR_PLACEHOLDER)) {
- // for new device of this mem table
+ // For new device of this mem table
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER,
TSDataType.VECTOR)
* dataTypes.length;
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
for (int i = 0; i < dataTypes.length; i++) {
- // skip failed Measurements
+ // Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
@@ -500,18 +495,18 @@ public class TsFileProcessor {
}
}
} else {
- // for existed device of this mem table
+ // For existed device of this mem table
AlignedWritableMemChunk alignedMemChunk =
((AlignedWritableMemChunkGroup)
workMemTable.getMemTableMap().get(deviceId))
.getAlignedMemChunk();
List<TSDataType> dataTypesInTVList = new ArrayList<>();
for (int i = 0; i < dataTypes.length; i++) {
- // skip failed Measurements
+ // Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
- // extending the column of aligned mem chunk
+ // Extending the column of aligned mem chunk
if (!alignedMemChunk.containsMeasurement(measurements[i])) {
memTableIncrement +=
(alignedMemChunk.alignedListSize() /
PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -523,7 +518,7 @@ public class TsFileProcessor {
textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}
- // here currentChunkPointNum >= 1
+ // Here currentChunkPointNum >= 1
if ((alignedMemChunk.alignedListSize() %
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
dataTypesInTVList.addAll(((AlignedTVList)
alignedMemChunk.getTVList()).getTsDataTypes());
memTableIncrement +=
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
@@ -547,7 +542,7 @@ public class TsFileProcessor {
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
for (int i = 0; i < dataTypes.length; i++) {
- // skip failed Measurements
+ // Skip failed Measurements
if (dataTypes[i] == null || columns[i] == null || measurements[i] ==
null) {
continue;
}
@@ -662,7 +657,7 @@ public class TsFileProcessor {
if (dataType == null || column == null || measurement == null) {
continue;
}
- // extending the column of aligned mem chunk
+ // Extending the column of aligned mem chunk
if (!alignedMemChunk.containsMeasurement(measurementIds[i])) {
memIncrements[0] +=
(alignedMemChunk.alignedListSize() /
PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -753,7 +748,7 @@ public class TsFileProcessor {
deletion.getPath(), device, deletion.getStartTime(),
deletion.getEndTime());
}
}
- // flushing memTables are immutable, only record this deletion in these
memTables for read
+ // Flushing memTables are immutable, only record this deletion in these
memTables for read
if (!flushingMemTables.isEmpty()) {
modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast()));
}
@@ -961,7 +956,7 @@ public class TsFileProcessor {
}
}
- /** put the working memtable into flushing list and set the working memtable
to null */
+ /** Put the working memtable into flushing list and set the working memtable
to null */
public void asyncFlush() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -991,21 +986,21 @@ public class TsFileProcessor {
}
/**
- * this method calls updateLatestFlushTimeCallback and move the given
memtable into the flushing
+ * This method calls updateLatestFlushTimeCallback and move the given
memtable into the flushing
* queue, set the current working memtable as null and then register the
tsfileProcessor into the
* flushManager again.
*/
private Future<?> addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws
IOException {
- Map<IDeviceID, Long> lastTimeForEachDevice = new HashMap<>();
- if (sequence) {
- lastTimeForEachDevice = tobeFlushed.getMaxTime();
- // If some devices have been removed in MemTable, the number of device
in MemTable and
- // tsFileResource will not be the same. And the endTime of these devices
in resource will be
- // Long.minValue.
- // In the case, we need to delete the removed devices in tsFileResource.
- if (lastTimeForEachDevice.size() != tsFileResource.getDevices().size()) {
-
tsFileResource.deleteRemovedDeviceAndUpdateEndTime(lastTimeForEachDevice);
- } else {
+ final Map<IDeviceID, Long> lastTimeForEachDevice =
tobeFlushed.getMaxTime();
+
+ // If some devices have been removed in MemTable, the number of device in
MemTable and
+ // tsFileResource will not be the same. And the endTime of these devices
in resource will be
+ // Long.minValue.
+ // In the case, we need to delete the removed devices in tsFileResource.
+ if (lastTimeForEachDevice.size() != tsFileResource.getDevices().size()) {
+
tsFileResource.deleteRemovedDeviceAndUpdateEndTime(lastTimeForEachDevice);
+ } else {
+ if (sequence) {
tsFileResource.updateEndTime(lastTimeForEachDevice);
}
}
@@ -1039,7 +1034,7 @@ public class TsFileProcessor {
return FlushManager.getInstance().registerTsFileProcessor(this);
}
- /** put back the memtable to MemTablePool and make metadata in writer
visible */
+ /** Put back the memtable to MemTablePool and make metadata in writer
visible */
private void releaseFlushedMemTable(IMemTable memTable) {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -1064,7 +1059,7 @@ public class TsFileProcessor {
}
memTable.release();
MemTableManager.getInstance().decreaseMemtableNumber();
- // reset the mem cost in StorageGroupProcessorInfo
+ // Reset the mem cost in StorageGroupProcessorInfo
dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
if (logger.isDebugEnabled()) {
logger.debug(
@@ -1074,7 +1069,7 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getName(),
flushingMemTables.size());
}
- // report to System
+ // Report to System
SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
if (logger.isDebugEnabled()) {
@@ -1120,7 +1115,7 @@ public class TsFileProcessor {
public void flushOneMemTable() {
IMemTable memTableToFlush = flushingMemTables.getFirst();
- // signal memtable only may appear when calling asyncClose()
+ // Signal memtable only may appear when calling asyncClose()
if (!memTableToFlush.isSignalMemTable()) {
if (memTableToFlush.isEmpty()) {
logger.info(
@@ -1167,16 +1162,16 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getAbsolutePath(),
e1);
}
- // release resource
+ // Release resource
try {
syncReleaseFlushedMemTable(memTableToFlush);
- // make sure no read will search this file
+ // Make sure no read will search this file
tsFileResource.setTimeIndex(config.getTimeIndexLevel().getTimeIndex());
- // this callback method will register this empty tsfile into
TsFileManager
+ // This callback method will register this empty tsfile into
TsFileManager
for (CloseFileListener closeFileListener : closeFileListeners) {
closeFileListener.onClosed(this);
}
- // close writer
+ // Close writer
writer.close();
writer = null;
synchronized (flushingMemTables) {
@@ -1229,7 +1224,7 @@ public class TsFileProcessor {
memTableToFlush.isSignalMemTable());
}
- // for sync flush
+ // For sync flush
syncReleaseFlushedMemTable(memTableToFlush);
try {
writer.getTsFileOutput().force();
@@ -1237,11 +1232,11 @@ public class TsFileProcessor {
logger.error("fsync memTable data to disk error,", e);
}
- // call flushed listener after memtable is released safely
+ // Call flushed listener after memtable is released safely
for (FlushListener flushListener : flushListeners) {
flushListener.onMemTableFlushed(memTableToFlush);
}
- // retry to avoid unnecessary read-only mode
+ // Retry to avoid unnecessary read-only mode
int retryCnt = 0;
while (shouldClose && flushingMemTables.isEmpty() && writer != null) {
try {
@@ -1267,7 +1262,7 @@ public class TsFileProcessor {
storageGroupName,
tsFileResource.getTsFile().getAbsolutePath(),
e);
- // truncate broken metadata
+ // Truncate broken metadata
try {
writer.reset();
} catch (IOException e1) {
@@ -1277,7 +1272,7 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getAbsolutePath(),
e1);
}
- // retry or set read-only
+ // Retry or set read-only
if (retryCnt < 3) {
logger.warn(
"{} meet error when flush FileMetadata to {}, retry it again",
@@ -1296,7 +1291,7 @@ public class TsFileProcessor {
break;
}
}
- // for sync close
+ // For sync close
if (logger.isDebugEnabled()) {
logger.debug(
"{}: {} try to get flushingMemtables lock.",
@@ -1341,8 +1336,8 @@ public class TsFileProcessor {
if (logger.isDebugEnabled()) {
logger.debug("Ended file {}", tsFileResource);
}
- // remove this processor from Closing list in StorageGroupProcessor,
- // mark the TsFileResource closed, no need writer anymore
+ // Remove this processor from Closing list in StorageGroupProcessor,
+ // Mark the TsFileResource closed, no need writer anymore
for (CloseFileListener closeFileListener : closeFileListeners) {
closeFileListener.onClosed(this);
}
@@ -1357,10 +1352,9 @@ public class TsFileProcessor {
private void endEmptyFile() throws TsFileProcessorException, IOException {
logger.info("Start to end empty file {}", tsFileResource);
- // remove this processor from Closing list in DataRegion,
- // mark the TsFileResource closed, no need writer anymore
+ // Remove this processor from Closing list in DataRegion,
+ // Mark the TsFileResource closed, no need writer anymore
writer.close();
- isTsFileFormatValidForPipe = false; // empty file, no need to be captured by pipe
for (CloseFileListener closeFileListener : closeFileListeners) {
closeFileListener.onClosed(this);
}
@@ -1374,11 +1368,6 @@ public class TsFileProcessor {
writer = null;
}
- /** Only useful after TsFile is closed. */
- public boolean isTsFileFormatValidForPipe() {
- return isTsFileFormatValidForPipe;
- }
-
public boolean isManagedByFlushManager() {
return managedByFlushManager;
}
@@ -1387,10 +1376,10 @@ public class TsFileProcessor {
this.managedByFlushManager = managedByFlushManager;
}
- /** close this tsfile */
+ /** Close this tsfile */
public void close() throws TsFileProcessorException {
try {
- // when closing resource file, its corresponding mod file is also closed.
+ // When closing resource file, its corresponding mod file is also closed.
tsFileResource.close();
} catch (IOException e) {
throw new TsFileProcessorException(e);
@@ -1410,7 +1399,7 @@ public class TsFileProcessor {
}
/**
- * get the chunk(s) in the memtable (one from work memtable and the other ones in flushing
+ * Get the chunk(s) in the memtable (one from work memtable and the other ones in flushing
* memtables and then compact them into one TimeValuePairSorter). Then get the related
* ChunkMetadata of data on disk.
*
@@ -1498,7 +1487,7 @@ public class TsFileProcessor {
this.timeRangeId = timeRangeId;
}
- /** release resource of a memtable */
+ /** Release resource of a memtable */
public void putMemTableBackAndClose() throws TsFileProcessorException {
if (workMemTable != null) {
workMemTable.release();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java
index 0cb9af17c81..287fa49a996 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/PipeReceiverStatusHandler.java
@@ -56,11 +56,11 @@ public class PipeReceiverStatusHandler {
private final AtomicReference<String> exceptionRecordedMessage = new
AtomicReference<>("");
public PipeReceiverStatusHandler(
- boolean isRetryAllowedWhenConflictOccurs,
- long retryMaxSecondsWhenConflictOccurs,
- boolean shouldRecordIgnoredDataWhenConflictOccurs,
- long retryMaxSecondsWhenOtherExceptionsOccur,
- boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur) {
+ final boolean isRetryAllowedWhenConflictOccurs,
+ final long retryMaxSecondsWhenConflictOccurs,
+ final boolean shouldRecordIgnoredDataWhenConflictOccurs,
+ final long retryMaxSecondsWhenOtherExceptionsOccur,
+ final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur) {
this.isRetryAllowedWhenConflictOccurs = isRetryAllowedWhenConflictOccurs;
this.retryMaxMillisWhenConflictOccurs =
retryMaxSecondsWhenConflictOccurs < 0
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index b2fdff35385..b2342966c20 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -84,28 +84,43 @@ public abstract class IoTDBConnector implements PipeConnector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
final PipeParameters parameters = validator.getParameters();
- validator.validate(
- args ->
- (boolean) args[0]
- || (((boolean) args[1] || (boolean) args[2]) && (boolean) args[3])
- || (boolean) args[4]
- || (((boolean) args[5] || (boolean) args[6]) && (boolean) args[7])
- String.format(
- "One of %s, %s:%s, %s, %s:%s must be specified",
- CONNECTOR_IOTDB_NODE_URLS_KEY,
- CONNECTOR_IOTDB_HOST_KEY,
- CONNECTOR_IOTDB_PORT_KEY,
- SINK_IOTDB_NODE_URLS_KEY,
- SINK_IOTDB_HOST_KEY,
- SINK_IOTDB_PORT_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
- parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
- parameters.hasAttribute(SINK_IOTDB_IP_KEY),
- parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
- parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
+ validator
+ .validate(
+ args ->
+ (boolean) args[0]
+ || (((boolean) args[1] || (boolean) args[2]) && (boolean) args[3])
+ || (boolean) args[4]
+ || (((boolean) args[5] || (boolean) args[6]) && (boolean) args[7])
+ String.format(
+ "One of %s, %s:%s, %s, %s:%s must be specified",
+ CONNECTOR_IOTDB_NODE_URLS_KEY,
+ CONNECTOR_IOTDB_HOST_KEY,
+ CONNECTOR_IOTDB_PORT_KEY,
+ SINK_IOTDB_NODE_URLS_KEY,
+ SINK_IOTDB_HOST_KEY,
+ SINK_IOTDB_PORT_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+ parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
+ parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+ parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
+ parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
+ .validate(
+ arg -> arg.equals("retry") || arg.equals("ignore"),
+ String.format(
+ "The value of key %s or %s must be either 'retry' or 'ignore'.",
+ CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+ SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+ SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+ CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
+ .trim()
+ .toLowerCase());
}
@Override
@@ -129,7 +144,8 @@ public abstract class IoTDBConnector implements PipeConnector {
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
- .equals("retry"),
+ .trim()
+ .equalsIgnoreCase("retry"),
parameters.getLongOrDefault(
Arrays.asList(
CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY,