Pengzna commented on code in PR #15684:
URL: https://github.com/apache/iotdb/pull/15684#discussion_r2141854270


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -630,11 +631,20 @@ public void addFailureEventToRetryQueue(final EnrichedEvent event) {
       return;
     }
 
-    retryEventQueue.offer(event);
-    LOGGER.info(
-        "PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be added to retry queue.",
-        consensusGroupId,
-        event);
+    boolean res = retryEventQueue.offer(event);
+    if (res) {
+      LOGGER.info(
+          "PipeConsensus-ConsensusGroup-{}: Event {} replicate index {} transfer failed, will be added to retry queue.",
+          consensusGroupId,
+          event,
+          event.getReplicateIndexForIoTV2());
+    } else {
+      LOGGER.warn(
+          "PipeConsensus-ConsensusGroup-{}: Event {} replicate index {} transfer failed, added to retry queue failed, this event will be ignored.",
+          consensusGroupId,
+          event,
+          event.getReplicateIndexForIoTV2());
+    }

Review Comment:
   Good idea. I will do that in a follow-up PR.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -535,9 +535,10 @@ private void asyncTransferQueuedEventsIfNecessary() {
                 ? peekedEvent.getRetryInterval()
                 : 0L;
         LOGGER.info(
-            "PipeConsensus-ConsensusGroup-{}: retry with interval {} for {}",
+            "PipeConsensus-ConsensusGroup-{}: retry with interval {} for index {} {}",
             consensusGroupId,
             retryInterval,
+            peekedEvent.getReplicateIndexForIoTV2(),
             peekedEvent);

Review Comment:
   Yes, but it is too long and inconvenient when I check the log. I added this
   marker for quick lookup.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -616,7 +617,7 @@ private void retryTransfer(final PipeDeleteDataNodeEvent deleteDataNodeEvent) {
    * @param event event to retry
    */
   @SuppressWarnings("java:S899")
-  public void addFailureEventToRetryQueue(final EnrichedEvent event) {
+  public synchronized void addFailureEventToRetryQueue(final EnrichedEvent event) {

Review Comment:
   Keep this method (enqueue) and `asyncTransferQueuedEventsIfNecessary`
   (dequeue) serialized. Otherwise, one thread could poll events in
   `asyncTransferQueuedEventsIfNecessary` indefinitely, because new events
   keep being enqueued while it is draining the queue.
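
   A minimal sketch of the idea, not the actual connector code: the class
   `RetryQueueSketch`, its methods `enqueue` and `drainOnce`, and the use of
   `String` events with a bounded `LinkedBlockingQueue` are all hypothetical
   stand-ins. It assumes that both the enqueue and the dequeue paths lock the
   same monitor, so a drain pass only sees the events that were present when it
   acquired the lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the connector's retry queue handling.
class RetryQueueSketch {
  private final BlockingQueue<String> retryQueue;

  RetryQueueSketch(final int capacity) {
    this.retryQueue = new LinkedBlockingQueue<>(capacity);
  }

  // Enqueue side: synchronized on the same monitor as drainOnce().
  // offer() returns false when the bounded queue is full, so the caller
  // can log the dropped event instead of silently losing it.
  synchronized boolean enqueue(final String event) {
    return retryQueue.offer(event);
  }

  // Dequeue side: polls until the queue is empty. Because enqueue() cannot
  // run while this method holds the monitor, the loop is guaranteed to end.
  synchronized List<String> drainOnce() {
    final List<String> drained = new ArrayList<>();
    String event;
    while ((event = retryQueue.poll()) != null) {
      drained.add(event);
    }
    return drained;
  }
}
```

   Without the shared lock, a producer that keeps calling `enqueue` could keep
   the `while` loop in `drainOnce` running for as long as new events arrive.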



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
