Copilot commented on code in PR #15644:
URL: https://github.com/apache/iotdb/pull/15644#discussion_r2125963468


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java:
##########
@@ -96,17 +98,42 @@ protected void onTabletInsertionEvent(final 
TabletInsertionEvent event) {
   protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
     // TODO: parse tsfile event on the fly like 
SubscriptionPipeTabletEventBatch
     try {
-      for (final TabletInsertionEvent parsedEvent : 
event.toTabletInsertionEvents()) {
-        if (!((PipeRawTabletInsertionEvent) parsedEvent)
-            .increaseReferenceCount(this.getClass().getName())) {
-          LOGGER.warn(
-              "SubscriptionPipeTsFileEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
-              ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
-        } else {
+      final Iterable<TabletInsertionEvent> iterable = 
event.toTabletInsertionEvents();
+      final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+      while (iterator.hasNext()) {
+        final TabletInsertionEvent parsedEvent = iterator.next();
+        int retryCount = 0;

Review Comment:
   Consider adding a delay or exponential backoff in this retry loop to prevent 
potential CPU hogging in the event of persistent memory allocation failures.
   ```suggestion
           int retryCount = 0;
           final int baseDelay = 100; // base delay in milliseconds
           final int maxDelay = 5000; // maximum delay in milliseconds
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java:
##########
@@ -149,9 +155,34 @@ public void process(TsFileInsertionEvent 
tsFileInsertionEvent, EventCollector ev
       throws Exception {
     if (shouldSplitFile) {
       try {
-        for (final TabletInsertionEvent tabletInsertionEvent :
-            tsFileInsertionEvent.toTabletInsertionEvents()) {
-          process(tabletInsertionEvent, eventCollector);
+        final Iterable<TabletInsertionEvent> iterable =
+            tsFileInsertionEvent.toTabletInsertionEvents();
+        final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+        while (iterator.hasNext()) {
+          final TabletInsertionEvent parsedEvent = iterator.next();
+          int retryCount = 0;
+          while (true) {
+            // If failed due do insufficient memory, retry until success to 
avoid race among
+            // multiple processor threads
+            try {
+              process(parsedEvent, eventCollector);
+              break;
+            } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+              if (retryCount++ % 100 == 0) {
+                LOGGER.warn(
+                    "DownSamplingProcessor: failed to allocate memory for 
parsing TsFile {}, retry count is {}, will keep retrying.",
+                    ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
+                    retryCount,
+                    e);
+              } else if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(
+                    "DownSamplingProcessor: failed to allocate memory for 
parsing TsFile {}, retry count is {}, will keep retrying.",
+                    ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
+                    retryCount,
+                    e);
+              }

Review Comment:
   Consider implementing a delay or backoff mechanism in this retry loop to 
mitigate high CPU usage when memory allocation issues persist.
   ```suggestion
               // If failed due to insufficient memory, retry until success to 
avoid race among
               // multiple processor threads
               try {
                 process(parsedEvent, eventCollector);
                 break;
               } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
                 long delay = Math.min(1000, (long) Math.pow(2, retryCount)); 
// Exponential backoff with max delay of 1000ms
                 if (retryCount++ % 100 == 0) {
                   LOGGER.warn(
                       "DownSamplingProcessor: failed to allocate memory for 
parsing TsFile {}, retry count is {}, will retry after {} ms.",
                       ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
                       retryCount,
                       delay,
                       e);
                 } else if (LOGGER.isDebugEnabled()) {
                   LOGGER.debug(
                       "DownSamplingProcessor: failed to allocate memory for 
parsing TsFile {}, retry count is {}, will retry after {} ms.",
                       ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
                       retryCount,
                       delay,
                       e);
                 }
                 try {
                   Thread.sleep(delay);
                 } catch (InterruptedException interruptedException) {
                   Thread.currentThread().interrupt();
                   throw new RuntimeException("Retry loop interrupted", 
interruptedException);
                 }
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java:
##########
@@ -524,9 +529,34 @@ public void process(
       final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector 
eventCollector)
       throws Exception {
     try {
-      for (final TabletInsertionEvent tabletInsertionEvent :
-          tsFileInsertionEvent.toTabletInsertionEvents()) {
-        process(tabletInsertionEvent, eventCollector);
+      final Iterable<TabletInsertionEvent> iterable =
+          tsFileInsertionEvent.toTabletInsertionEvents();
+      final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+      while (iterator.hasNext()) {
+        final TabletInsertionEvent parsedEvent = iterator.next();
+        int retryCount = 0;
+        while (true) {
+          // If failed due do insufficient memory, retry until success to 
avoid race among multiple
+          // processor threads
+          try {
+            process(parsedEvent, eventCollector);
+            break;
+          } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+            if (retryCount++ % 100 == 0) {
+              LOGGER.warn(
+                  "AggregateProcessor: failed to allocate memory for parsing 
TsFile {}, retry count is {}, will keep retrying.",
+                  ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
+                  retryCount,
+                  e);
+            } else if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug(
+                  "AggregateProcessor: failed to allocate memory for parsing 
TsFile {}, retry count is {}, will keep retrying.",
+                  ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
+                  retryCount,
+                  e);
+            }

Review Comment:
   Introduce a delay or exponential backoff in this retry loop to reduce the 
risk of performance degradation under continuous memory failures.
   ```suggestion
             // If failed due to insufficient memory, retry until success to 
avoid race among multiple
             // processor threads
             try {
               process(parsedEvent, eventCollector);
               break;
             } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
               long delay = Math.min(1000L * (1 << Math.min(retryCount, 10)), 
30000L); // Exponential backoff with max delay of 30 seconds
               if (retryCount++ % 100 == 0) {
                 LOGGER.warn(
                     "AggregateProcessor: failed to allocate memory for parsing 
TsFile {}, retry count is {}, will retry after {} ms.",
                     ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
                     retryCount,
                     delay,
                     e);
               } else if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(
                     "AggregateProcessor: failed to allocate memory for parsing 
TsFile {}, retry count is {}, will retry after {} ms.",
                     ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).getTsFile(),
                     retryCount,
                     delay,
                     e);
               }
               Thread.sleep(delay);
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java:
##########
@@ -149,9 +150,34 @@ protected boolean executeOnce() throws Exception {
               && ((PipeTsFileInsertionEvent) event).shouldParse4Privilege()) {
             try (final PipeTsFileInsertionEvent tsFileInsertionEvent =
                 (PipeTsFileInsertionEvent) event) {
-              for (final TabletInsertionEvent tabletInsertionEvent :
-                  tsFileInsertionEvent.toTabletInsertionEvents()) {
-                pipeProcessor.process(tabletInsertionEvent, 
outputEventCollector);
+              final Iterable<TabletInsertionEvent> iterable =
+                  tsFileInsertionEvent.toTabletInsertionEvents();
+              final Iterator<TabletInsertionEvent> iterator = 
iterable.iterator();
+              while (iterator.hasNext()) {
+                final TabletInsertionEvent parsedEvent = iterator.next();
+                int retryCount = 0;
+                while (true) {
+                  // If failed due do insufficient memory, retry until success 
to avoid race among

Review Comment:
   It may be beneficial to add a delay or backoff strategy in this retry loop 
to mitigate potential CPU resource exhaustion in a sustained failure scenario.
   ```suggestion
                   long delay = 100; // Initial delay in milliseconds
                   final long maxDelay = 5000; // Maximum delay in milliseconds
                   while (true) {
                     // If failed due to insufficient memory, retry until 
success to avoid race among
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java:
##########
@@ -141,10 +143,36 @@ public void transfer(TsFileInsertionEvent 
tsFileInsertionEvent) throws Exception
     }
 
     try {
-      for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-        // Skip report if any tablet events is added
-        ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
-        transfer(event);
+      final Iterable<TabletInsertionEvent> iterable =
+          tsFileInsertionEvent.toTabletInsertionEvents();
+      final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+      while (iterator.hasNext()) {
+        final TabletInsertionEvent parsedEvent = iterator.next();
+        int retryCount = 0;
+        while (true) {
+          // If failed due do insufficient memory, retry until success to 
avoid race among multiple

Review Comment:
   Consider introducing a delay or exponential backoff within this retry loop 
to prevent high CPU load during continuous memory allocation failures.
   ```suggestion
             // If failed due to insufficient memory, retry until success to 
avoid race among multiple
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -689,11 +691,37 @@ public long count(final boolean skipReportOnCommit) 
throws IOException {
 
     if (shouldParseTime()) {
       try {
-        for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
-          final PipeRawTabletInsertionEvent rawEvent = 
((PipeRawTabletInsertionEvent) event);
-          count += rawEvent.count();
-          if (skipReportOnCommit) {
-            rawEvent.skipReportOnCommit();
+        final Iterable<TabletInsertionEvent> iterable = 
toTabletInsertionEvents();
+        final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+        while (iterator.hasNext()) {
+          final TabletInsertionEvent parsedEvent = iterator.next();
+          int retryCount = 0;
+          while (true) {
+            // If failed due do insufficient memory, retry until success to 
avoid race among

Review Comment:
   Consider adding backoff logic in this retry loop to avoid a tight loop that 
may lead to excessive CPU consumption if memory issues continue.
   ```suggestion
             long backoffDelay = 100; // Initial backoff delay in milliseconds
             final long maxBackoffDelay = 5000; // Maximum backoff delay in 
milliseconds
             while (true) {
               // If failed due to insufficient memory, retry until success to 
avoid race among
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to