This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7640df9a2b9 Pipe: Introduce PipeEventCommitManager to manage event
commit order to avoid losing data during parallel connector scheduling (#11489)
7640df9a2b9 is described below
commit 7640df9a2b91e06b13931429994e72cf0caccc52
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Nov 22 00:13:16 2023 +0800
Pipe: Introduce PipeEventCommitManager to manage event commit order to
avoid losing data during parallel connector scheduling (#11489)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../java/org/apache/iotdb/CountPointProcessor.java | 3 +-
.../db/pipe/agent/plugin/PipePluginAgent.java | 6 +-
.../db/pipe/commit/PipeEventCommitManager.java | 106 +++++++++++++++++++
.../iotdb/db/pipe/commit/PipeEventCommitter.java | 79 ++++++++++++++
.../env/PipeTaskConnectorRuntimeEnvironment.java | 34 +++++++
...oTDBThriftAsyncPipeTransferBatchReqBuilder.java | 49 ---------
...IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 45 +-------
.../builder/PipeTransferBatchReqBuilder.java | 45 ++++++++
.../thrift/async/IoTDBThriftAsyncConnector.java | 113 +++++++--------------
.../PipeTransferTabletBatchEventHandler.java | 13 +--
.../PipeTransferTabletInsertNodeEventHandler.java | 3 +-
.../PipeTransferTabletInsertionEventHandler.java | 39 +++----
.../handler/PipeTransferTabletRawEventHandler.java | 3 +-
.../PipeTransferTsFileInsertionEventHandler.java | 20 ++--
.../protocol/websocket/WebSocketConnector.java | 53 ++--------
.../websocket/WebSocketConnectorServer.java | 83 ++++++++-------
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 39 ++++++-
.../event/common/heartbeat/PipeHeartbeatEvent.java | 10 +-
.../db/pipe/event/common/row/PipeRowCollector.java | 8 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 18 +++-
.../common/tablet/PipeRawTabletInsertionEvent.java | 16 +--
.../tablet/TabletInsertionDataContainer.java | 10 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 17 +++-
.../tsfile/TsFileInsertionDataContainer.java | 14 ++-
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 8 +-
.../execution/executor/PipeSubtaskExecutor.java | 1 +
.../PipeHistoricalDataRegionTsFileExtractor.java | 3 +
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 5 +-
.../receiver/legacy/loader/DeletionLoader.java | 5 +-
.../pipe/receiver/legacy/loader/TsFileLoader.java | 5 +-
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 3 +-
.../pipe/task/connection/PipeEventCollector.java | 10 +-
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 17 +++-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 2 +-
.../iotdb/db/pipe/task/stage/PipeTaskStage.java | 6 +-
.../subtask/connector/PipeConnectorSubtask.java | 11 ++
.../connector/PipeConnectorSubtaskLifeCycle.java | 51 +++++++++-
.../connector/PipeConnectorSubtaskManager.java | 32 ++++--
.../event/TsFileInsertionDataContainerTest.java | 2 +-
40 files changed, 633 insertions(+), 356 deletions(-)
diff --git
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
index 13fd6804aab..766c1ce67b0 100644
---
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
+++
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
@@ -71,7 +71,8 @@ public class CountPointProcessor implements PipeProcessor {
tablet.rowSize = 1;
tablet.addTimestamp(0, System.currentTimeMillis());
tablet.addValue(aggregateSeries.getMeasurement(), 0,
writePointCount.get());
- eventCollector.collect(new PipeRawTabletInsertionEvent(tablet, false,
null, null, false));
+ eventCollector.collect(
+ new PipeRawTabletInsertionEvent(tablet, false, null, null, null,
false));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 9ea0684e15f..7ee2f99b1ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -218,7 +218,7 @@ public class PipePluginAgent {
try {
temporaryExtractor.close();
} catch (Exception e) {
- LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage());
+ LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage(),
e);
}
}
@@ -231,7 +231,7 @@ public class PipePluginAgent {
try {
temporaryProcessor.close();
} catch (Exception e) {
- LOGGER.warn("Failed to close temporary processor: {}", e.getMessage());
+ LOGGER.warn("Failed to close temporary processor: {}", e.getMessage(),
e);
}
}
@@ -244,7 +244,7 @@ public class PipePluginAgent {
try {
temporaryConnector.close();
} catch (Exception e) {
- LOGGER.warn("Failed to close temporary connector: {}", e.getMessage());
+ LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(),
e);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
new file mode 100644
index 00000000000..8305578ba29
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.commit;
+
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeEventCommitManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCommitManager.class);
+
+ // key: pipeName_dataRegionId
+ private final Map<String, PipeEventCommitter> eventCommitterMap = new
ConcurrentHashMap<>();
+
+ public void register(String pipeName, int dataRegionId, String
pipePluginName) {
+ if (pipeName == null || pipePluginName == null) {
+ return;
+ }
+
+ final String committerKey = generateCommitterKey(pipeName, dataRegionId);
+ if (eventCommitterMap.containsKey(committerKey)) {
+ LOGGER.warn(
+ "Pipe with same name is already registered on this data region,
overwriting: {}",
+ committerKey);
+ }
+ eventCommitterMap.put(committerKey, new PipeEventCommitter());
+ LOGGER.info("Pipe committer registered for pipe on data region: {}",
committerKey);
+ }
+
+ public void deregister(String pipeName, int dataRegionId) {
+ final String committerKey = generateCommitterKey(pipeName, dataRegionId);
+ eventCommitterMap.remove(committerKey);
+ LOGGER.info("Pipe committer deregistered for pipe on data region: {}",
committerKey);
+ }
+
+ /**
+ * Assign a commit id and a key for commit. Make sure {@code
EnrichedEvent.pipeName} is set before
+ * calling this.
+ */
+ public void enrichWithCommitterKeyAndCommitId(EnrichedEvent event, int
dataRegionId) {
+ if (event == null || event instanceof PipeHeartbeatEvent ||
event.getPipeName() == null) {
+ return;
+ }
+
+ final String committerKey = generateCommitterKey(event.getPipeName(),
dataRegionId);
+ final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
+ if (committer == null) {
+ return;
+ }
+ event.setCommitterKeyAndCommitId(committerKey,
committer.generateCommitId());
+ }
+
+ public void commit(EnrichedEvent event, String committerKey) {
+ if (event == null
+ || event instanceof PipeHeartbeatEvent
+ || event.getCommitId() <= EnrichedEvent.NO_COMMIT_ID
+ || committerKey == null) {
+ return;
+ }
+
+ final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
+ if (committer == null) {
+ return;
+ }
+ committer.commit(event);
+ }
+
+ private static String generateCommitterKey(String pipeName, int
dataRegionId) {
+ return String.format("%s_%s", pipeName, dataRegionId);
+ }
+
+ private PipeEventCommitManager() {
+ // Do nothing but make it private.
+ }
+
+ private static class PipeEventCommitManagerHolder {
+ private static final PipeEventCommitManager INSTANCE = new
PipeEventCommitManager();
+ }
+
+ public static PipeEventCommitManager getInstance() {
+ return PipeEventCommitManagerHolder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
new file mode 100644
index 00000000000..fff2599f099
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.commit;
+
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Used to queue events for one pipe of one dataRegion to commit in order. */
+public class PipeEventCommitter {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCommitter.class);
+
+ private final AtomicLong commitIdGenerator = new AtomicLong(0);
+ private final AtomicLong lastCommitId = new AtomicLong(0);
+
+ private final PriorityBlockingQueue<EnrichedEvent> commitQueue =
+ new PriorityBlockingQueue<>(
+ 11,
+ Comparator.comparing(
+ event ->
+ Objects.requireNonNull(event, "committable event cannot be
null").getCommitId()));
+
+ PipeEventCommitter() {
+ // Do nothing but make it package-private.
+ }
+
+ public synchronized long generateCommitId() {
+ return commitIdGenerator.incrementAndGet();
+ }
+
+ public synchronized void commit(EnrichedEvent event) {
+ commitQueue.offer(event);
+
+ while (!commitQueue.isEmpty()) {
+ final EnrichedEvent e = commitQueue.peek();
+
+ if (e.getCommitId() <= lastCommitId.get()) {
+ LOGGER.warn(
+ "commit id must be monotonically increasing, lastCommitId: {},
event: {}",
+ lastCommitId.get(),
+ e);
+ commitQueue.poll();
+ continue;
+ }
+
+ if (e.getCommitId() != lastCommitId.get() + 1) {
+ break;
+ }
+
+ e.onCommitted();
+ lastCommitId.incrementAndGet();
+ commitQueue.poll();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
new file mode 100644
index 00000000000..a8fe2187750
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config.plugin.env;
+
+public class PipeTaskConnectorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
+
+ private final int regionId;
+
+ public PipeTaskConnectorRuntimeEnvironment(String pipeName, long
creationTime, int regionId) {
+ super(pipeName, creationTime);
+ this.regionId = regionId;
+ }
+
+ public int getRegionId() {
+ return regionId;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
index d08b9d50233..1c87115420d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -19,67 +19,18 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends
PipeTransferBatchReqBuilder {
- protected final List<Long> requestCommitIds = new ArrayList<>();
-
public IoTDBThriftAsyncPipeTransferBatchReqBuilder(PipeParameters
parameters) {
super(parameters);
}
- /**
- * Try offer event into cache if the given event is not duplicated.
- *
- * @param event the given event
- * @return true if the batch can be transferred
- */
- public boolean onEvent(TabletInsertionEvent event, long requestCommitId)
- throws IOException, WALPipeException {
- final TPipeTransferReq req = buildTabletInsertionReq(event);
-
- if (requestCommitIds.isEmpty()
- || !requestCommitIds.get(requestCommitIds.size() -
1).equals(requestCommitId)) {
- reqs.add(req);
-
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
- }
- events.add(event);
- requestCommitIds.add(requestCommitId);
-
- if (firstEventProcessingTime == Long.MIN_VALUE) {
- firstEventProcessingTime = System.currentTimeMillis();
- }
-
- bufferSize += req.getBody().length;
- }
-
- return bufferSize >= getMaxBatchSizeInBytes()
- || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
- }
-
- public void onSuccess() {
- reqs.clear();
-
- events.clear();
- requestCommitIds.clear();
-
- firstEventProcessingTime = Long.MIN_VALUE;
-
- bufferSize = 0;
- }
-
public List<Event> deepcopyEvents() {
return new ArrayList<>(events);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index bd6244ed600..4c292749f41 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -19,15 +19,9 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-
-import java.io.IOException;
public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends
PipeTransferBatchReqBuilder {
@@ -35,47 +29,16 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder
extends PipeTransferBatc
super(parameters);
}
- /**
- * Try offer event into cache if the given event is not duplicated.
- *
- * @param event the given event
- * @return true if the batch can be transferred
- */
- public boolean onEvent(TabletInsertionEvent event) throws IOException,
WALPipeException {
- final TPipeTransferReq req = buildTabletInsertionReq(event);
-
- if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
- reqs.add(req);
-
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
- }
- events.add(event);
-
- if (firstEventProcessingTime == Long.MIN_VALUE) {
- firstEventProcessingTime = System.currentTimeMillis();
- }
-
- bufferSize += req.getBody().length;
- }
-
- return bufferSize >= getMaxBatchSizeInBytes()
- || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
- }
-
+ @Override
public void onSuccess() {
- reqs.clear();
-
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event)
- .decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName(),
true);
+ .decreaseReferenceCount(
+ IoTDBThriftSyncPipeTransferBatchReqBuilder.class.getName(),
true);
}
}
- events.clear();
-
- firstEventProcessingTime = Long.MIN_VALUE;
- bufferSize = 0;
+ super.onSuccess();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index ff0e066dc8a..9367ec2c184 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -53,6 +54,7 @@ public abstract class PipeTransferBatchReqBuilder implements
AutoCloseable {
protected final List<TPipeTransferReq> reqs = new ArrayList<>();
protected final List<Event> events = new ArrayList<>();
+ protected final List<Long> requestCommitIds = new ArrayList<>();
// limit in delayed time
protected final int maxDelayInMs;
@@ -98,6 +100,49 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
}
}
+ /**
+ * Try offer event into cache if the given event is not duplicated.
+ *
+ * @param event the given event
+ * @return true if the batch can be transferred
+ */
+ public boolean onEvent(TabletInsertionEvent event) throws IOException,
WALPipeException {
+ if (!(event instanceof EnrichedEvent)) {
+ return false;
+ }
+
+ final TPipeTransferReq req = buildTabletInsertionReq(event);
+ final long requestCommitId = ((EnrichedEvent) event).getCommitId();
+
+ if (requestCommitIds.isEmpty()
+ || !requestCommitIds.get(requestCommitIds.size() -
1).equals(requestCommitId)) {
+ reqs.add(req);
+ events.add(event);
+ requestCommitIds.add(requestCommitId);
+
+ ((EnrichedEvent)
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
+
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
+ }
+
+ bufferSize += req.getBody().length;
+ }
+
+ return bufferSize >= getMaxBatchSizeInBytes()
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ }
+
+ public void onSuccess() {
+ reqs.clear();
+ events.clear();
+ requestCommitIds.clear();
+
+ firstEventProcessingTime = Long.MIN_VALUE;
+
+ bufferSize = 0;
+ }
+
public List<TPipeTransferReq> getTPipeTransferReqs() {
return reqs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 1e34ef7c4be..b51b4284c8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -52,23 +52,17 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.Optional;
-import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
@@ -90,13 +84,14 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
asyncPipeDataTransferClientManager;
private final IoTDBThriftSyncConnector retryConnector = new
IoTDBThriftSyncConnector();
- private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue =
- new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
-
- private final AtomicLong commitIdGenerator = new AtomicLong(0);
- private final AtomicLong lastCommitId = new AtomicLong(0);
- private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
- new PriorityQueue<>(Comparator.comparing(o -> o.left));
+ private final PriorityBlockingQueue<Event> retryEventQueue =
+ new PriorityBlockingQueue<>(
+ 11,
+ Comparator.comparing(
+ e ->
+ // Non-enriched events will be put at the front of the queue,
+ // because they are more likely to be lost and need to be
retried first.
+ e instanceof EnrichedEvent ? ((EnrichedEvent)
e).getCommitId() : 0));
private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
@@ -176,13 +171,15 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
return;
}
+ final long commitId = ((EnrichedEvent) tabletInsertionEvent).getCommitId();
+
if (isTabletBatchModeEnabled) {
- final long requestCommitId = commitIdGenerator.incrementAndGet();
- if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
+ if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
final PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler =
new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
- transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
+ transfer(commitId, pipeTransferTabletBatchEventHandler);
+
tabletBatchBuilder.onSuccess();
}
} else {
@@ -195,13 +192,11 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
pipeInsertNodeTabletInsertionEvent.getByteBuffer())
: PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getInsertNode());
-
- final long requestCommitId = commitIdGenerator.incrementAndGet();
final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler =
new PipeTransferTabletInsertNodeEventHandler(
- requestCommitId, pipeInsertNodeTabletInsertionEvent,
pipeTransferReq, this);
+ pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
- transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
+ transfer(commitId, pipeTransferInsertNodeReqHandler);
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -209,13 +204,11 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
PipeTransferTabletRawReq.toTPipeTransferReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned());
-
- final long requestCommitId = commitIdGenerator.incrementAndGet();
final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
new PipeTransferTabletRawEventHandler(
- requestCommitId, pipeRawTabletInsertionEvent,
pipeTransferTabletRawReq, this);
+ pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
- transfer(requestCommitId, pipeTransferTabletReqHandler);
+ transfer(commitId, pipeTransferTabletReqHandler);
}
}
}
@@ -305,12 +298,10 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
throw new
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
}
- final long requestCommitId = commitIdGenerator.incrementAndGet();
final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
- new PipeTransferTsFileInsertionEventHandler(
- requestCommitId, pipeTsFileInsertionEvent, this);
+ new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent,
this);
- transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
+ transfer(pipeTsFileInsertionEvent.getCommitId(),
pipeTransferTsFileInsertionEventHandler);
}
private void transfer(
@@ -441,9 +432,7 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
*/
private synchronized void transferQueuedEventsIfNecessary() throws Exception
{
while (!retryEventQueue.isEmpty()) {
- final Pair<Long, Event> queuedEventPair = retryEventQueue.peek();
- final long requestCommitId = queuedEventPair.getLeft();
- final Event event = queuedEventPair.getRight();
+ final Event event = retryEventQueue.peek();
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) event);
@@ -456,7 +445,10 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
"IoTDBThriftAsyncConnector does not support transfer generic
event: {}.", event);
}
- commit(requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent)
event : null);
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event)
+ .decreaseReferenceCount(IoTDBThriftAsyncConnector.class.getName(),
true);
+ }
retryEventQueue.poll();
}
@@ -481,58 +473,23 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
}
/**
- * Commit the event. Decrease the reference count of the event. If the
reference count is 0, the
- * progress index of the event will be recalculated and the resources of the
event will be
- * released.
- *
- * <p>The synchronization is necessary because the commit order must be the
same as the order of
- * the events. Concurrent commit may cause the commit order to be
inconsistent with the order of
- * the events.
+ * Add failure event to retry queue.
*
- * @param requestCommitId commit id of the request
- * @param enrichedEvent event to commit
+ * @param event event to retry
*/
- public synchronized void commit(long requestCommitId, @Nullable
EnrichedEvent enrichedEvent) {
- commitQueue.offer(
- new Pair<>(
- requestCommitId,
- () ->
- Optional.ofNullable(enrichedEvent)
- .ifPresent(
- event ->
- event.decreaseReferenceCount(
- IoTDBThriftAsyncConnector.class.getName(),
true))));
-
- while (!commitQueue.isEmpty()) {
- final Pair<Long, Runnable> committer = commitQueue.peek();
-
- // If the commit id is less than or equals to the last commit id, it
means that
- // the event has been committed before, and has been retried. So the
event can
- // be ignored.
- if (committer.left <= lastCommitId.get()) {
- commitQueue.poll();
- continue;
- }
-
- if (committer.left != lastCommitId.get() + 1) {
- break;
- }
-
- committer.right.run();
- lastCommitId.incrementAndGet();
-
- commitQueue.poll();
- }
+ public void addFailureEventToRetryQueue(Event event) {
+ retryEventQueue.offer(event);
}
/**
- * Add failure event to retry queue.
- *
- * @param requestCommitId commit id of the request
- * @param event event to retry
+ * When a pipe is dropped, the connector may be reused and will not be
closed. So we just discard
+ * its queued events in the output pipe connector.
*/
- public void addFailureEventToRetryQueue(long requestCommitId, Event event) {
- retryEventQueue.offer(new Pair<>(requestCommitId, event));
+ public synchronized void discardEventsOfPipe(String pipeNameToDrop) {
+ retryEventQueue.removeIf(
+ event ->
+ event instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent)
event).getPipeName()));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index a3a316983cf..cb7b8066203 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -73,10 +73,11 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
}
if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- for (int i = 0; i < events.size(); ++i) {
- connector.commit(
- requestCommitIds.get(i),
- events.get(i) instanceof EnrichedEvent ? (EnrichedEvent)
events.get(i) : null);
+ for (final Event event : events) {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event)
+
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(),
true);
+ }
}
} else {
onError(new PipeException(response.getStatus().getMessage()));
@@ -91,8 +92,8 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
requestCommitIds,
exception);
- for (int i = 0; i < events.size(); ++i) {
- connector.addFailureEventToRetryQueue(requestCommitIds.get(i),
events.get(i));
+ for (final Event event : events) {
+ connector.addFailureEventToRetryQueue(event);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index dbdda7df226..c0dcf8670f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -31,11 +31,10 @@ public class PipeTransferTabletInsertNodeEventHandler
extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
public PipeTransferTabletInsertNodeEventHandler(
- long requestCommitId,
PipeInsertNodeTabletInsertionEvent event,
TPipeTransferReq req,
IoTDBThriftAsyncConnector connector) {
- super(requestCommitId, event, req, connector);
+ super(event, req, connector);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 619a0f961b8..d1b0ae18c50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -33,39 +33,27 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Optional;
-
public abstract class PipeTransferTabletInsertionEventHandler<E extends
TPipeTransferResp>
implements AsyncMethodCallback<E> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
- private final long requestCommitId;
- private final Event event;
+ private final TabletInsertionEvent event;
private final TPipeTransferReq req;
private final IoTDBThriftAsyncConnector connector;
protected PipeTransferTabletInsertionEventHandler(
- long requestCommitId,
- Event event,
- TPipeTransferReq req,
- IoTDBThriftAsyncConnector connector) {
- this.requestCommitId = requestCommitId;
+ TabletInsertionEvent event, TPipeTransferReq req,
IoTDBThriftAsyncConnector connector) {
this.event = event;
this.req = req;
this.connector = connector;
- Optional.ofNullable(event)
- .ifPresent(
- e -> {
- if (e instanceof EnrichedEvent) {
- ((EnrichedEvent) e)
- .increaseReferenceCount(
-
PipeTransferTabletInsertionEventHandler.class.getName());
- }
- });
+ if (this.event instanceof EnrichedEvent) {
+ ((EnrichedEvent) this.event)
+
.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
+ }
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException {
@@ -84,8 +72,10 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
}
if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- connector.commit(
- requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent)
event : null);
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event)
+
.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
true);
+ }
} else {
onError(new PipeException(response.getStatus().getMessage()));
}
@@ -94,11 +84,12 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
@Override
public void onError(Exception exception) {
LOGGER.warn(
- "Failed to transfer TabletInsertionEvent {} (request commit id={}).",
+ "Failed to transfer TabletInsertionEvent {} (committer key={}, commit
id={}).",
event,
- requestCommitId,
+ event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
+ event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId()
: null,
exception);
- connector.addFailureEventToRetryQueue(requestCommitId, event);
+ connector.addFailureEventToRetryQueue(event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index db0c5f235a1..4cf8a768048 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -31,11 +31,10 @@ public class PipeTransferTabletRawEventHandler
extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
public PipeTransferTabletRawEventHandler(
- long requestCommitId,
PipeRawTabletInsertionEvent event,
TPipeTransferReq req,
IoTDBThriftAsyncConnector connector) {
- super(requestCommitId, event, req, connector);
+ super(event, req, connector);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index f3b130eae64..70b11ba2ce5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -48,7 +48,6 @@ public class PipeTransferTsFileInsertionEventHandler
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
- private final long requestCommitId;
private final PipeTsFileInsertionEvent event;
private final IoTDBThriftAsyncConnector connector;
@@ -64,9 +63,8 @@ public class PipeTransferTsFileInsertionEventHandler
private AsyncPipeDataTransferServiceClient client;
public PipeTransferTsFileInsertionEventHandler(
- long requestCommitId, PipeTsFileInsertionEvent event,
IoTDBThriftAsyncConnector connector)
+ PipeTsFileInsertionEvent event, IoTDBThriftAsyncConnector connector)
throws FileNotFoundException {
- this.requestCommitId = requestCommitId;
this.event = event;
this.connector = connector;
@@ -79,7 +77,7 @@ public class PipeTransferTsFileInsertionEventHandler
isSealSignalSent = new AtomicBoolean(false);
-
event.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
+
event.increaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName());
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException, IOException {
@@ -124,10 +122,13 @@ public class PipeTransferTsFileInsertionEventHandler
} catch (IOException e) {
LOGGER.warn("Failed to close file reader when successfully transferred
file.", e);
} finally {
- connector.commit(requestCommitId, event);
+
event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(),
true);
LOGGER.info(
- "Successfully transferred file {}. Request commit id is {}.",
tsFile, requestCommitId);
+ "Successfully transferred file {} (committer key={}, commit
id={}).",
+ tsFile,
+ event.getCommitterKey(),
+ event.getCommitId());
if (client != null) {
client.setShouldReturnSelf(true);
@@ -164,9 +165,10 @@ public class PipeTransferTsFileInsertionEventHandler
@Override
public void onError(Exception exception) {
LOGGER.warn(
- "Failed to transfer TsFileInsertionEvent {} (request commit id {}).",
+ "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit
id {}).",
tsFile,
- requestCommitId,
+ event.getCommitterKey(),
+ event.getCommitId(),
exception);
try {
@@ -176,7 +178,7 @@ public class PipeTransferTsFileInsertionEventHandler
} catch (IOException e) {
LOGGER.warn("Failed to close file reader when failed to transfer file.",
e);
} finally {
- connector.addFailureEventToRetryQueue(requestCommitId, event);
+ connector.addFailureEventToRetryQueue(event);
if (client != null) {
client.setShouldReturnSelf(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 4fc3c9a41e5..75bd571f0b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -39,15 +39,13 @@ import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
-import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
public class WebSocketConnector implements PipeConnector {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(WebSocketConnector.class);
private static final Map<Integer, Pair<AtomicInteger,
WebSocketConnectorServer>>
@@ -56,11 +54,6 @@ public class WebSocketConnector implements PipeConnector {
private Integer port;
private WebSocketConnectorServer server;
- public final AtomicLong commitIdGenerator = new AtomicLong(0);
- private final AtomicLong lastCommitId = new AtomicLong(0);
- private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
- new PriorityQueue<>(Comparator.comparing(o -> o.left));
-
@Override
public void validate(PipeParameterValidator validator) throws Exception {
// Do nothing
@@ -108,10 +101,11 @@ public class WebSocketConnector implements PipeConnector {
tabletInsertionEvent);
return;
}
- long commitId = commitIdGenerator.incrementAndGet();
+
((EnrichedEvent) tabletInsertionEvent)
.increaseReferenceCount(WebSocketConnector.class.getName());
- server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
+
+ server.addEvent(tabletInsertionEvent);
}
@Override
@@ -122,11 +116,12 @@ public class WebSocketConnector implements PipeConnector {
tsFileInsertionEvent);
return;
}
+
try {
for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent)
event).increaseReferenceCount(WebSocketConnector.class.getName());
- server.addEvent(new Pair<>(commitId, event));
+
+ server.addEvent(event);
}
} finally {
tsFileInsertionEvent.close();
@@ -159,36 +154,8 @@ public class WebSocketConnector implements PipeConnector {
}
}
- public synchronized void commit(long requestCommitId, @Nullable
EnrichedEvent enrichedEvent) {
- commitQueue.offer(
- new Pair<>(
- requestCommitId,
- () ->
- Optional.ofNullable(enrichedEvent)
- .ifPresent(
- event ->
- event.decreaseReferenceCount(
- WebSocketConnector.class.getName(), true))));
-
- while (!commitQueue.isEmpty()) {
- final Pair<Long, Runnable> committer = commitQueue.peek();
-
- // If the commit id is less than or equals to the last commit id, it
means that
- // the event has been committed before, and has been retried. So the
event can
- // be ignored.
- if (committer.left <= lastCommitId.get()) {
- commitQueue.poll();
- continue;
- }
-
- if (committer.left != lastCommitId.get() + 1) {
- break;
- }
-
- committer.right.run();
- lastCommitId.incrementAndGet();
-
- commitQueue.poll();
- }
+ public synchronized void commit(@Nullable EnrichedEvent enrichedEvent) {
+ Optional.ofNullable(enrichedEvent)
+ .ifPresent(event ->
event.decreaseReferenceCount(WebSocketConnector.class.getName(), true));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
index cb4aaef0f97..3ba0f7feb89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -40,14 +40,18 @@ import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
public class WebSocketConnectorServer extends WebSocketServer {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(WebSocketConnectorServer.class);
- private final PriorityBlockingQueue<Pair<Long, Event>> events =
+
+ private final AtomicLong eventIdGenerator = new AtomicLong(0);
+ private final PriorityBlockingQueue<Pair<Long, Event>>
eventsWaitingForTransfer =
new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
- private final WebSocketConnector websocketConnector;
+ private final ConcurrentMap<Long, Event> eventsWaitingForAck = new
ConcurrentHashMap<>();
- private final ConcurrentMap<Long, Event> eventMap = new
ConcurrentHashMap<>();
+ private final WebSocketConnector websocketConnector;
public WebSocketConnectorServer(
InetSocketAddress address, WebSocketConnector websocketConnector) {
@@ -120,12 +124,12 @@ public class WebSocketConnectorServer extends
WebSocketServer {
LOGGER.info(log);
}
- public void addEvent(Pair<Long, Event> event) {
- if (events.size() >= 5) {
- synchronized (events) {
- while (events.size() >= 5) {
+ public void addEvent(Event event) {
+ if (eventsWaitingForTransfer.size() >= 5) {
+ synchronized (eventsWaitingForTransfer) {
+ while (eventsWaitingForTransfer.size() >= 5) {
try {
- events.wait();
+ eventsWaitingForTransfer.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PipeException(e.getMessage());
@@ -133,22 +137,22 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
}
}
- events.put(event);
+
+ eventsWaitingForTransfer.put(new
Pair<>(eventIdGenerator.incrementAndGet(), event));
}
private void handleStart(WebSocket webSocket) {
try {
while (true) {
- Pair<Long, Event> eventPair = events.take();
- synchronized (events) {
- events.notifyAll();
+ Pair<Long, Event> eventPair = eventsWaitingForTransfer.take();
+ synchronized (eventsWaitingForTransfer) {
+ eventsWaitingForTransfer.notifyAll();
}
boolean transferred = transfer(eventPair, webSocket);
if (transferred) {
break;
} else {
websocketConnector.commit(
- eventPair.getLeft(),
eventPair.getRight() instanceof EnrichedEvent
? (EnrichedEvent) eventPair.getRight()
: null);
@@ -162,30 +166,6 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
}
- private void handleAck(WebSocket webSocket, String s) {
- long commitId = Long.parseLong(s.replace("ACK:", ""));
- Event event = eventMap.remove(commitId);
- if (event != null) {
- websocketConnector.commit(
- commitId, event instanceof EnrichedEvent ? (EnrichedEvent) event :
null);
- }
- handleStart(webSocket);
- }
-
- private void handleError(WebSocket webSocket, String s) {
- long commitId = Long.parseLong(s.replace("ERROR:", ""));
- String log =
- String.format(
- "The tablet of commitId: %d can't be parsed by client, it will be
retried later.",
- commitId);
- LOGGER.warn(log);
- Event event = eventMap.remove(commitId);
- if (event != null) {
- events.put(new Pair<>(commitId, event));
- }
- handleStart(webSocket);
- }
-
private boolean transfer(Pair<Long, Event> eventPair, WebSocket webSocket) {
Long commitId = eventPair.getLeft();
Event event = eventPair.getRight();
@@ -203,16 +183,41 @@ public class WebSocketConnectorServer extends
WebSocketServer {
if (tabletBuffer == null) {
return false;
}
+
ByteBuffer payload = ByteBuffer.allocate(Long.BYTES +
tabletBuffer.limit());
payload.putLong(commitId);
payload.put(tabletBuffer);
payload.flip();
+
this.broadcast(payload, Collections.singletonList(webSocket));
- eventMap.put(eventPair.getLeft(), eventPair.getRight());
+ eventsWaitingForAck.put(eventPair.getLeft(), eventPair.getRight());
} catch (Exception e) {
- events.put(eventPair);
+ eventsWaitingForTransfer.put(eventPair);
throw new PipeException(e.getMessage());
}
return true;
}
+
+ private void handleAck(WebSocket webSocket, String s) {
+ long commitId = Long.parseLong(s.replace("ACK:", ""));
+ Event event = eventsWaitingForAck.remove(commitId);
+ if (event != null) {
+ websocketConnector.commit(event instanceof EnrichedEvent ?
(EnrichedEvent) event : null);
+ }
+ handleStart(webSocket);
+ }
+
+ private void handleError(WebSocket webSocket, String s) {
+ long commitId = Long.parseLong(s.replace("ERROR:", ""));
+ String log =
+ String.format(
+ "The tablet of commitId: %d can't be parsed by client, it will be
retried later.",
+ commitId);
+ LOGGER.warn(log);
+ Event event = eventsWaitingForAck.remove(commitId);
+ if (event != null) {
+ eventsWaitingForTransfer.put(new Pair<>(commitId, event));
+ }
+ handleStart(webSocket);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 801b92cc51f..b01cd181ccc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitManager;
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.pipe.api.event.Event;
@@ -42,15 +43,23 @@ public abstract class EnrichedEvent implements Event {
private final AtomicInteger referenceCount;
+ protected final String pipeName;
protected final PipeTaskMeta pipeTaskMeta;
+ private String committerKey;
+ public static final long NO_COMMIT_ID = -1;
+ private long commitId = NO_COMMIT_ID;
+
private final String pattern;
protected boolean isPatternParsed;
protected boolean isTimeParsed = true;
- protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
+ private boolean shouldReportOnCommit = false;
+
+ protected EnrichedEvent(String pipeName, PipeTaskMeta pipeTaskMeta, String
pattern) {
referenceCount = new AtomicInteger(0);
+ this.pipeName = pipeName;
this.pipeTaskMeta = pipeTaskMeta;
this.pattern = pattern;
isPatternParsed =
getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
@@ -97,8 +106,9 @@ public abstract class EnrichedEvent implements Event {
if (referenceCount.get() == 1) {
isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
if (shouldReport) {
- reportProgress();
+ shouldReportOnCommit = true;
}
+ PipeEventCommitManager.getInstance().commit(this, committerKey);
}
final int newReferenceCount = referenceCount.decrementAndGet();
if (newReferenceCount < 0) {
@@ -154,6 +164,10 @@ public abstract class EnrichedEvent implements Event {
return referenceCount.get();
}
+ public final String getPipeName() {
+ return pipeName;
+ }
+
/**
* Get the pattern of this event.
*
@@ -176,7 +190,7 @@ public abstract class EnrichedEvent implements Event {
}
public abstract EnrichedEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta, String pattern);
+ String pipeName, PipeTaskMeta pipeTaskMeta, String pattern);
public void reportException(PipeRuntimeException pipeRuntimeException) {
if (pipeTaskMeta != null) {
@@ -187,4 +201,23 @@ public abstract class EnrichedEvent implements Event {
}
public abstract boolean isGeneratedByPipe();
+
+ public void setCommitterKeyAndCommitId(String committerKey, long commitId) {
+ this.committerKey = committerKey;
+ this.commitId = commitId;
+ }
+
+ public String getCommitterKey() {
+ return committerKey;
+ }
+
+ public long getCommitId() {
+ return commitId;
+ }
+
+ public void onCommitted() {
+ if (shouldReportOnCommit) {
+ reportProgress();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 2c54d93401a..e94e43e61fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -68,17 +68,18 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private final boolean shouldPrintMessage;
public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
- super(null, null);
+ super(null, null, null);
this.dataRegionId = dataRegionId;
this.shouldPrintMessage = shouldPrintMessage;
}
public PipeHeartbeatEvent(
+ String pipeName,
PipeTaskMeta pipeTaskMeta,
String dataRegionId,
long timePublished,
boolean shouldPrintMessage) {
- super(pipeTaskMeta, null);
+ super(pipeName, pipeTaskMeta, null);
this.dataRegionId = dataRegionId;
this.timePublished = timePublished;
this.shouldPrintMessage = shouldPrintMessage;
@@ -106,9 +107,10 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta, String pattern) {
+ String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
// Should record PipeTaskMeta, for sometimes HeartbeatEvents should report
exceptions.
- return new PipeHeartbeatEvent(pipeTaskMeta, dataRegionId, timePublished,
shouldPrintMessage);
+ return new PipeHeartbeatEvent(
+ pipeName, pipeTaskMeta, dataRegionId, timePublished,
shouldPrintMessage);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 76c4016936f..aea3964399b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -95,7 +95,13 @@ public class PipeRowCollector implements RowCollector {
private void collectTabletInsertionEvent() {
if (tablet != null) {
tabletInsertionEventList.add(
- new PipeRawTabletInsertionEvent(tablet, isAligned, pipeTaskMeta,
sourceEvent, false));
+ new PipeRawTabletInsertionEvent(
+ tablet,
+ isAligned,
+ sourceEvent == null ? null : sourceEvent.getPipeName(),
+ pipeTaskMeta,
+ sourceEvent,
+ false));
}
this.tablet = null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 9bb99a21bb2..cdc689e6ac0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -57,7 +57,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe) {
- this(walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, null,
null);
+ this(walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, null,
null, null);
}
private PipeInsertNodeTabletInsertionEvent(
@@ -65,9 +65,10 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe,
+ String pipeName,
PipeTaskMeta pipeTaskMeta,
String pattern) {
- super(pipeTaskMeta, pattern);
+ super(pipeName, pipeTaskMeta, pattern);
this.walEntryHandler = walEntryHandler;
this.progressIndex = progressIndex;
this.isAligned = isAligned;
@@ -129,9 +130,15 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public PipeInsertNodeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta, String pattern) {
+ String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeInsertNodeTabletInsertionEvent(
- walEntryHandler, progressIndex, isAligned, isGeneratedByPipe,
pipeTaskMeta, pattern);
+ walEntryHandler,
+ progressIndex,
+ isAligned,
+ isGeneratedByPipe,
+ pipeName,
+ pipeTaskMeta,
+ pattern);
}
@Override
@@ -188,7 +195,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
/////////////////////////// parsePattern ///////////////////////////
public TabletInsertionEvent parseEventWithPattern() {
- return new PipeRawTabletInsertionEvent(convertToTablet(), isAligned,
pipeTaskMeta, this, true);
+ return new PipeRawTabletInsertionEvent(
+ convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, true);
}
/////////////////////////// Object ///////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 2ad67057995..91269fd1541 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -51,9 +51,10 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
boolean isAligned,
EnrichedEvent sourceEvent,
boolean needToReport,
+ String pipeName,
PipeTaskMeta pipeTaskMeta,
String pattern) {
- super(pipeTaskMeta, pattern);
+ super(pipeName, pipeTaskMeta, pattern);
this.tablet = Objects.requireNonNull(tablet);
this.isAligned = isAligned;
this.sourceEvent = sourceEvent;
@@ -63,20 +64,21 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
public PipeRawTabletInsertionEvent(
Tablet tablet,
boolean isAligned,
+ String pipeName,
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
boolean needToReport) {
- this(tablet, isAligned, sourceEvent, needToReport, pipeTaskMeta, null);
+ this(tablet, isAligned, sourceEvent, needToReport, pipeName, pipeTaskMeta,
null);
}
@TestOnly
public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) {
- this(tablet, isAligned, null, false, null, null);
+ this(tablet, isAligned, null, false, null, null, null);
}
@TestOnly
public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, String
pattern) {
- this(tablet, isAligned, null, false, null, pattern);
+ this(tablet, isAligned, null, false, null, null, pattern);
}
@Override
@@ -105,9 +107,9 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta, String pattern) {
+ String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeRawTabletInsertionEvent(
- tablet, isAligned, sourceEvent, needToReport, pipeTaskMeta, pattern);
+ tablet, isAligned, sourceEvent, needToReport, pipeName, pipeTaskMeta,
pattern);
}
@Override
@@ -162,6 +164,6 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
public TabletInsertionEvent parseEventWithPattern() {
return new PipeRawTabletInsertionEvent(
- convertToTablet(), isAligned, pipeTaskMeta, this, needToReport);
+ convertToTablet(), isAligned, pipeName, pipeTaskMeta, this,
needToReport);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 3e7506adef3..09d8cdea112 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tablet;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
@@ -64,10 +65,6 @@ public class TabletInsertionDataContainer {
private Tablet tablet;
- public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
- this(null, null, insertNode, pattern);
- }
-
public TabletInsertionDataContainer(
PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, InsertNode
insertNode, String pattern) {
this.pipeTaskMeta = pipeTaskMeta;
@@ -95,6 +92,11 @@ public class TabletInsertionDataContainer {
parse(tablet, isAligned, pattern);
}
+ @TestOnly
+ public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
+ this(null, null, insertNode, pattern);
+ }
+
//////////////////////////// parse ////////////////////////////
private void parse(InsertRowNode insertRowNode, String pattern) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 0acefd60f54..7f49919eb32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -58,19 +58,29 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
public PipeTsFileInsertionEvent(
TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
- this(resource, isLoaded, isGeneratedByPipe, null, null, Long.MIN_VALUE,
Long.MAX_VALUE, false);
+ this(
+ resource,
+ isLoaded,
+ isGeneratedByPipe,
+ null,
+ null,
+ null,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ false);
}
public PipeTsFileInsertionEvent(
TsFileResource resource,
boolean isLoaded,
boolean isGeneratedByPipe,
+ String pipeName,
PipeTaskMeta pipeTaskMeta,
String pattern,
long startTime,
long endTime,
boolean needParseTime) {
- super(pipeTaskMeta, pattern);
+ super(pipeName, pipeTaskMeta, pattern);
this.startTime = startTime;
this.endTime = endTime;
@@ -176,11 +186,12 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta, String pattern) {
+ String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeTsFileInsertionEvent(
resource,
isLoaded,
isGeneratedByPipe,
+ pipeName,
pipeTaskMeta,
pattern,
startTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 664f0f6804e..10b746b3f36 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -210,12 +210,22 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
if (!hasNext()) {
next =
new PipeRawTabletInsertionEvent(
- tablet, isAligned, pipeTaskMeta, sourceEvent, true);
+ tablet,
+ isAligned,
+ sourceEvent != null ? sourceEvent.getPipeName() : null,
+ pipeTaskMeta,
+ sourceEvent,
+ true);
close();
} else {
next =
new PipeRawTabletInsertionEvent(
- tablet, isAligned, pipeTaskMeta, sourceEvent, false);
+ tablet,
+ isAligned,
+ sourceEvent != null ? sourceEvent.getPipeName() : null,
+ pipeTaskMeta,
+ sourceEvent,
+ false);
}
return next;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 600df727f2c..4375b598b6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -45,7 +45,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
// pipeTaskMeta is used to report the progress of the event, the
PipeRealtimeEvent
// is only used in the realtime event extractor, which does not need to
report the progress
// of the event, so the pipeTaskMeta is always null.
- super(null, pattern);
+ super(event != null ? event.getPipeName() : null, null, pattern);
this.event = event;
this.tsFileEpoch = tsFileEpoch;
@@ -61,7 +61,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
// pipeTaskMeta is used to report the progress of the event, the
PipeRealtimeEvent
// is only used in the realtime event extractor, which does not need to
report the progress
// of the event, so the pipeTaskMeta is always null.
- super(pipeTaskMeta, pattern);
+ super(event != null ? event.getPipeName() : null, pipeTaskMeta, pattern);
this.event = event;
this.tsFileEpoch = tsFileEpoch;
@@ -136,9 +136,9 @@ public class PipeRealtimeEvent extends EnrichedEvent {
@Override
public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta, String pattern) {
+ String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeRealtimeEvent(
-
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta,
pattern),
+ event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeName,
pipeTaskMeta, pattern),
this.tsFileEpoch,
this.device2Measurements,
pipeTaskMeta,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index d1befd3e08a..0dadae365f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -112,6 +112,7 @@ public abstract class PipeSubtaskExecutor {
if (subtask != null) {
try {
subtask.close();
+ LOGGER.info("The subtask {} is closed successfully.", subTaskID);
} catch (Exception e) {
LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 9c9fb01ff25..522622bca6c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -72,6 +72,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private static final Map<Integer, Long>
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
+ private String pipeName;
private PipeTaskMeta pipeTaskMeta;
private ProgressIndex startIndex;
@@ -100,6 +101,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final PipeTaskExtractorRuntimeEnvironment environment =
(PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+ pipeName = environment.getPipeName();
pipeTaskMeta = environment.getPipeTaskMeta();
startIndex = environment.getPipeTaskMeta().getProgressIndex();
@@ -351,6 +353,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
resource,
false,
false,
+ pipeName,
pipeTaskMeta,
pattern,
historicalDataExtractionStartTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index 0042c9d3c91..279f20f2a0c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -69,7 +69,7 @@ public class PipeDataRegionAssigner {
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- extractor.getPipeTaskMeta(), extractor.getPattern());
+ extractor.getPipeName(), extractor.getPipeTaskMeta(),
extractor.getPattern());
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
extractor.extract(copiedEvent);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index bc5b79e4c79..8bb22523e51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -24,10 +24,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.legacy.PipeData;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -48,6 +50,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -136,7 +139,7 @@ public class IoTDBLegacyPipeReceiverAgent {
.execute(
statement,
queryId,
- null,
+ new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault().getId()),
"",
partitionFetcher,
schemaFetcher,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
index f7c2b188277..3f779641530 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
@@ -20,9 +20,11 @@
package org.apache.iotdb.db.pipe.receiver.legacy.loader;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -35,6 +37,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.ZoneId;
import java.util.Collections;
/** This loader is used to load deletion plan. */
@@ -61,7 +64,7 @@ public class DeletionLoader implements ILoader {
.execute(
statement,
queryId,
- null,
+ new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault().getId()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
index 19e2cf5d080..6288d74371c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.pipe.receiver.legacy.loader;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.time.ZoneId;
/** This loader is used to load tsFiles. If .mods file exists, it will be
loaded as well. */
public class TsFileLoader implements ILoader {
@@ -63,7 +66,7 @@ public class TsFileLoader implements ILoader {
.execute(
statement,
queryId,
- null,
+ new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault().getId()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 57ed8834a72..3621158e1b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -55,7 +55,8 @@ public class PipeTaskBuilder {
new PipeTaskConnectorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- pipeStaticMeta.getConnectorParameters());
+ pipeStaticMeta.getConnectorParameters(),
+ dataRegionId);
// The processor connects the extractor and connector.
final PipeTaskProcessorStage processorStage =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index cfb0aabeb9d..b8a5042707f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.task.connection;
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitManager;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -33,10 +34,13 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
private final EnrichedDeque<Event> bufferQueue;
+ private final int dataRegionId;
+
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
+ public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue,
int dataRegionId) {
this.pendingQueue = pendingQueue;
+ this.dataRegionId = dataRegionId;
bufferQueue = new EnrichedDeque<>(new LinkedList<>());
}
@@ -44,6 +48,10 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
public synchronized void collect(Event event) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent)
event).increaseReferenceCount(PipeEventCollector.class.getName());
+
+ // Assign a commit id for this event in order to report progress in
order.
+ PipeEventCommitManager.getInstance()
+ .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
dataRegionId);
}
if (event instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 131b2e7c32e..a78dcfe9ed8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.db.pipe.task.stage;
-import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager;
@@ -29,12 +30,19 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
public class PipeTaskConnectorStage extends PipeTaskStage {
+ private final String pipeName;
+ private final int dataRegionId;
protected final PipeParameters pipeConnectorParameters;
protected String connectorSubtaskId;
public PipeTaskConnectorStage(
- String pipeName, long creationTime, PipeParameters
pipeConnectorParameters) {
+ String pipeName,
+ long creationTime,
+ PipeParameters pipeConnectorParameters,
+ TConsensusGroupId dataRegionId) {
+ this.pipeName = pipeName;
+ this.dataRegionId = dataRegionId.getId();
this.pipeConnectorParameters = pipeConnectorParameters;
connectorSubtaskId =
@@ -42,7 +50,8 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
.register(
PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
pipeConnectorParameters,
- new PipeTaskRuntimeEnvironment(pipeName, creationTime));
+ new PipeTaskConnectorRuntimeEnvironment(
+ this.pipeName, creationTime, this.dataRegionId));
}
@Override
@@ -62,7 +71,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
@Override
public void dropSubtask() throws PipeException {
- PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
+ PipeConnectorSubtaskManager.instance().deregister(pipeName, dataRegionId,
connectorSubtaskId);
}
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index d050594a9ff..4bcf0e44c76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -83,7 +83,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// old one, so we need creationTime to make their hash code different in
the map.
final String taskId = pipeName + "_" + dataRegionId.getId() + "_" +
creationTime;
final PipeEventCollector pipeConnectorOutputEventCollector =
- new PipeEventCollector(pipeConnectorOutputPendingQueue);
+ new PipeEventCollector(pipeConnectorOutputPendingQueue,
dataRegionId.getId());
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 40d257480be..e16c0448274 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -130,7 +130,11 @@ public abstract class PipeTaskStage {
return;
}
- // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+ // MUST stop the subtask before dropping it, otherwise
+ // the subtask might be in an inconsistent state!
+ stop();
+
+ // status == PipeStatus.STOPPED, drop the connector
dropSubtask();
status = PipeStatus.DROPPED;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index c9b8c20a3db..b8acefbada3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
@@ -332,6 +333,16 @@ public class PipeConnectorSubtask extends PipeSubtask {
}
}
+ /**
+ * When a pipe is dropped, the connector may be reused and will not be
closed. So we just discard
+ * its queued events in the output pipe connector.
+ */
+ public void discardEventsOfPipe(String pipeNameToDrop) {
+ if (outputPipeConnector instanceof IoTDBThriftAsyncConnector) {
+ ((IoTDBThriftAsyncConnector)
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
+ }
+ }
+
//////////////////////////// APIs provided for metric framework
////////////////////////////
public String getAttributeSortedString() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 854811028a1..abee2971a11 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -23,8 +23,13 @@ import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorSubtaskLifeCycle.class);
+
private final PipeConnectorSubtaskExecutor executor;
private final PipeConnectorSubtask subtask;
private final BoundedBlockingPendingQueue<Event> pendingQueue;
@@ -56,51 +61,89 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
if (aliveTaskCount < 0) {
throw new IllegalStateException("aliveTaskCount < 0");
}
+
if (aliveTaskCount == 0) {
executor.register(subtask);
runningTaskCount = 0;
}
+
aliveTaskCount++;
+ LOGGER.info(
+ "Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ subtask,
+ runningTaskCount,
+ aliveTaskCount);
}
/**
* Deregister the subtask. If the subtask is the last one, close the subtask.
*
+ * <p>Note that this method should be called after the subtask is stopped.
Otherwise, the
+ * runningTaskCount might be inconsistent with the aliveTaskCount because of
parallel connector
+ * scheduling.
+ *
+ * @param pipeNameToDeregister pipe name
* @return true if the subtask is out of life cycle, indicating that the
subtask should never be
* used again
* @throws IllegalStateException if aliveTaskCount <= 0
*/
- public synchronized boolean deregister() {
+ public synchronized boolean deregister(String pipeNameToDeregister) {
if (aliveTaskCount <= 0) {
throw new IllegalStateException("aliveTaskCount <= 0");
}
- if (aliveTaskCount == 1) {
+
+ subtask.discardEventsOfPipe(pipeNameToDeregister);
+
+ try {
+ if (aliveTaskCount > 1) {
+ return false;
+ }
+
close();
// This subtask is out of life cycle, should never be used again
return true;
+ } finally {
+ aliveTaskCount--;
+ LOGGER.info(
+ "Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ subtask,
+ runningTaskCount,
+ aliveTaskCount);
}
- aliveTaskCount--;
- return false;
}
public synchronized void start() {
if (runningTaskCount < 0) {
throw new IllegalStateException("runningTaskCount < 0");
}
+
if (runningTaskCount == 0) {
executor.start(subtask.getTaskID());
}
+
runningTaskCount++;
+ LOGGER.info(
+ "Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ subtask,
+ runningTaskCount,
+ aliveTaskCount);
}
public synchronized void stop() {
if (runningTaskCount <= 0) {
throw new IllegalStateException("runningTaskCount <= 0");
}
+
if (runningTaskCount == 1) {
executor.stop(subtask.getTaskID());
}
+
runningTaskCount--;
+ LOGGER.info(
+ "Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ subtask,
+ runningTaskCount,
+ aliveTaskCount);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 6f1f488785c..e29d09bb103 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -20,13 +20,15 @@
package org.apache.iotdb.db.pipe.task.subtask.connector;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitManager;
import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
-import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -50,7 +52,18 @@ public class PipeConnectorSubtaskManager {
public synchronized String register(
PipeConnectorSubtaskExecutor executor,
PipeParameters pipeConnectorParameters,
- PipeRuntimeEnvironment pipeRuntimeEnvironment) {
+ PipeTaskConnectorRuntimeEnvironment environment) {
+ final String connectorKey =
+ pipeConnectorParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY),
+ BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
+ // for matching in `CONNECTOR_CONSTRUCTORS`
+ .toLowerCase();
+ PipeEventCommitManager.getInstance()
+ .register(environment.getPipeName(), environment.getRegionId(),
connectorKey);
+
final String attributeSortedString =
new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
@@ -78,7 +91,7 @@ public class PipeConnectorSubtaskManager {
try {
pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
pipeConnector.customize(
- pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
+ pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
pipeConnector.handshake();
} catch (Exception e) {
throw new PipeException(
@@ -90,10 +103,8 @@ public class PipeConnectorSubtaskManager {
new PipeConnectorSubtask(
String.format(
"%s_%s_%s",
- attributeSortedString,
- pipeRuntimeEnvironment.getCreationTime(),
- connectorIndex),
- pipeRuntimeEnvironment.getCreationTime(),
+ attributeSortedString, environment.getCreationTime(),
connectorIndex),
+ environment.getCreationTime(),
attributeSortedString,
connectorIndex,
pendingQueue,
@@ -115,18 +126,21 @@ public class PipeConnectorSubtaskManager {
return attributeSortedString;
}
- public synchronized void deregister(String attributeSortedString) {
+ public synchronized void deregister(
+ String pipeName, int dataRegionId, String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
- lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister);
+ lifeCycles.removeIf(o -> o.deregister(pipeName));
if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
+
+ PipeEventCommitManager.getInstance().deregister(pipeName, dataRegionId);
}
public synchronized void start(String attributeSortedString) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 642a66dc660..da2a7a00667 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -194,7 +194,7 @@ public class TsFileInsertionDataContainerTest {
try (final TsFileInsertionDataContainer alignedContainer =
new TsFileInsertionDataContainer(alignedTsFile, "root", startTime,
endTime);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(nonalignedTsFile, "root",
startTime, endTime); ) {
+ new TsFileInsertionDataContainer(nonalignedTsFile, "root",
startTime, endTime)) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);