This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new e74d1b48047 Pipe: Fix data lost when syncing between clusters cause by
senders' data region leader change (#13532) (#13536)
e74d1b48047 is described below
commit e74d1b480477755ddb27731fb033d973e6d5ab5c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Sep 18 15:48:19 2024 +0800
Pipe: Fix data lost when syncing between clusters cause by senders' data
region leader change (#13532) (#13536)
(cherry picked from commit 6b71f769693eb8f61ac71f5a0088242fbf55665e)
---
.../payload/evolvable/batch/PipeTabletEventBatch.java | 4 ++--
.../payload/evolvable/batch/PipeTransferBatchReqBuilder.java | 6 +++---
.../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java | 7 ++++---
.../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java | 4 ++--
.../db/pipe/task/subtask/connector/PipeConnectorSubtask.java | 12 +++++++-----
.../subtask/connector/PipeConnectorSubtaskLifeCycle.java | 5 +++--
.../task/subtask/connector/PipeConnectorSubtaskManager.java | 6 +++---
.../subtask/connector/PipeRealtimePriorityBlockingQueue.java | 7 ++++---
.../task/subtask/SubscriptionConnectorSubtaskLifeCycle.java | 2 +-
.../task/subtask/SubscriptionConnectorSubtaskManager.java | 6 +++---
.../commons/pipe/connector/protocol/IoTDBConnector.java | 2 +-
.../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 5 +++++
.../commons/pipe/task/connection/BlockingPendingQueue.java | 5 +++--
13 files changed, 41 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 75d4b2f9e3c..c7b8e1f0615 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -123,10 +123,10 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
* Discard all events of the given pipe. This method only clears the
reference count of the events
* and discard them, but do not modify other objects (such as buffers) for
simplicity.
*/
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+ public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
events.removeIf(
event -> {
- if (pipeNameToDrop.equals(event.getPipeName())) {
+ if (pipeNameToDrop.equals(event.getPipeName()) && regionId ==
event.getRegionId()) {
event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 77fd73f8aba..9b787ee30f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -184,9 +184,9 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
&&
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
}
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
- defaultBatch.discardEventsOfPipe(pipeNameToDrop);
- endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(pipeNameToDrop));
+ public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
+ defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId);
+ endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(pipeNameToDrop, regionId));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index f68d86dd7e8..b266d8cca3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -511,14 +511,15 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
//////////////////////////// Operations for close
////////////////////////////
@Override
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+ public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
if (isTabletBatchModeEnabled) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()))
{
+ && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
+ && regionId == ((EnrichedEvent) event).getRegionId()) {
((EnrichedEvent) event)
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 772a55b0e17..f67a2b88dd6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -493,8 +493,8 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
}
@Override
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+ public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index d55f18f9838..7c88c962466 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -218,9 +218,9 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
* When a pipe is dropped, the connector maybe reused and will not be
closed. So we just discard
* its queued events in the output pipe connector.
*/
- public void discardEventsOfPipe(final String pipeNameToDrop) {
+ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
// Try to remove the events as much as possible
- inputPendingQueue.discardEventsOfPipe(pipeNameToDrop);
+ inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
// synchronized to use the lastEvent & lastExceptionEvent
synchronized (this) {
@@ -230,7 +230,8 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
// Note that since we use a new thread to stop all the pipes, we will
not encounter deadlock
// here. Or else we will.
if (lastEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()))
{
+ && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
+ && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
// Do not clear last event's reference count because it may be on
transferring
lastEvent = null;
// Submit self to avoid that the lastEvent has been retried "max
times" times and has
@@ -252,13 +253,14 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
// clear the lastExceptionEvent. It's safe to potentially clear it twice
because we have the
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())) {
+ && pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())
+ && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
clearReferenceCountAndReleaseLastExceptionEvent();
}
}
if (outputPipeConnector instanceof IoTDBConnector) {
- ((IoTDBConnector)
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
+ ((IoTDBConnector)
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, regionId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index e3d4d35a171..26c18b3dad6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -85,16 +85,17 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
* connector scheduling.
*
* @param pipeNameToDeregister pipe name
+ * @param regionId region id
* @return {@code true} if the {@link PipeConnectorSubtask} is out of life
cycle, indicating that
* the {@link PipeConnectorSubtask} should never be used again
* @throws IllegalStateException if {@link
PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0
*/
- public synchronized boolean deregister(final String pipeNameToDeregister) {
+ public synchronized boolean deregister(final String pipeNameToDeregister,
int regionId) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
- subtask.discardEventsOfPipe(pipeNameToDeregister);
+ subtask.discardEventsOfPipe(pipeNameToDeregister, regionId);
try {
if (registeredTaskCount > 1) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index f8809b6b689..5bb14073648 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -179,7 +179,7 @@ public class PipeConnectorSubtaskManager {
public synchronized void deregister(
final String pipeName,
final long creationTime,
- final int dataRegionId,
+ final int regionId,
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
@@ -187,13 +187,13 @@ public class PipeConnectorSubtaskManager {
final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
- lifeCycles.removeIf(o -> o.deregister(pipeName));
+ lifeCycles.removeIf(o -> o.deregister(pipeName, regionId));
if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
- PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
dataRegionId);
+ PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
regionId);
}
public synchronized void start(final String attributeSortedString) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index 6b33c248bfb..beabba3128a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -168,12 +168,13 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
}
@Override
- public void discardEventsOfPipe(final String pipeNameToDrop) {
- super.discardEventsOfPipe(pipeNameToDrop);
+ public void discardEventsOfPipe(final String pipeNameToDrop, final int
regionId) {
+ super.discardEventsOfPipe(pipeNameToDrop, regionId);
tsfileInsertEventDeque.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()))
{
+ && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
+ && regionId == ((EnrichedEvent) event).getRegionId()) {
if (((EnrichedEvent) event)
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index c05bd13f07c..7e7f5fa1d21 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -69,7 +69,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
}
@Override
- public synchronized boolean deregister(final String ignored) {
+ public synchronized boolean deregister(final String ignored, int regionId) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 45d62174e4b..377790836fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -164,7 +164,7 @@ public class SubscriptionConnectorSubtaskManager {
public synchronized void deregister(
final String pipeName,
final long creationTime,
- final int dataRegionId,
+ final int regionId,
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
@@ -172,11 +172,11 @@ public class SubscriptionConnectorSubtaskManager {
final PipeConnectorSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
- if (lifeCycle.deregister(pipeName)) {
+ if (lifeCycle.deregister(pipeName, regionId)) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
- PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
dataRegionId);
+ PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
regionId);
}
public synchronized void start(final String attributeSortedString) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index eb66a638672..e1f8f9ed1d5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -476,7 +476,7 @@ public abstract class IoTDBConnector implements
PipeConnector {
* When a pipe is dropped, the connector maybe reused and will not be
closed. We need to discard
* its batched or queued events in the output pipe connector.
*/
- public synchronized void discardEventsOfPipe(final String pipeName) {
+ public synchronized void discardEventsOfPipe(final String pipeName, final
int regionId) {
// Do nothing by default
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 4429d4d8b92..7c7d6fe9ea2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -292,6 +292,11 @@ public abstract class EnrichedEvent implements Event {
return creationTime;
}
+ public final int getRegionId() {
+ // TODO: persist regionId in EnrichedEvent
+ return committerKey == null ? -1 : committerKey.getRegionId();
+ }
+
public final boolean isDataRegionEvent() {
return !(this instanceof PipeWritePlanEvent) && !(this instanceof
PipeSnapshotEvent);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index c67e80181f9..e981818b87e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -124,11 +124,12 @@ public abstract class BlockingPendingQueue<E extends
Event> {
eventCounter.reset();
}
- public void discardEventsOfPipe(final String pipeNameToDrop) {
+ public void discardEventsOfPipe(final String pipeNameToDrop, final int
regionId) {
pendingQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()))
{
+ && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
+ && regionId == ((EnrichedEvent) event).getRegionId()) {
if (((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
}