This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 79ea1ef23d2 Pipe: Enable `'sink.format'='tsfile'` to use tsFile as
tablet event batch (#12737)
79ea1ef23d2 is described below
commit 79ea1ef23d29ce6b5290a01060c3614ded3e1f13
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 24 23:32:43 2024 +0800
Pipe: Enable `'sink.format'='tsfile'` to use tsFile as tablet event batch
(#12737)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 89 +++++
.../protocol/IoTDBConfigRegionConnector.java | 13 +-
.../evolvable/batch/PipeTabletEventBatch.java | 133 ++++++++
.../PipeTabletEventPlainBatch.java} | 117 ++-----
.../batch/PipeTabletEventTsFileBatch.java | 365 +++++++++++++++++++++
.../PipeTransferBatchReqBuilder.java | 96 ++++--
.../request/PipeTransferTabletRawReq.java | 4 +-
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 11 +-
.../PipeConsensusTransferBatchReqBuilder.java | 8 +-
.../async/IoTDBDataRegionAsyncConnector.java | 228 +++++++------
.../PipeTransferTabletBatchEventHandler.java | 31 +-
...Handler.java => PipeTransferTsFileHandler.java} | 134 ++++++--
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 126 +++++--
.../thrift/sync/IoTDBSchemaRegionConnector.java | 13 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 18 +-
.../tsfile/TsFileInsertionDataContainer.java | 9 +-
.../db/pipe/resource/memory/PipeMemoryManager.java | 63 +---
...oryWeighUtil.java => PipeMemoryWeightUtil.java} | 63 +++-
.../pipe/resource/tsfile/PipeTsFileResource.java | 9 +-
.../subtask/connector/PipeConnectorSubtask.java | 2 +-
.../SubscriptionPrefetchingTabletsQueue.java | 6 +-
.../config/constant/PipeConnectorConstant.java | 12 +-
.../pipe/connector/protocol/IoTDBConnector.java | 40 ++-
.../connector/protocol/IoTDBSslSyncConnector.java | 16 +-
.../iotdb/commons/pipe/event/EnrichedEvent.java | 2 +-
.../task/subtask/PipeAbstractConnectorSubtask.java | 13 +-
26 files changed, 1204 insertions(+), 417 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 27893c49381..d7149a401b6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -99,6 +99,95 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
}
}
+ @Test
+ public void testSinkTsFileFormat() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)",
"flush"))) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.realtime.mode", "forced-log");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.format", "tsfile");
+ connectorAttributes.put("connector.realtime-first", "false");
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes))
+ .getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)",
"flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,",
"2,1.0,"))));
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("testPipe").getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes))
+ .getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.vehicle.d0(time, s1) values (4, 1)",
+ "insert into root.vehicle.d0(time, s1) values (3, 1), (0, 1)",
+ "flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,",
"3,1.0,", "4,1.0,"))));
+ }
+ }
+
@Test
public void testLegacyConnector() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 25fc9de720a..9ba081018b9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -188,9 +189,17 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
// 1. Transfer snapshotFile, and template File if exists
- transferFilePieces(pipeName, creationTime, snapshotFile, clientAndStatus,
true);
+ transferFilePieces(
+ Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+ snapshotFile,
+ clientAndStatus,
+ true);
if (Objects.nonNull(templateFile)) {
- transferFilePieces(pipeName, creationTime, templateFile,
clientAndStatus, true);
+ transferFilePieces(
+ Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+ templateFile,
+ clientAndStatus,
+ true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred
completely
final TPipeTransferResp resp;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
new file mode 100644
index 00000000000..1f4123c6cc2
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public abstract class PipeTabletEventBatch implements AutoCloseable {
+
+ protected final List<EnrichedEvent> events = new ArrayList<>();
+
+ private final int maxDelayInMs;
+ private long firstEventProcessingTime = Long.MIN_VALUE;
+
+ protected long totalBufferSize = 0;
+
+ protected volatile boolean isClosed = false;
+
+ protected PipeTabletEventBatch(final int maxDelayInMs) {
+ this.maxDelayInMs = maxDelayInMs;
+ }
+
+ /**
+ * Try offer {@link Event} into batch if the given {@link Event} is not
duplicated.
+ *
+ * @param event the given {@link Event}
+ * @return {@code true} if the batch can be transferred
+ */
+ synchronized boolean onEvent(final TabletInsertionEvent event)
+ throws WALPipeException, IOException, WriteProcessException {
+ if (isClosed || !(event instanceof EnrichedEvent)) {
+ return false;
+ }
+
+ // The deduplication logic here is to avoid the accumulation of
+ // the same event in a batch when retrying.
+ if (events.isEmpty() || !Objects.equals(events.get(events.size() - 1),
event)) {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (((EnrichedEvent) event)
+
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
+
+ if (constructBatch(event)) {
+ events.add((EnrichedEvent) event);
+ }
+
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
+ }
+ } else {
+ ((EnrichedEvent) event)
+
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
+ }
+ }
+
+ return shouldEmit();
+ }
+
+ /**
+ * Added an {@link TabletInsertionEvent} into batch.
+ *
+ * @param event the {@link TabletInsertionEvent} in batch
+ * @return {@code true} if the event is calculated into batch, {@code false}
if the event is
+ * cached and not emitted in this batch. If there are failure
encountered, just throw
+ * exceptions and do not return {@code false} here.
+ */
+ protected abstract boolean constructBatch(final TabletInsertionEvent event)
+ throws WALPipeException, IOException, WriteProcessException;
+
+ private boolean shouldEmit() {
+ return totalBufferSize >= getMaxBatchSizeInBytes()
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ }
+
+ protected abstract long getMaxBatchSizeInBytes();
+
+ public synchronized void onSuccess() {
+ events.clear();
+
+ totalBufferSize = 0;
+
+ firstEventProcessingTime = Long.MIN_VALUE;
+ }
+
+ @Override
+ public synchronized void close() {
+ isClosed = true;
+
+ clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
+ events.clear();
+ }
+
+ public void decreaseEventsReferenceCount(final String holderMessage, final
boolean shouldReport) {
+ events.forEach(event -> event.decreaseReferenceCount(holderMessage,
shouldReport));
+ }
+
+ private void clearEventsReferenceCount(final String holderMessage) {
+ events.forEach(event -> event.clearReferenceCount(holderMessage));
+ }
+
+ public List<EnrichedEvent> deepCopyEvents() {
+ return new ArrayList<>(events);
+ }
+
+ boolean isEmpty() {
+ return events.isEmpty();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
similarity index 61%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 97070313308..0b97487b04e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
-import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.utils.Pair;
@@ -45,30 +44,22 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class PipeEventBatch implements AutoCloseable {
+public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventBatch.class);
-
- private final List<Event> events = new ArrayList<>();
- private final List<Long> requestCommitIds = new ArrayList<>();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventPlainBatch.class);
private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
- // limit in delayed time
- private final int maxDelayInMs;
- private long firstEventProcessingTime = Long.MIN_VALUE;
-
// limit in buffer size
private final PipeMemoryBlock allocatedMemoryBlock;
- private long totalBufferSize = 0;
// Used to rate limit when transferring data
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new
HashMap<>();
- public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
- this.maxDelayInMs = maxDelayInMs;
+ PipeTabletEventPlainBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
+ super(maxDelayInMs);
this.allocatedMemoryBlock =
PipeResourceManager.memory()
.tryAllocate(requestMaxBatchSizeInBytes)
@@ -86,68 +77,34 @@ public class PipeEventBatch implements AutoCloseable {
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
LOGGER.info(
- "PipeTransferBatchReqBuilder: the max batch size is adjusted from {}
to {} due to the "
+ "PipeTabletEventBatch: the max batch size is adjusted from {} to {}
due to the "
+ "memory restriction",
requestMaxBatchSizeInBytes,
getMaxBatchSizeInBytes());
}
}
- /**
- * Try offer {@link Event} into batch if the given {@link Event} is not
duplicated.
- *
- * @param event the given {@link Event}
- * @return {@code true} if the batch can be transferred
- */
- public synchronized boolean onEvent(final TabletInsertionEvent event)
- throws IOException, WALPipeException {
- if (!(event instanceof EnrichedEvent)) {
- return false;
- }
-
- final long requestCommitId = ((EnrichedEvent) event).getCommitId();
-
- // The deduplication logic here is to avoid the accumulation of the same
event in a batch when
- // retrying.
- if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
- // We increase the reference count for this event to determine if the
event may be released.
- if (((EnrichedEvent) event)
-
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
- events.add(event);
- requestCommitIds.add(requestCommitId);
-
- final int bufferSize = buildTabletInsertionBuffer(event);
- totalBufferSize += bufferSize;
- pipe2BytesAccumulated.compute(
- new Pair<>(
- ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent)
event).getCreationTime()),
- (pipeName, bytesAccumulated) ->
- bytesAccumulated == null ? bufferSize : bytesAccumulated +
bufferSize);
-
- if (firstEventProcessingTime == Long.MIN_VALUE) {
- firstEventProcessingTime = System.currentTimeMillis();
- }
- } else {
- ((EnrichedEvent) event)
-
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
- }
- }
-
- return totalBufferSize >= getMaxBatchSizeInBytes()
- || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ @Override
+ protected boolean constructBatch(final TabletInsertionEvent event)
+ throws WALPipeException, IOException {
+ final int bufferSize = buildTabletInsertionBuffer(event);
+ totalBufferSize += bufferSize;
+ pipe2BytesAccumulated.compute(
+ new Pair<>(
+ ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent)
event).getCreationTime()),
+ (pipeName, bytesAccumulated) ->
+ bytesAccumulated == null ? bufferSize : bytesAccumulated +
bufferSize);
+ return true;
}
+ @Override
public synchronized void onSuccess() {
+ super.onSuccess();
+
binaryBuffers.clear();
insertNodeBuffers.clear();
tabletBuffers.clear();
- events.clear();
- requestCommitIds.clear();
-
- firstEventProcessingTime = Long.MIN_VALUE;
-
- totalBufferSize = 0;
pipe2BytesAccumulated.clear();
}
@@ -156,22 +113,11 @@ public class PipeEventBatch implements AutoCloseable {
binaryBuffers, insertNodeBuffers, tabletBuffers);
}
- private long getMaxBatchSizeInBytes() {
+ @Override
+ protected long getMaxBatchSizeInBytes() {
return allocatedMemoryBlock.getMemoryUsageInBytes();
}
- public boolean isEmpty() {
- return binaryBuffers.isEmpty() && insertNodeBuffers.isEmpty() &&
tabletBuffers.isEmpty();
- }
-
- public List<Event> deepCopyEvents() {
- return new ArrayList<>(events);
- }
-
- public List<Long> deepCopyRequestCommitIds() {
- return new ArrayList<>(requestCommitIds);
- }
-
public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
return new HashMap<>(pipe2BytesAccumulated);
}
@@ -213,23 +159,8 @@ public class PipeEventBatch implements AutoCloseable {
@Override
public synchronized void close() {
- clearEventsReferenceCount(PipeTransferBatchReqBuilder.class.getName());
- allocatedMemoryBlock.close();
- }
+ super.close();
- public void decreaseEventsReferenceCount(final String holderMessage, final
boolean shouldReport) {
- for (final Event event : events) {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).decreaseReferenceCount(holderMessage,
shouldReport);
- }
- }
- }
-
- public void clearEventsReferenceCount(final String holderMessage) {
- for (final Event event : events) {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).clearReferenceCount(holderMessage);
- }
- }
+ allocatedMemoryBlock.close();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
new file mode 100644
index 00000000000..6abca7b3183
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
+import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class);
+
+ private static final AtomicReference<FolderManager> FOLDER_MANAGER = new
AtomicReference<>();
+ private static final AtomicLong BATCH_ID_GENERATOR = new AtomicLong(0);
+ private final AtomicLong currentBatchId = new
AtomicLong(BATCH_ID_GENERATOR.incrementAndGet());
+ private final File batchFileBaseDir;
+
+ private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch
+ private final AtomicLong tsFileIdGenerator = new AtomicLong(0);
+
+ private final long maxSizeInBytes;
+
+ private final Map<Pair<String, Long>, Double> pipeName2WeightMap = new
HashMap<>();
+
+ private final List<Tablet> tabletList = new ArrayList<>();
+ private final List<Boolean> isTabletAlignedList = new ArrayList<>();
+
+ private volatile TsFileWriter fileWriter;
+
+ PipeTabletEventTsFileBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
+ super(maxDelayInMs);
+
+ this.maxSizeInBytes = requestMaxBatchSizeInBytes;
+ try {
+ this.batchFileBaseDir = getNextBaseDir();
+ } catch (final Exception e) {
+ throw new PipeException(
+ String.format("Failed to create file dir for batch: %s",
e.getMessage()));
+ }
+ }
+
+ private File getNextBaseDir() throws DiskSpaceInsufficientException {
+ if (FOLDER_MANAGER.get() == null) {
+ synchronized (FOLDER_MANAGER) {
+ if (FOLDER_MANAGER.get() == null) {
+ FOLDER_MANAGER.set(
+ new FolderManager(
+
Arrays.stream(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs())
+ .map(fileDir -> fileDir + File.separator + ".batch")
+ .collect(Collectors.toList()),
+ DirectoryStrategyType.SEQUENCE_STRATEGY));
+ }
+ }
+ }
+
+ final File baseDir =
+ new File(FOLDER_MANAGER.get().getNextFolder(),
Long.toString(currentBatchId.get()));
+ if (baseDir.exists()) {
+ FileUtils.deleteQuietly(baseDir);
+ }
+ if (!baseDir.exists() && !baseDir.mkdirs()) {
+ LOGGER.warn(
+ "Batch id = {}: Failed to create batch file dir {}.",
+ currentBatchId.get(),
+ baseDir.getPath());
+ throw new PipeException(
+ String.format(
+ "Failed to create batch file dir %s. (Batch id = %s)",
+ baseDir.getPath(), currentBatchId.get()));
+ }
+ LOGGER.info(
+ "Batch id = {}: Create batch dir successfully, batch file dir = {}.",
+ currentBatchId.get(),
+ baseDir.getPath());
+ return baseDir;
+ }
+
+ @Override
+ protected boolean constructBatch(final TabletInsertionEvent event) {
+ if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ final PipeInsertNodeTabletInsertionEvent insertNodeTabletInsertionEvent =
+ (PipeInsertNodeTabletInsertionEvent) event;
+ final List<Tablet> tablets =
insertNodeTabletInsertionEvent.convertToTablets();
+ for (int i = 0; i < tablets.size(); ++i) {
+ final Tablet tablet = tablets.get(i);
+ if (tablet.rowSize == 0) {
+ continue;
+ }
+ bufferTablet(
+ insertNodeTabletInsertionEvent.getPipeName(),
+ insertNodeTabletInsertionEvent.getCreationTime(),
+ tablet,
+ insertNodeTabletInsertionEvent.isAligned(i));
+ }
+ } else if (event instanceof PipeRawTabletInsertionEvent) {
+ final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) event;
+ final Tablet tablet = rawTabletInsertionEvent.convertToTablet();
+ if (tablet.rowSize == 0) {
+ return true;
+ }
+ bufferTablet(
+ rawTabletInsertionEvent.getPipeName(),
+ rawTabletInsertionEvent.getCreationTime(),
+ tablet,
+ rawTabletInsertionEvent.isAligned());
+ } else {
+ LOGGER.warn(
+ "Batch id = {}: Unsupported event {} type {} when constructing
tsfile batch",
+ currentBatchId.get(),
+ event,
+ event.getClass());
+ }
+ return true;
+ }
+
+ private void bufferTablet(
+ final String pipeName, long creationTime, Tablet tablet, boolean
isAligned) {
+ totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+
+ pipeName2WeightMap.compute(
+ new Pair<>(pipeName, creationTime),
+ (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1);
+
+ tabletList.add(tablet);
+ isTabletAlignedList.add(isAligned);
+ }
+
+ public Map<Pair<String, Long>, Double> deepCopyPipe2WeightMap() {
+ final double sum =
pipeName2WeightMap.values().stream().reduce(Double::sum).orElse(0.0);
+ if (sum == 0.0) {
+ return Collections.emptyMap();
+ }
+ pipeName2WeightMap.entrySet().forEach(entry ->
entry.setValue(entry.getValue() / sum));
+ return new HashMap<>(pipeName2WeightMap);
+ }
+
+ public synchronized List<File> sealTsFiles() throws IOException,
WriteProcessException {
+ return isClosed ? Collections.emptyList() : writeTabletsToTsFiles();
+ }
+
+ private List<File> writeTabletsToTsFiles() throws IOException,
WriteProcessException {
+ final Map<String, List<Tablet>> device2Tablets = new HashMap<>();
+ final Map<String, Boolean> device2Aligned = new HashMap<>();
+
+ // Sort the tablets by device id
+ for (int i = 0, size = tabletList.size(); i < size; ++i) {
+ final Tablet tablet = tabletList.get(i);
+ final String deviceId = tablet.deviceId;
+ device2Tablets.computeIfAbsent(deviceId, k -> new
ArrayList<>()).add(tablet);
+ device2Aligned.put(deviceId, isTabletAlignedList.get(i));
+ }
+
+ // Sort the tablets by start time in each device
+ for (final List<Tablet> tablets : device2Tablets.values()) {
+ tablets.sort(
+ // Each tablet has at least one timestamp
+ Comparator.comparingLong(tablet -> tablet.timestamps[0]));
+ }
+
+ // Replace ArrayList with LinkedList to improve performance
+ final Map<String, LinkedList<Tablet>> device2TabletsLinkedList = new
HashMap<>();
+ for (final Map.Entry<String, List<Tablet>> entry :
device2Tablets.entrySet()) {
+ device2TabletsLinkedList.put(entry.getKey(), new
LinkedList<>(entry.getValue()));
+ }
+ // Clear the original device2Tablets to release memory
+ device2Tablets.clear();
+
+ // Write the tablets to the tsfile device by device, and the tablets
+ // in the same device are written in order of start time. Tablets in
+ // the same device should not be written if their time ranges overlap.
+ // If overlapped, we try to write the tablets whose device id is not
+ // the same as the previous one. For the tablets not written in the
+ // previous round, we write them in a new tsfile.
+ final List<File> sealedFiles = new ArrayList<>();
+
+ // Try making the tsfile size as large as possible
+ while (!device2TabletsLinkedList.isEmpty()) {
+ if (Objects.isNull(fileWriter)) {
+ fileWriter =
+ new TsFileWriter(
+ new File(
+ batchFileBaseDir,
+ TS_FILE_PREFIX
+ + "_"
+ +
IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+ + "_"
+ + currentBatchId.get()
+ + "_"
+ + tsFileIdGenerator.getAndIncrement()
+ + TsFileConstant.TSFILE_SUFFIX));
+ }
+
+ final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
+ device2TabletsLinkedList.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
+ final String deviceId = entry.getKey();
+ final LinkedList<Tablet> tablets = entry.getValue();
+
+ final List<Tablet> tabletsToWrite = new ArrayList<>();
+
+ Tablet lastTablet = null;
+ while (!tablets.isEmpty()) {
+ final Tablet tablet = tablets.peekFirst();
+ if (Objects.isNull(lastTablet)
+ // lastTablet.rowSize is not 0
+ || lastTablet.timestamps[lastTablet.rowSize - 1] <
tablet.timestamps[0]) {
+ tabletsToWrite.add(tablet);
+ lastTablet = tablet;
+ tablets.pollFirst();
+ } else {
+ break;
+ }
+ }
+
+ if (tablets.isEmpty()) {
+ iterator.remove();
+ }
+
+ final boolean isAligned = device2Aligned.get(deviceId);
+ for (final Tablet tablet : tabletsToWrite) {
+ if (isAligned) {
+ try {
+ fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId),
tablet.getSchemas());
+ } catch (final WriteProcessException ignore) {
+ // Do nothing if the timeSeries has been registered
+ }
+
+ fileWriter.writeAligned(tablet);
+ } else {
+ for (final MeasurementSchema schema : tablet.getSchemas()) {
+ try {
+ fileWriter.registerTimeseries(new Path(tablet.deviceId),
schema);
+ } catch (final WriteProcessException ignore) {
+ // Do nothing if the timeSeries has been registered
+ }
+ }
+
+ fileWriter.write(tablet);
+ }
+ }
+ }
+
+ fileWriter.close();
+ final File sealedFile = fileWriter.getIOWriter().getFile();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Batch id = {}: Seal tsfile {} successfully.",
+ currentBatchId.get(),
+ sealedFile.getPath());
+ }
+ sealedFiles.add(sealedFile);
+ fileWriter = null;
+ }
+
+ return sealedFiles;
+ }
+
+ @Override
+ protected long getMaxBatchSizeInBytes() {
+ return maxSizeInBytes;
+ }
+
+ @Override
+ public synchronized void onSuccess() {
+ super.onSuccess();
+
+ pipeName2WeightMap.clear();
+
+ tabletList.clear();
+ isTabletAlignedList.clear();
+
+ // We don't need to delete the tsFile here, because the tsFile
+ // will be deleted after the file is transferred.
+ fileWriter = null;
+ }
+
+ @Override
+ public synchronized void close() {
+ super.close();
+
+ pipeName2WeightMap.clear();
+
+ tabletList.clear();
+ isTabletAlignedList.clear();
+
+ if (Objects.nonNull(fileWriter)) {
+ try {
+ fileWriter.close();
+ } catch (final Exception e) {
+ LOGGER.info(
+ "Batch id = {}: Failed to close the tsfile {} when trying to close
batch, because {}",
+ currentBatchId.get(),
+ fileWriter.getIOWriter().getFile().getPath(),
+ e.getMessage(),
+ e);
+ }
+
+ try {
+ FileUtils.delete(fileWriter.getIOWriter().getFile());
+ } catch (final Exception e) {
+ LOGGER.info(
+ "Batch id = {}: Failed to delete the tsfile {} when trying to
close batch, because {}",
+ currentBatchId.get(),
+ fileWriter.getIOWriter().getFile().getPath(),
+ e.getMessage(),
+ e);
+ }
+
+ fileWriter = null;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
similarity index 55%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 33b524b1a9a..f28f19e4da1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -29,6 +29,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +42,18 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
@@ -62,28 +69,53 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
// If the leader cache is disabled (or unable to find the endpoint of event
in the leader cache),
// the event will be stored in the default batch.
- private final PipeEventBatch defaultBatch;
+ private final PipeTabletEventBatch defaultBatch;
// If the leader cache is enabled, the batch will be divided by the leader
endpoint,
// each endpoint has a batch.
- private final Map<TEndPoint, PipeEventBatch> endPointToBatch = new
HashMap<>();
+ // This is only used in plain batch since tsfile does not return redirection
info.
+ private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
new HashMap<>();
public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
+ final boolean usingTsFileBatch =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
+ .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
+
useLeaderCache =
- parameters.getBooleanOrDefault(
- Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
- CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
-
- requestMaxDelayInMs =
- parameters.getIntOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
- CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
- * 1000;
- requestMaxBatchSizeInBytes =
- parameters.getLongOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
- CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
-
- this.defaultBatch = new PipeEventBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
+ !usingTsFileBatch
+ && parameters.getBooleanOrDefault(
+ Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+ CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
+
+ final int requestMaxDelayInSeconds;
+ if (usingTsFileBatch) {
+ requestMaxDelayInSeconds =
+ parameters.getIntOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
+ CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE);
+ requestMaxDelayInMs =
+ requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE :
requestMaxDelayInSeconds * 1000;
+ requestMaxBatchSizeInBytes =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
+ CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE);
+ this.defaultBatch =
+ new PipeTabletEventTsFileBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
+ } else {
+ requestMaxDelayInSeconds =
+ parameters.getIntOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
+ CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
+ requestMaxDelayInMs =
+ requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE :
requestMaxDelayInSeconds * 1000;
+ requestMaxBatchSizeInBytes =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
+ CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
+ this.defaultBatch =
+ new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
+ }
}
/**
@@ -91,12 +123,13 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
* duplicated.
*
* @param event the given {@link Event}
- * @return {@link Pair}<{@link TEndPoint}, {@link PipeEventBatch}> not null
means this {@link
- * PipeEventBatch} can be transferred. the first element is the leader
endpoint to transfer to
- * (might be null), the second element is the batch to be transferred.
+ * @return {@link Pair}<{@link TEndPoint}, {@link
PipeTabletEventPlainBatch}> not null means this
+ * {@link PipeTabletEventPlainBatch} can be transferred. the first
element is the leader
+ * endpoint to transfer to (might be null), the second element is the
batch to be transferred.
*/
- public synchronized Pair<TEndPoint, PipeEventBatch> onEvent(final
TabletInsertionEvent event)
- throws IOException, WALPipeException {
+ public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
+ final TabletInsertionEvent event)
+ throws IOException, WALPipeException, WriteProcessException {
if (!(event instanceof EnrichedEvent)) {
LOGGER.warn(
"Unsupported event {} type {} when building transfer request",
event, event.getClass());
@@ -124,15 +157,16 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) :
null;
}
- final PipeEventBatch batch =
+ final PipeTabletEventPlainBatch batch =
endPointToBatch.computeIfAbsent(
- endPoint, k -> new PipeEventBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes));
+ endPoint,
+ k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes));
return batch.onEvent(event) ? new Pair<>(endPoint, batch) : null;
}
/** Get all batches that have at least 1 event. */
- public synchronized List<Pair<TEndPoint, PipeEventBatch>>
getAllNonEmptyBatches() {
- final List<Pair<TEndPoint, PipeEventBatch>> nonEmptyBatches = new
ArrayList<>();
+ public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
getAllNonEmptyBatches() {
+ final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyBatches = new
ArrayList<>();
if (!defaultBatch.isEmpty()) {
nonEmptyBatches.add(new Pair<>(null, defaultBatch));
}
@@ -147,12 +181,12 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
public boolean isEmpty() {
return defaultBatch.isEmpty()
- && endPointToBatch.values().stream().allMatch(PipeEventBatch::isEmpty);
+ &&
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
}
@Override
public synchronized void close() {
defaultBatch.close();
- endPointToBatch.values().forEach(PipeEventBatch::close);
+ endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index f6d677389e7..33a98d84e42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -91,7 +91,7 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
}
}
- private static boolean checkSorted(final Tablet tablet) {
+ public static boolean checkSorted(final Tablet tablet) {
for (int i = 1; i < tablet.rowSize; i++) {
if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
return false;
@@ -100,7 +100,7 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return true;
}
- private static void sortTablet(final Tablet tablet) {
+ public static void sortTablet(final Tablet tablet) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better
performance
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 355d354c385..e964ddffd26 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -65,6 +65,8 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -325,8 +327,13 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
private void doTransfer(final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeInsertionEvent)
throws IoTDBConnectionException, StatementExecutionException {
- for (final Tablet tablet :
pipeInsertNodeInsertionEvent.convertToTablets()) {
- if (pipeInsertNodeInsertionEvent.isAligned()) {
+ final List<Tablet> tablets =
pipeInsertNodeInsertionEvent.convertToTablets();
+ for (int i = 0; i < tablets.size(); ++i) {
+ final Tablet tablet = tablets.get(i);
+ if (Objects.isNull(tablet) || tablet.rowSize == 0) {
+ continue;
+ }
+ if (pipeInsertNodeInsertionEvent.isAligned(i)) {
sessionPool.insertAlignedTablet(tablet);
} else {
sessionPool.insertTablet(tablet);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index daa70051d59..bf116c36798 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -46,10 +46,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
@@ -76,7 +76,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
maxDelayInMs =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
- CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
+ CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE)
* 1000;
this.consensusGroupId = consensusGroupId;
@@ -85,7 +85,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
final long requestMaxBatchSizeInBytes =
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
- CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
+ CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
allocatedMemoryBlock =
PipeResourceManager.memory()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 3edcedc31d8..b5b4397fd60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -24,21 +24,24 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -49,24 +52,26 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
@@ -112,13 +117,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
throws Exception {
super.customize(parameters, configuration);
-
- // Disable batch mode for retry connector, in case retry events are never
sent again
- final PipeParameters retryParameters =
- new PipeParameters(new HashMap<>(parameters.getAttribute()));
- retryParameters.getAttribute().put(SINK_IOTDB_BATCH_MODE_ENABLE_KEY,
"false");
- retryParameters.getAttribute().put(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
"false");
- retryConnector.customize(retryParameters, configuration);
+ retryConnector.customize(parameters, configuration);
clientManager =
new IoTDBDataNodeAsyncClientManager(
@@ -157,69 +156,104 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return;
}
- transferWithoutCheck(tabletInsertionEvent);
- }
-
- private void transferWithoutCheck(final TabletInsertionEvent
tabletInsertionEvent)
- throws Exception {
if (isTabletBatchModeEnabled) {
- final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+ final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
tabletBatchBuilder.onEvent(tabletInsertionEvent);
- if (Objects.nonNull(endPointAndBatch)) {
- transfer(
- endPointAndBatch.getLeft(),
- new
PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(), this));
- endPointAndBatch.getRight().onSuccess();
+ if (Objects.isNull(endPointAndBatch)) {
+ return;
}
+ transferInBatchWithoutCheck(endPointAndBatch);
} else {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
- (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionAsyncConnector.class.getName())) {
- pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
- IoTDBDataRegionAsyncConnector.class.getName(), false);
- return;
- }
+ transferInEventWithoutCheck(tabletInsertionEvent);
+ }
+ }
- final InsertNode insertNode =
-
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
- final TPipeTransferReq pipeTransferReq =
- compressIfNeeded(
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer())
- :
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
- final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler =
- new PipeTransferTabletInsertNodeEventHandler(
- pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
+ private void transferInBatchWithoutCheck(
+ final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
+ throws IOException, WriteProcessException {
+ final PipeTabletEventBatch batch = endPointAndBatch.getRight();
+ if (batch instanceof PipeTabletEventPlainBatch) {
+ transfer(
+ endPointAndBatch.getLeft(),
+ new PipeTransferTabletBatchEventHandler((PipeTabletEventPlainBatch)
batch, this));
+ } else if (batch instanceof PipeTabletEventTsFileBatch) {
+ final PipeTabletEventTsFileBatch tsFileBatch =
(PipeTabletEventTsFileBatch) batch;
+ final List<File> sealedFiles = tsFileBatch.sealTsFiles();
+ final Map<Pair<String, Long>, Double> pipe2WeightMap =
tsFileBatch.deepCopyPipe2WeightMap();
+ final List<EnrichedEvent> events = tsFileBatch.deepCopyEvents();
+ final AtomicInteger eventsReferenceCount = new
AtomicInteger(sealedFiles.size());
+ final AtomicBoolean eventsHadBeenAddedToRetryQueue = new
AtomicBoolean(false);
+
+ for (final File sealedFile : sealedFiles) {
transfer(
- // getDeviceId() may return null for InsertRowsNode
- pipeInsertNodeTabletInsertionEvent.getDeviceId(),
pipeTransferInsertNodeReqHandler);
- } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
- final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
- (PipeRawTabletInsertionEvent) tabletInsertionEvent;
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionAsyncConnector.class.getName())) {
- pipeRawTabletInsertionEvent.decreaseReferenceCount(
- IoTDBDataRegionAsyncConnector.class.getName(), false);
- return;
- }
+ new PipeTransferTsFileHandler(
+ this,
+ pipe2WeightMap,
+ events,
+ eventsReferenceCount,
+ eventsHadBeenAddedToRetryQueue,
+ sealedFile,
+ null,
+ false));
+ }
+ } else {
+ LOGGER.warn(
+ "Unsupported batch type {} when transferring tablet insertion
event.", batch.getClass());
+ }
- final TPipeTransferReq pipeTransferTabletRawReq =
- compressIfNeeded(
- PipeTransferTabletRawReq.toTPipeTransferReq(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned()));
- final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
- new PipeTransferTabletRawEventHandler(
- pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
+ endPointAndBatch.getRight().onSuccess();
+ }
+
+ private void transferInEventWithoutCheck(final TabletInsertionEvent
tabletInsertionEvent)
+ throws Exception {
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
+ (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName())) {
+ pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName(), false);
+ return;
+ }
- transfer(pipeRawTabletInsertionEvent.getDeviceId(),
pipeTransferTabletReqHandler);
+ final InsertNode insertNode =
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
+ final TPipeTransferReq pipeTransferReq =
+ compressIfNeeded(
+ Objects.isNull(insertNode)
+ ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+ :
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
+ final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler =
+ new PipeTransferTabletInsertNodeEventHandler(
+ pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
+
+ transfer(
+ // getDeviceId() may return null for InsertRowsNode
+ pipeInsertNodeTabletInsertionEvent.getDeviceId(),
pipeTransferInsertNodeReqHandler);
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName())) {
+ pipeRawTabletInsertionEvent.decreaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName(), false);
+ return;
}
+
+ final TPipeTransferReq pipeTransferTabletRawReq =
+ compressIfNeeded(
+ PipeTransferTabletRawReq.toTPipeTransferReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned()));
+ final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
+ new PipeTransferTabletRawEventHandler(
+ pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
+
+ transfer(pipeRawTabletInsertionEvent.getDeviceId(),
pipeTransferTabletReqHandler);
}
}
@@ -295,11 +329,24 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
throw new
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
}
- final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
- new
PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this);
-
- transfer(pipeTransferTsFileInsertionEventHandler);
- } catch (Exception e) {
+ final PipeTransferTsFileHandler pipeTransferTsFileHandler =
+ new PipeTransferTsFileHandler(
+ this,
+ Collections.singletonMap(
+ new Pair<>(
+ pipeTsFileInsertionEvent.getPipeName(),
+ pipeTsFileInsertionEvent.getCreationTime()),
+ 1.0),
+ Collections.singletonList(pipeTsFileInsertionEvent),
+ new AtomicInteger(1),
+ new AtomicBoolean(false),
+ pipeTsFileInsertionEvent.getTsFile(),
+ pipeTsFileInsertionEvent.getModFile(),
+ pipeTsFileInsertionEvent.isWithMod()
+ && clientManager.supportModsIfIsDataNodeReceiver());
+
+ transfer(pipeTransferTsFileHandler);
+ } catch (final Exception e) {
// Just in case. To avoid the case that exception occurred when
constructing the handler.
pipeTsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionAsyncConnector.class.getName(), false);
@@ -307,15 +354,14 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
}
- private void transfer(
- final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler) {
+ private void transfer(final PipeTransferTsFileHandler
pipeTransferTsFileHandler) {
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient();
- pipeTransferTsFileInsertionEventHandler.transfer(clientManager, client);
+ pipeTransferTsFileHandler.transfer(clientManager, client);
} catch (final Exception ex) {
logOnClientException(client, ex);
- pipeTransferTsFileInsertionEventHandler.onError(ex);
+ pipeTransferTsFileHandler.onError(ex);
}
}
@@ -401,27 +447,27 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
}
}
+
+ // Trigger cron heartbeat event in retry connector to send batch in time
+ retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
}
/** Try its best to commit data in order. Flush can also be a trigger to
transfer batched data. */
- private void transferBatchedEventsIfNecessary() throws IOException {
+ private void transferBatchedEventsIfNecessary() throws IOException,
WriteProcessException {
if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
return;
}
- for (final Pair<TEndPoint, PipeEventBatch> endPointAndBatch :
+ for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
tabletBatchBuilder.getAllNonEmptyBatches()) {
- transfer(
- endPointAndBatch.getLeft(),
- new PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(),
this));
- endPointAndBatch.getRight().onSuccess();
+ transferInBatchWithoutCheck(endPointAndBatch);
}
}
/**
- * Add failure event to retry queue.
+ * Add failure {@link Event} to retry queue.
*
- * @param event event to retry
+ * @param event {@link Event} to retry
*/
public void addFailureEventToRetryQueue(final Event event) {
if (isClosed.get()) {
@@ -444,14 +490,12 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
/**
- * Add failure events to retry queue.
+ * Add failure {@link EnrichedEvent}s to retry queue.
*
- * @param events events to retry
+ * @param events {@link EnrichedEvent}s to retry
*/
- public void addFailureEventsToRetryQueue(final Iterable<Event> events) {
- for (final Event event : events) {
- addFailureEventToRetryQueue(event);
- }
+ public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent>
events) {
+ events.forEach(this::addFailureEventToRetryQueue);
}
public synchronized void clearRetryEventsReferenceCount() {
@@ -463,10 +507,6 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
}
- public boolean supportModsIfIsDataNodeReceiver() {
- return clientManager.supportModsIfIsDataNodeReceiver();
- }
-
//////////////////////////// Operations for close
////////////////////////////
/**
@@ -520,7 +560,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
});
return count.get();
- } catch (Exception e) {
+ } catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed to get retry event count for pipe {}.", pipeName,
e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index adc508e47e8..fb879c7b1dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -24,10 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils;
-import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -49,8 +48,7 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletBatchEventHandler.class);
- private final List<Long> requestCommitIds;
- private final List<Event> events;
+ private final List<EnrichedEvent> events;
private final Map<Pair<String, Long>, Long> pipeName2BytesAccumulated;
private final TPipeTransferReq req;
@@ -59,10 +57,9 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
private final IoTDBDataRegionAsyncConnector connector;
public PipeTransferTabletBatchEventHandler(
- final PipeEventBatch batch, final IoTDBDataRegionAsyncConnector
connector)
+ final PipeTabletEventPlainBatch batch, final
IoTDBDataRegionAsyncConnector connector)
throws IOException {
- // Deep copy to keep Ids' and events' reference
- requestCommitIds = batch.deepCopyRequestCommitIds();
+ // Deep copy to keep events' reference
events = batch.deepCopyEvents();
pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
@@ -111,12 +108,10 @@ public class PipeTransferTabletBatchEventHandler
implements AsyncMethodCallback<
connector.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
}
- for (final Event event : events) {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event)
-
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(),
true);
- }
- }
+ events.forEach(
+ event ->
+ event.decreaseReferenceCount(
+ PipeTransferTabletBatchEventHandler.class.getName(), true));
} catch (final Exception e) {
onError(e);
}
@@ -127,14 +122,8 @@ public class PipeTransferTabletBatchEventHandler
implements AsyncMethodCallback<
try {
LOGGER.warn(
"Failed to transfer TabletInsertionEvent batch {} (request commit
ids={}).",
- events.stream()
- .map(
- event ->
- event instanceof EnrichedEvent
- ? ((EnrichedEvent) event).coreReportMessage()
- : event.toString())
- .collect(Collectors.toList()),
- requestCommitIds,
+
events.stream().map(EnrichedEvent::coreReportMessage).collect(Collectors.toList()),
+
events.stream().map(EnrichedEvent::getCommitId).collect(Collectors.toList()),
exception);
} finally {
connector.addFailureEventsToRetryQueue(events);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
similarity index 66%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 0c15b3560fe..a5f126d7123 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
@@ -35,8 +36,10 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,18 +48,29 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
-public class PipeTransferTsFileInsertionEventHandler
- implements AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTransferResp> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
- private final PipeTsFileInsertionEvent event;
+ // Used to transfer the file
private final IoTDBDataRegionAsyncConnector connector;
+ // Used to rate limit the transfer
+ private final Map<Pair<String, Long>, Double> pipeName2WeightMap;
+
+ // The original events to be transferred, can be multiple events if
+ // the file is batched with other events
+ private final List<EnrichedEvent> events;
+ private final AtomicInteger eventsReferenceCount;
+ private final AtomicBoolean eventsHadBeenAddedToRetryQueue;
+
private final File tsFile;
private final File modFile;
private File currentFile;
@@ -74,15 +88,27 @@ public class PipeTransferTsFileInsertionEventHandler
private IoTDBDataNodeAsyncClientManager clientManager;
private AsyncPipeDataTransferServiceClient client;
- public PipeTransferTsFileInsertionEventHandler(
- final PipeTsFileInsertionEvent event, final
IoTDBDataRegionAsyncConnector connector)
+ public PipeTransferTsFileHandler(
+ final IoTDBDataRegionAsyncConnector connector,
+ final Map<Pair<String, Long>, Double> pipeName2WeightMap,
+ final List<EnrichedEvent> events,
+ final AtomicInteger eventsReferenceCount,
+ final AtomicBoolean eventsHadBeenAddedToRetryQueue,
+ final File tsFile,
+ final File modFile,
+ final boolean transferMod)
throws FileNotFoundException {
- this.event = event;
this.connector = connector;
- tsFile = event.getTsFile();
- modFile = event.getModFile();
- transferMod = event.isWithMod() &&
connector.supportModsIfIsDataNodeReceiver();
+ this.pipeName2WeightMap = pipeName2WeightMap;
+
+ this.events = events;
+ this.eventsReferenceCount = eventsReferenceCount;
+ this.eventsHadBeenAddedToRetryQueue = eventsHadBeenAddedToRetryQueue;
+
+ this.tsFile = tsFile;
+ this.modFile = modFile;
+ this.transferMod = transferMod;
currentFile = transferMod ? modFile : tsFile;
readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
@@ -98,7 +124,7 @@ public class PipeTransferTsFileInsertionEventHandler
}
public void transfer(
- IoTDBDataNodeAsyncClientManager clientManager,
+ final IoTDBDataNodeAsyncClientManager clientManager,
final AsyncPipeDataTransferServiceClient client)
throws TException, IOException {
this.clientManager = clientManager;
@@ -134,11 +160,13 @@ public class PipeTransferTsFileInsertionEventHandler
uncompressedReq, connector.getCompressors())
: uncompressedReq;
- connector.rateLimitIfNeeded(
- event.getPipeName(),
- event.getCreationTime(),
- client.getEndPoint(),
- req.getBody().length);
+ pipeName2WeightMap.forEach(
+ (pipePair, weight) ->
+ connector.rateLimitIfNeeded(
+ pipePair.getLeft(),
+ pipePair.getRight(),
+ client.getEndPoint(),
+ (long) (req.getBody().length * weight)));
client.pipeTransfer(req, this);
}
@@ -161,8 +189,13 @@ public class PipeTransferTsFileInsertionEventHandler
uncompressedReq, connector.getCompressors())
: uncompressedReq;
- connector.rateLimitIfNeeded(
- event.getPipeName(), event.getCreationTime(), client.getEndPoint(),
req.getBody().length);
+ pipeName2WeightMap.forEach(
+ (pipePair, weight) ->
+ connector.rateLimitIfNeeded(
+ pipePair.getLeft(),
+ pipePair.getRight(),
+ client.getEndPoint(),
+ (long) (req.getBody().length * weight)));
client.pipeTransfer(req, this);
@@ -194,16 +227,35 @@ public class PipeTransferTsFileInsertionEventHandler
if (reader != null) {
reader.close();
}
+
+ // Delete current file when using tsFile as batch
+ if (events.stream().anyMatch(event -> !(event instanceof
PipeTsFileInsertionEvent))) {
+ FileUtils.delete(currentFile);
+ }
} catch (final IOException e) {
- LOGGER.warn("Failed to close file reader when successfully transferred
file.", e);
+ LOGGER.warn(
+ "Failed to close file reader or delete tsFile when successfully
transferred file.", e);
} finally {
-
event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(),
true);
+ final int referenceCount = eventsReferenceCount.decrementAndGet();
+ if (referenceCount <= 0) {
+ events.forEach(
+ event ->
+
event.decreaseReferenceCount(PipeTransferTsFileHandler.class.getName(), true));
+ }
- LOGGER.info(
- "Successfully transferred file {} (committer key={}, commit
id={}).",
- tsFile,
- event.getCommitterKey(),
- event.getCommitId());
+ if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
+ LOGGER.info(
+ "Successfully transferred file {} (committer key={}, commit
id={}, reference count={}).",
+ tsFile,
+
events.stream().map(EnrichedEvent::getCommitterKey).collect(Collectors.toList()),
+
events.stream().map(EnrichedEvent::getCommitId).collect(Collectors.toList()),
+ referenceCount);
+ } else {
+ LOGGER.info(
+ "Successfully transferred file {} (batched TableInsertionEvents,
reference count={}).",
+ tsFile,
+ referenceCount);
+ }
if (client != null) {
client.setShouldReturnSelf(true);
@@ -246,12 +298,19 @@ public class PipeTransferTsFileInsertionEventHandler
@Override
public void onError(final Exception exception) {
try {
- LOGGER.warn(
- "Failed to transfer TsFileInsertionEvent {} (committer key {},
commit id {}).",
- tsFile,
- event.getCommitterKey(),
- event.getCommitId(),
- exception);
+ if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
+ LOGGER.warn(
+ "Failed to transfer TsFileInsertionEvent {} (committer key {},
commit id {}).",
+ tsFile,
+
events.stream().map(EnrichedEvent::getCommitterKey).collect(Collectors.toList()),
+
events.stream().map(EnrichedEvent::getCommitId).collect(Collectors.toList()),
+ exception);
+ } else {
+ LOGGER.warn(
+ "Failed to transfer TsFileInsertionEvent {} (batched
TableInsertionEvents)",
+ tsFile,
+ exception);
+ }
} catch (final Exception e) {
LOGGER.warn("Failed to log error when failed to transfer file.", e);
}
@@ -268,8 +327,13 @@ public class PipeTransferTsFileInsertionEventHandler
if (reader != null) {
reader.close();
}
+
+ // Delete current file when using tsFile as batch
+ if (events.stream().anyMatch(event -> !(event instanceof
PipeTsFileInsertionEvent))) {
+ FileUtils.delete(currentFile);
+ }
} catch (final IOException e) {
- LOGGER.warn("Failed to close file reader when failed to transfer file.",
e);
+ LOGGER.warn("Failed to close file reader or delete tsFile when failed to
transfer file.", e);
} finally {
try {
if (client != null) {
@@ -277,7 +341,9 @@ public class PipeTransferTsFileInsertionEventHandler
client.returnSelf();
}
} finally {
- connector.addFailureEventToRetryQueue(event);
+ if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
+ connector.addFailureEventsToRetryQueue(events);
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 0509c53beed..3b664ac50c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -24,8 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -51,12 +53,17 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -105,10 +112,10 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
try {
if (isTabletBatchModeEnabled) {
- final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+ final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
tabletBatchBuilder.onEvent(tabletInsertionEvent);
if (Objects.nonNull(endPointAndBatch)) {
- doTransfer(endPointAndBatch);
+ doTransferWrapper(endPointAndBatch);
}
} else {
if (tabletInsertionEvent instanceof
PipeInsertNodeTabletInsertionEvent) {
@@ -139,7 +146,7 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
try {
// In order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
- doTransfer();
+ doTransferWrapper();
}
doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent);
@@ -162,7 +169,7 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
// in order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
- doTransfer();
+ doTransferWrapper();
}
if (!(event instanceof PipeHeartbeatEvent)) {
@@ -171,10 +178,30 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
}
}
- private void doTransfer(final Pair<TEndPoint, PipeEventBatch>
endPointAndBatch) {
- final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
- clientManager.getClient(endPointAndBatch.getLeft());
- final PipeEventBatch batchToTransfer = endPointAndBatch.getRight();
+ private void doTransferWrapper() throws IOException, WriteProcessException {
+ for (final Pair<TEndPoint, PipeTabletEventBatch> nonEmptyBatch :
+ tabletBatchBuilder.getAllNonEmptyBatches()) {
+ doTransferWrapper(nonEmptyBatch);
+ }
+ }
+
+ private void doTransferWrapper(final Pair<TEndPoint, PipeTabletEventBatch>
endPointAndBatch)
+ throws IOException, WriteProcessException {
+ final PipeTabletEventBatch batch = endPointAndBatch.getRight();
+ if (batch instanceof PipeTabletEventPlainBatch) {
+ doTransfer(endPointAndBatch.getLeft(), (PipeTabletEventPlainBatch)
batch);
+ } else if (batch instanceof PipeTabletEventTsFileBatch) {
+ doTransfer((PipeTabletEventTsFileBatch) batch);
+ } else {
+ LOGGER.warn("Unsupported batch type {}.", batch.getClass());
+ }
+
batch.decreaseEventsReferenceCount(IoTDBDataRegionSyncConnector.class.getName(),
true);
+ batch.onSuccess();
+ }
+
+ private void doTransfer(
+ final TEndPoint endPoint, final PipeTabletEventPlainBatch
batchToTransfer) {
+ final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient(endPoint);
final TPipeTransferResp resp;
try {
@@ -217,14 +244,24 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
LeaderCacheUtils.parseRecommendedRedirections(status)) {
clientManager.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
}
-
- batchToTransfer.decreaseEventsReferenceCount(
- IoTDBDataRegionSyncConnector.class.getName(), true);
- batchToTransfer.onSuccess();
}
- private void doTransfer() {
- tabletBatchBuilder.getAllNonEmptyBatches().forEach(this::doTransfer);
+ private void doTransfer(final PipeTabletEventTsFileBatch batchToTransfer)
+ throws IOException, WriteProcessException {
+ final List<File> sealedFiles = batchToTransfer.sealTsFiles();
+ final Map<Pair<String, Long>, Double> pipe2WeightMap =
batchToTransfer.deepCopyPipe2WeightMap();
+
+ for (final File tsFile : sealedFiles) {
+ doTransfer(pipe2WeightMap, tsFile, null);
+ try {
+ FileUtils.delete(tsFile);
+ } catch (final NoSuchFileException e) {
+ LOGGER.info("The file {} is not found, may already be deleted.",
tsFile);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to delete batch file {}, this file should be deleted
manually later", tsFile);
+ }
+ }
}
private void doTransferWrapper(
@@ -363,37 +400,49 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
IoTDBDataRegionSyncConnector.class.getName())) {
return;
}
- doTransfer(pipeTsFileInsertionEvent);
+ doTransfer(
+ Collections.singletonMap(
+ new Pair<>(
+ pipeTsFileInsertionEvent.getPipeName(),
+ pipeTsFileInsertionEvent.getCreationTime()),
+ 1.0),
+ pipeTsFileInsertionEvent.getTsFile(),
+ pipeTsFileInsertionEvent.isWithMod() ?
pipeTsFileInsertionEvent.getModFile() : null);
} finally {
pipeTsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionSyncConnector.class.getName(), false);
}
}
- private void doTransfer(final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
+ private void doTransfer(
+ final Map<Pair<String, Long>, Double> pipeName2WeightMap,
+ final File tsFile,
+ final File modFile)
throws PipeException, IOException {
- final String pipeName = pipeTsFileInsertionEvent.getPipeName();
- final long creationTime = pipeTsFileInsertionEvent.getCreationTime();
- final File tsFile = pipeTsFileInsertionEvent.getTsFile();
- final File modFile = pipeTsFileInsertionEvent.getModFile();
+
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
final TPipeTransferResp resp;
// 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
- if (pipeTsFileInsertionEvent.isWithMod() &&
clientManager.supportModsIfIsDataNodeReceiver()) {
- transferFilePieces(pipeName, creationTime, modFile, clientAndStatus,
true);
- transferFilePieces(pipeName, creationTime, tsFile, clientAndStatus,
true);
+ if (Objects.nonNull(modFile) &&
clientManager.supportModsIfIsDataNodeReceiver()) {
+ transferFilePieces(pipeName2WeightMap, modFile, clientAndStatus, true);
+ transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, true);
+
// 2. Transfer file seal signal with mod, which means the file is
transferred completely
try {
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length()));
- rateLimitIfNeeded(
- pipeTsFileInsertionEvent.getPipeName(),
- pipeTsFileInsertionEvent.getCreationTime(),
- clientAndStatus.getLeft().getEndPoint(),
- req.getBody().length);
+
+ pipeName2WeightMap.forEach(
+ (pipePair, weight) ->
+ rateLimitIfNeeded(
+ pipePair.getLeft(),
+ pipePair.getRight(),
+ clientAndStatus.getLeft().getEndPoint(),
+ (long) (req.getBody().length * weight)));
+
resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
@@ -403,17 +452,22 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
e);
}
} else {
- transferFilePieces(pipeName, creationTime, tsFile, clientAndStatus,
false);
+ transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, false);
+
// 2. Transfer file seal signal without mod, which means the file is
transferred completely
try {
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(),
tsFile.length()));
- rateLimitIfNeeded(
- pipeTsFileInsertionEvent.getPipeName(),
- pipeTsFileInsertionEvent.getCreationTime(),
- clientAndStatus.getLeft().getEndPoint(),
- req.getBody().length);
+
+ pipeName2WeightMap.forEach(
+ (pipePair, weight) ->
+ rateLimitIfNeeded(
+ pipePair.getLeft(),
+ pipePair.getRight(),
+ clientAndStatus.getLeft().getEndPoint(),
+ (long) (req.getBody().length * weight)));
+
resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 93ec5e4ea43..878430e027b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.Objects;
public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector {
@@ -96,9 +97,17 @@ public class IoTDBSchemaRegionConnector extends
IoTDBDataNodeSyncConnector {
final TPipeTransferResp resp;
// 1. Transfer mTreeSnapshotFile, and tLog file if exists
- transferFilePieces(pipeName, creationTime, mTreeSnapshotFile,
clientAndStatus, true);
+ transferFilePieces(
+ Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+ mTreeSnapshotFile,
+ clientAndStatus,
+ true);
if (Objects.nonNull(tagLogSnapshotFile)) {
- transferFilePieces(pipeName, creationTime, tagLogSnapshotFile,
clientAndStatus, true);
+ transferFilePieces(
+ Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+ tagLogSnapshotFile,
+ clientAndStatus,
+ true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred
completely
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index abb0fcb572c..faba8482c4d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -303,8 +303,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
/////////////////////////// convertToTablet ///////////////////////////
- public boolean isAligned() {
- return isAligned;
+ public boolean isAligned(final int i) {
+ return initDataContainers().get(i).isAligned();
}
public List<Tablet> convertToTablets() {
@@ -345,7 +345,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
return dataContainers;
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeException("Initialize data container error.", e);
}
}
@@ -362,11 +362,17 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
final List<PipeRawTabletInsertionEvent> events =
- convertToTablets().stream()
+ initDataContainers().stream()
.map(
- tablet ->
+ container ->
new PipeRawTabletInsertionEvent(
- tablet, isAligned, pipeName, creationTime,
pipeTaskMeta, this, false))
+ container.convertToTablet(),
+ container.isAligned(),
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ this,
+ false))
.filter(event -> !event.hasNoNeedParsingAndIsEmpty())
.collect(Collectors.toList());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 8030cce9324..6366cf8bb95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -124,18 +124,19 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
} else {
// We need to create these objects here and remove them later.
deviceIsAlignedMap = readDeviceIsAlignedMap();
- memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
+ memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
// Filter devices that may overlap with pattern first
// to avoid reading all time-series of all devices.
final Set<IDeviceID> devices =
filterDevicesByPattern(deviceIsAlignedMap.keySet());
measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
- memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+ memoryRequiredInBytes +=
+
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices);
memoryRequiredInBytes +=
-
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
}
allocatedMemoryBlock =
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 68e377ecfd3..8c7bc2c8ed2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -24,10 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,7 +107,8 @@ public class PipeMemoryManager {
synchronized (this) {
final PipeTabletMemoryBlock block =
- (PipeTabletMemoryBlock)
forceAllocate(calculateTabletSizeInBytes(tablet), true);
+ (PipeTabletMemoryBlock)
+
forceAllocate(PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet), true);
usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
return block;
}
@@ -183,63 +181,6 @@ public class PipeMemoryManager {
return null;
}
- public static long calculateTabletSizeInBytes(Tablet tablet) {
- long totalSizeInBytes = 0;
-
- if (tablet == null) {
- return totalSizeInBytes;
- }
-
- // timestamps
- if (tablet.timestamps != null) {
- totalSizeInBytes += tablet.timestamps.length * 8L;
- }
-
- // values
- final List<MeasurementSchema> timeseries = tablet.getSchemas();
- if (timeseries != null) {
- for (int column = 0; column < timeseries.size(); column++) {
- final MeasurementSchema measurementSchema = timeseries.get(column);
- if (measurementSchema == null) {
- continue;
- }
-
- final TSDataType tsDataType = measurementSchema.getType();
- if (tsDataType == null) {
- continue;
- }
-
- if (tsDataType.isBinary()) {
- if (tablet.values == null || tablet.values.length <= column) {
- continue;
- }
- final Binary[] values = ((Binary[]) tablet.values[column]);
- if (values == null) {
- continue;
- }
- for (Binary value : values) {
- totalSizeInBytes +=
- value == null ? 0 : (value.getLength() == -1 ? 0 :
value.getLength());
- }
- } else {
- totalSizeInBytes += (long) tablet.timestamps.length *
tsDataType.getDataTypeSize();
- }
- }
- }
-
- // bitMaps
- if (tablet.bitMaps != null) {
- for (int i = 0; i < tablet.bitMaps.length; i++) {
- totalSizeInBytes += tablet.bitMaps[i] == null ? 0 :
tablet.bitMaps[i].getSize();
- }
- }
-
- // estimate other dataStructures size
- totalSizeInBytes += 100;
-
- return totalSizeInBytes;
- }
-
public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
return tryAllocate(sizeInBytes, currentSize -> currentSize * 2 / 3);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
similarity index 56%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index d93f946a087..f93d8f33fbf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -23,11 +23,15 @@ import org.apache.iotdb.db.utils.MemUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.List;
import java.util.Map;
-public class PipeMemoryWeighUtil {
+public class PipeMemoryWeightUtil {
+
/** Estimates memory usage of a {@link Map}<{@link IDeviceID}, {@link
Boolean}>. */
public static long memoryOfIDeviceId2Bool(Map<IDeviceID, Boolean> map) {
long usageInBytes = 0L;
@@ -57,4 +61,61 @@ public class PipeMemoryWeighUtil {
}
return usageInBytes + 16L; // add the overhead of map
}
+
+ public static long calculateTabletSizeInBytes(Tablet tablet) {
+ long totalSizeInBytes = 0;
+
+ if (tablet == null) {
+ return totalSizeInBytes;
+ }
+
+ // timestamps
+ if (tablet.timestamps != null) {
+ totalSizeInBytes += tablet.timestamps.length * 8L;
+ }
+
+ // values
+ final List<MeasurementSchema> timeseries = tablet.getSchemas();
+ if (timeseries != null) {
+ for (int column = 0; column < timeseries.size(); column++) {
+ final MeasurementSchema measurementSchema = timeseries.get(column);
+ if (measurementSchema == null) {
+ continue;
+ }
+
+ final TSDataType tsDataType = measurementSchema.getType();
+ if (tsDataType == null) {
+ continue;
+ }
+
+ if (tsDataType.isBinary()) {
+ if (tablet.values == null || tablet.values.length <= column) {
+ continue;
+ }
+ final Binary[] values = ((Binary[]) tablet.values[column]);
+ if (values == null) {
+ continue;
+ }
+ for (Binary value : values) {
+ totalSizeInBytes +=
+ value == null ? 0 : (value.getLength() == -1 ? 0 :
value.getLength());
+ }
+ } else {
+ totalSizeInBytes += (long) tablet.timestamps.length *
tsDataType.getDataTypeSize();
+ }
+ }
+ }
+
+ // bitMaps
+ if (tablet.bitMaps != null) {
+ for (int i = 0; i < tablet.bitMaps.length; i++) {
+ totalSizeInBytes += tablet.bitMaps[i] == null ? 0 :
tablet.bitMaps[i].getSize();
+ }
+ }
+
+ // estimate other dataStructures size
+ totalSizeInBytes += 100;
+
+ return totalSizeInBytes;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index b49ca5de4e2..090af91fb3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -205,7 +205,8 @@ public class PipeTsFileResource implements AutoCloseable {
try (TsFileSequenceReader sequenceReader =
new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
- memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+ memoryRequiredInBytes +=
+
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
deviceIsAlignedMap = new HashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
@@ -214,10 +215,10 @@ public class PipeTsFileResource implements AutoCloseable {
final Pair<IDeviceID, Boolean> deviceIsAlignedPair =
deviceIsAlignedIterator.next();
deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(),
deviceIsAlignedPair.getRight());
}
- memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
+ memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
measurementDataTypeMap = sequenceReader.getFullPathDataTypeMap();
- memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+ memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
}
// Release memory of TsFileSequenceReader.
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index cae63102111..2994f41a3f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -62,7 +62,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
// to trigger the general event transfer function, causing potentially such
as
// the random delay of the batch transmission. Therefore, here we inject
cron events
// when no event can be pulled.
- private static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
+ public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
new PipeHeartbeatEvent("cron", false);
private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS =
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()
* 1000;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
index 935494537b3..67b78e6a046 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
@@ -27,7 +27,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
import org.apache.iotdb.pipe.api.event.Event;
@@ -158,7 +158,7 @@ public class SubscriptionPrefetchingTabletsQueue extends
SubscriptionPrefetching
tablets.addAll(currentTablets);
calculatedTabletsSizeInBytes +=
currentTablets.stream()
- .map((PipeMemoryManager::calculateTabletSizeInBytes))
+ .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
.reduce(Long::sum)
.orElse(0L);
enrichedEvents.add((EnrichedEvent) event);
@@ -172,7 +172,7 @@ public class SubscriptionPrefetchingTabletsQueue extends
SubscriptionPrefetching
tablets.addAll(currentTablets);
calculatedTabletsSizeInBytes +=
currentTablets.stream()
- .map((PipeMemoryManager::calculateTabletSizeInBytes))
+ .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
.reduce(Long::sum)
.orElse(0L);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index dfcf35c03ca..61e773da3e4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -65,11 +65,13 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY =
"connector.batch.max-delay-seconds";
public static final String SINK_IOTDB_BATCH_DELAY_KEY =
"sink.batch.max-delay-seconds";
- public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
+ public static final int CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE = 1;
+ public static final int CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE =
5;
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
public static final String SINK_IOTDB_BATCH_SIZE_KEY =
"sink.batch.size-bytes";
- public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
+ public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = 16
* MB;
+ public static final long CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE =
80 * MB;
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
public static final String SINK_IOTDB_USER_KEY = "sink.user";
@@ -191,6 +193,12 @@ public class PipeConnectorConstant {
public static final String SINK_RATE_LIMIT_KEY =
"sink.rate-limit-bytes-per-second";
public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
+ public static final String CONNECTOR_FORMAT_KEY = "connector.format";
+ public static final String SINK_FORMAT_KEY = "sink.format";
+ public static final String CONNECTOR_FORMAT_TABLET_VALUE = "tablet";
+ public static final String CONNECTOR_FORMAT_TS_FILE_VALUE = "tsfile";
+ public static final String CONNECTOR_FORMAT_HYBRID_VALUE = "hybrid";
+
public static final String SINK_TOPIC_KEY = "sink.topic";
public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index aa398d6e9ff..6ac6f9432df 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -66,11 +66,17 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
@@ -84,7 +90,9 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
@@ -144,6 +152,18 @@ public abstract class IoTDBConnector implements
PipeConnector {
parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
+ validator.validate(
+ requestMaxBatchSizeInBytes -> (long) requestMaxBatchSizeInBytes > 0,
+ String.format(
+ "%s must be > 0, but got %s",
+ SINK_IOTDB_BATCH_SIZE_KEY,
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
+ CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE)),
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
+ CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));
+
loadBalanceStrategy =
parameters
.getStringOrDefault(
@@ -230,6 +250,15 @@ public abstract class IoTDBConnector implements
PipeConnector {
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
.trim()
.toLowerCase());
+
+ validator.validateAttributeValueRange(
+ validator.getParameters().hasAttribute(CONNECTOR_FORMAT_KEY)
+ ? CONNECTOR_FORMAT_KEY
+ : SINK_FORMAT_KEY,
+ true,
+ CONNECTOR_FORMAT_TABLET_VALUE,
+ CONNECTOR_FORMAT_HYBRID_VALUE,
+ CONNECTOR_FORMAT_TS_FILE_VALUE);
}
@Override
@@ -242,8 +271,15 @@ public abstract class IoTDBConnector implements
PipeConnector {
isTabletBatchModeEnabled =
parameters.getBooleanOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
- CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
+ Arrays.asList(
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE)
+ || parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
+ CONNECTOR_FORMAT_HYBRID_VALUE)
+ .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
+
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
receiverStatusHandler =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index a51edf46cd4..be93018bf9c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -44,6 +44,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
@@ -144,8 +145,7 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
}
protected void transferFilePieces(
- final String pipeName,
- final long creationTime,
+ final Map<Pair<String, Long>, Double> pipe2WeightMap,
final File file,
final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
final boolean isMultiFile)
@@ -171,11 +171,13 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
isMultiFile
? getTransferMultiFilePieceReq(file.getName(), position,
payLoad)
: getTransferSingleFilePieceReq(file.getName(),
position, payLoad));
- rateLimitIfNeeded(
- pipeName,
- creationTime,
- clientAndStatus.getLeft().getEndPoint(),
- req.getBody().length);
+ pipe2WeightMap.forEach(
+ (namePair, weight) ->
+ rateLimitIfNeeded(
+ namePair.getLeft(),
+ namePair.getRight(),
+ clientAndStatus.getLeft().getEndPoint(),
+ (long) (req.getBody().length * weight)));
resp =
PipeTransferFilePieceResp.fromTPipeTransferResp(
clientAndStatus.getLeft().pipeTransfer(req));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 6d68b607294..6d44d26c332 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -161,7 +161,7 @@ public abstract class EnrichedEvent implements Event {
isReleased.set(true);
}
if (newReferenceCount < 0) {
- LOGGER.warn(
+ LOGGER.debug(
"reference count is decreased to {}, event: {}, stack trace: {}",
newReferenceCount,
coreReportMessage(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index 78e913d6575..a95badf6a94 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -94,10 +94,15 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
// Notice that the PipeRuntimeConnectorCriticalException must be thrown
here
// because the upper layer relies on this to stop all the related pipe
tasks
// Other exceptions may cause the subtask to stop forever and can not be
restarted
- super.onFailure(
- throwable instanceof PipeRuntimeConnectorCriticalException
- ? throwable
- : new
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+ if (throwable instanceof PipeRuntimeConnectorCriticalException) {
+ super.onFailure(throwable);
+ } else {
+ // Print stack trace for better debugging
+ LOGGER.warn(
+ "A non PipeRuntimeConnectorCriticalException occurred, will throw a
PipeRuntimeConnectorCriticalException.",
+ throwable);
+ super.onFailure(new
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+ }
}
/**