This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1b992f29004 Pipe: Fixed some problems cause by reusing connectors
after exceptions occurred in connector stage (#12685)
1b992f29004 is described below
commit 1b992f29004390991e3df0c2c3390b2970f2bc8b
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 22:52:04 2024 +0800
Pipe: Fixed some problems cause by reusing connectors after exceptions
occurred in connector stage (#12685)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/execution/PipeConfigNodeSubtask.java | 2 +
.../airgap/IoTDBDataRegionAirGapConnector.java | 3 +-
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 3 +-
.../async/IoTDBDataRegionAsyncConnector.java | 10 +++-
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 3 +-
.../protocol/writeback/WriteBackConnector.java | 3 +-
.../subtask/connector/PipeConnectorSubtask.java | 60 ++++++++++++++++++++--
.../PipeRealtimePriorityBlockingQueue.java | 11 +++-
.../pipe/task/connection/BlockingPendingQueue.java | 22 +++++---
.../task/subtask/PipeAbstractConnectorSubtask.java | 43 +++++++++++++++-
10 files changed, 138 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 220d0388be2..02cb5373359 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -186,6 +186,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
} catch (final PipeException e) {
+ setLastExceptionEvent(event);
if (!isClosed.get()) {
throw e;
} else {
@@ -196,6 +197,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
clearReferenceCountAndReleaseLastEvent();
}
} catch (final Exception e) {
+ setLastExceptionEvent(event);
if (!isClosed.get()) {
throw new PipeException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 22377cc9a6d..9874741c7f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -129,7 +130,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
try {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
- } else if (!(event instanceof PipeHeartbeatEvent)) {
+ } else if (!(event instanceof PipeHeartbeatEvent || event instanceof
PipeTerminateEvent)) {
LOGGER.warn(
"IoTDBDataRegionAirGapConnector does not support transferring
generic event: {}.",
event);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index e964ddffd26..5edf485adfb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -303,7 +304,7 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
@Override
public void transfer(final Event event) throws Exception {
- if (!(event instanceof PipeHeartbeatEvent)) {
+ if (!(event instanceof PipeHeartbeatEvent || event instanceof
PipeTerminateEvent)) {
LOGGER.warn(
"IoTDBLegacyPipeConnector does not support transferring generic
event: {}.", event);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index b5b4397fd60..7e3b4fb8a9f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -370,8 +371,9 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
transferQueuedEventsIfNecessary();
transferBatchedEventsIfNecessary();
- if (!(event instanceof PipeHeartbeatEvent)
- && !(event instanceof PipeSchemaRegionWritePlanEvent)) {
+ if (!(event instanceof PipeHeartbeatEvent
+ || event instanceof PipeSchemaRegionWritePlanEvent
+ || event instanceof PipeTerminateEvent)) {
LOGGER.warn(
"IoTDBThriftAsyncConnector does not support transferring generic
event: {}.", event);
return;
@@ -470,6 +472,10 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
* @param event {@link Event} to retry
*/
public void addFailureEventToRetryQueue(final Event event) {
+ if (event instanceof EnrichedEvent && ((EnrichedEvent)
event).isReleased()) {
+ return;
+ }
+
if (isClosed.get()) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 3b664ac50c1..fca40f21d64 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -172,7 +173,7 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
doTransferWrapper();
}
- if (!(event instanceof PipeHeartbeatEvent)) {
+ if (!(event instanceof PipeHeartbeatEvent || event instanceof
PipeTerminateEvent)) {
LOGGER.warn(
"IoTDBThriftSyncConnector does not support transferring generic
event: {}.", event);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 8a575922fa7..68789bd3ef5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -102,7 +103,7 @@ public class WriteBackConnector implements PipeConnector {
@Override
public void transfer(final Event event) throws Exception {
- if (!(event instanceof PipeHeartbeatEvent)) {
+ if (!(event instanceof PipeHeartbeatEvent || event instanceof
PipeTerminateEvent)) {
LOGGER.warn("WriteBackConnector does not support transferring generic
event: {}.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 2994f41a3f9..cc78ebdea54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -135,27 +135,30 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
decreaseReferenceCountAndReleaseLastEvent(true);
} catch (final PipeException e) {
if (!isClosed.get()) {
+ setLastExceptionEvent(event);
throw e;
} else {
LOGGER.info(
- "{} in pipe transfer, ignored because pipe is dropped.",
+ "{} in pipe transfer, ignored because the connector subtask is
dropped.",
e.getClass().getSimpleName(),
e);
clearReferenceCountAndReleaseLastEvent();
}
} catch (final Exception e) {
if (!isClosed.get()) {
+ setLastExceptionEvent(event);
throw new PipeException(
String.format(
"Exception in pipe transfer, subtask: %s, last event: %s, root
cause: %s",
taskID,
- lastEvent instanceof EnrichedEvent
- ? ((EnrichedEvent) lastEvent).coreReportMessage()
- : lastEvent,
+ event instanceof EnrichedEvent
+ ? ((EnrichedEvent) event).coreReportMessage()
+ : event,
ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
- LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.", e);
+ LOGGER.info(
+ "Exception in pipe transfer, ignored because the connector subtask
is dropped.", e);
clearReferenceCountAndReleaseLastEvent();
}
}
@@ -218,6 +221,53 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
* its queued events in the output pipe connector.
*/
public void discardEventsOfPipe(final String pipeNameToDrop) {
+ // Try to remove the events as much as possible
+ inputPendingQueue.removeIf(
+ event -> {
+ if (event instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()))
{
+ ((EnrichedEvent) event)
+
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+ return true;
+ }
+ return false;
+ });
+
+ // synchronized to use the lastEvent and lastExceptionEvent
+ synchronized (this) {
+ // Here we discard the last event, and re-submit the pipe task to avoid
that the pipe task has
+ // stopped submission but will not be stopped by critical exceptions,
because when it acquires
+ // lock, the pipe is already dropped, thus it will do nothing.
+ // Note that since we use a new thread to stop all the pipes, we will
not encounter deadlock
+ // here. Or else we will.
+ if (lastEvent instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()))
{
+ // Do not clear last event's reference count because it may be on
transferring
+ lastEvent = null;
+ // Submit self to avoid that the lastEvent has been retried "max
times" times and has
+ // stopped executing.
+ // 1. If the last event is still on execution, or submitted by the
previous "onSuccess" or
+ // "onFailure", the "submitSelf" cause nothing.
+ // 2. If the last event is waiting the instance lock to call
"onSuccess", then the callback
+ // method will skip this turn of submission.
+ // 3. If the last event is waiting to call "onFailure", then it will
be ignored because the
+ // last event has been set to null.
+ // 4. If the last event has called "onFailure" and caused the subtask
to stop submission,
+ // it's submitted here and the "report" will wait for the "drop
pipe" lock to stop all
+ // the pipes with critical exceptions. As illustrated above, the
"report" will do
+ // nothing.
+ submitSelf();
+ }
+
+ // We only clear the lastEvent's reference count when it's already on
failure. Namely, we
+ // clear the lastExceptionEvent. It's safe to potentially clear it twice
because we have the
+ // "nonnull" detection.
+ if (lastExceptionEvent instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())) {
+ clearReferenceCountAndReleaseLastExceptionEvent();
+ }
+ }
+
if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
((IoTDBDataRegionAsyncConnector)
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index 82b09346322..b862907181d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
+import java.util.function.Predicate;
public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQueue<Event> {
@@ -51,10 +52,10 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
if (event instanceof PipeHeartbeatEvent && super.peekLast() instanceof
PipeHeartbeatEvent) {
// We can NOT keep too many PipeHeartbeatEvent in bufferQueue because
they may cause OOM.
((EnrichedEvent)
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
+ return false;
} else {
- super.directOffer(event);
+ return super.directOffer(event);
}
- return true;
}
@Override
@@ -119,6 +120,12 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
tsfileInsertEventDeque.forEach(action);
}
+ @Override
+ public void removeIf(final Predicate<? super Event> filter) {
+ super.removeIf(filter);
+ pendingQueue.removeIf(filter);
+ }
+
@Override
public boolean isEmpty() {
return super.isEmpty() && tsfileInsertEventDeque.isEmpty();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index e33f36b07f9..04983a984d9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.function.Predicate;
public abstract class BlockingPendingQueue<E extends Event> {
@@ -41,12 +42,13 @@ public abstract class BlockingPendingQueue<E extends Event>
{
private final PipeEventCounter eventCounter;
- protected BlockingPendingQueue(BlockingQueue<E> pendingQueue,
PipeEventCounter eventCounter) {
+ protected BlockingPendingQueue(
+ final BlockingQueue<E> pendingQueue, final PipeEventCounter
eventCounter) {
this.pendingQueue = pendingQueue;
this.eventCounter = eventCounter;
}
- public boolean waitedOffer(E event) {
+ public boolean waitedOffer(final E event) {
try {
final boolean offered =
pendingQueue.offer(event, MAX_BLOCKING_TIME_MS,
TimeUnit.MILLISECONDS);
@@ -54,14 +56,14 @@ public abstract class BlockingPendingQueue<E extends Event>
{
eventCounter.increaseEventCount(event);
}
return offered;
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOGGER.info("pending queue offer is interrupted.", e);
Thread.currentThread().interrupt();
return false;
}
}
- public boolean directOffer(E event) {
+ public boolean directOffer(final E event) {
final boolean offered = pendingQueue.offer(event);
if (offered) {
eventCounter.increaseEventCount(event);
@@ -69,12 +71,12 @@ public abstract class BlockingPendingQueue<E extends Event>
{
return offered;
}
- public boolean put(E event) {
+ public boolean put(final E event) {
try {
pendingQueue.put(event);
eventCounter.increaseEventCount(event);
return true;
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOGGER.info("pending queue put is interrupted.", e);
Thread.currentThread().interrupt();
return false;
@@ -92,7 +94,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
try {
event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
eventCounter.decreaseEventCount(event);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOGGER.info("pending queue poll is interrupted.", e);
Thread.currentThread().interrupt();
}
@@ -104,10 +106,14 @@ public abstract class BlockingPendingQueue<E extends
Event> {
eventCounter.reset();
}
- public void forEach(Consumer<? super E> action) {
+ public void forEach(final Consumer<? super E> action) {
pendingQueue.forEach(action);
}
+ public void removeIf(final Predicate<? super E> filter) {
+ pendingQueue.removeIf(filter);
+ }
+
public boolean isEmpty() {
return pendingQueue.isEmpty();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index a95badf6a94..e6a2399f7e8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import com.google.common.util.concurrent.Futures;
@@ -48,6 +49,9 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
// a subtask is submitted to only one thread at a time
protected volatile boolean isSubmitted = false;
+ // For cleaning up the last event when the pipe is dropped
+ protected volatile Event lastExceptionEvent;
+
protected PipeAbstractConnectorSubtask(
final String taskID, final long creationTime, final PipeConnector
outputPipeConnector) {
super(taskID, creationTime);
@@ -76,11 +80,35 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
isSubmitted = false;
if (isClosed.get()) {
- LOGGER.info("onFailure in pipe transfer, ignored because pipe is
dropped.", throwable);
+ LOGGER.info(
+ "onFailure in pipe transfer, ignored because the connector subtask
is dropped.",
+ throwable);
clearReferenceCountAndReleaseLastEvent();
return;
}
+ // We assume that the event is cleared as the "lastEvent" in processor
subtask and reaches the
+ // connector subtask. Then, it may fail because of released resource and
block the other pipes
+ // using the same connector. We simply discard it.
+ if (lastExceptionEvent instanceof EnrichedEvent
+ && ((EnrichedEvent) lastExceptionEvent).isReleased()) {
+ LOGGER.info(
+ "onFailure in pipe transfer, ignored because the failure event is
released.", throwable);
+ submitSelf();
+ return;
+ }
+
+ // If lastExceptionEvent != lastEvent, it indicates that the lastEvent's
reference has been
+ // changed because the pipe of it has been dropped. In that case, we just
discard the event.
+ if (lastEvent != lastExceptionEvent) {
+ LOGGER.info(
+ "onFailure in pipe transfer, ignored because the failure event's
pipe is dropped.",
+ throwable);
+ clearReferenceCountAndReleaseLastExceptionEvent();
+ submitSelf();
+ return;
+ }
+
if (throwable instanceof PipeConnectionException) {
// Retry to connect to the target system if the connection is broken
// We should reconstruct the client before re-submit the subtask
@@ -191,4 +219,17 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
isSubmitted = true;
}
+
+ protected synchronized void setLastExceptionEvent(final Event event) {
+ lastExceptionEvent = event;
+ }
+
+ protected synchronized void
clearReferenceCountAndReleaseLastExceptionEvent() {
+ if (lastExceptionEvent != null) {
+ if (lastExceptionEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent)
lastExceptionEvent).clearReferenceCount(PipeSubtask.class.getName());
+ }
+ lastExceptionEvent = null;
+ }
+ }
}