Copilot commented on code in PR #15644:
URL: https://github.com/apache/iotdb/pull/15644#discussion_r2125963468
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java:
##########
@@ -96,17 +98,42 @@ protected void onTabletInsertionEvent(final
TabletInsertionEvent event) {
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
// TODO: parse tsfile event on the fly like
SubscriptionPipeTabletEventBatch
try {
- for (final TabletInsertionEvent parsedEvent :
event.toTabletInsertionEvents()) {
- if (!((PipeRawTabletInsertionEvent) parsedEvent)
- .increaseReferenceCount(this.getClass().getName())) {
- LOGGER.warn(
- "SubscriptionPipeTsFileEventBatch: Failed to increase the
reference count of event {}, skipping it.",
- ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
- } else {
+ final Iterable<TabletInsertionEvent> iterable =
event.toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ int retryCount = 0;
Review Comment:
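Consider adding a delay or exponential backoff in this retry loop to prevent
potential CPU hogging in the event of persistent memory allocation failures.
Note that the constants suggested below only take effect once the loop actually
sleeps between attempts, e.g. starting at baseDelay and doubling up to maxDelay.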
```suggestion
int retryCount = 0;
final int baseDelay = 100; // base delay in milliseconds
final int maxDelay = 5000; // maximum delay in milliseconds
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java:
##########
@@ -149,9 +155,34 @@ public void process(TsFileInsertionEvent
tsFileInsertionEvent, EventCollector ev
throws Exception {
if (shouldSplitFile) {
try {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- process(tabletInsertionEvent, eventCollector);
+ final Iterable<TabletInsertionEvent> iterable =
+ tsFileInsertionEvent.toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ int retryCount = 0;
+ while (true) {
+ // If failed due do insufficient memory, retry until success to
avoid race among
+ // multiple processor threads
+ try {
+ process(parsedEvent, eventCollector);
+ break;
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ if (retryCount++ % 100 == 0) {
+ LOGGER.warn(
+ "DownSamplingProcessor: failed to allocate memory for
parsing TsFile {}, retry count is {}, will keep retrying.",
+ ((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
+ retryCount,
+ e);
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "DownSamplingProcessor: failed to allocate memory for
parsing TsFile {}, retry count is {}, will keep retrying.",
+ ((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
+ retryCount,
+ e);
+ }
Review Comment:
Consider implementing a delay or backoff mechanism in this retry loop to
mitigate high CPU usage when memory allocation issues persist.
```suggestion
// If failed due to insufficient memory, retry until success to
avoid race among
// multiple processor threads
try {
process(parsedEvent, eventCollector);
break;
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
long delay = Math.min(1000, (long) Math.pow(2, retryCount));
// Exponential backoff with max delay of 1000ms
if (retryCount++ % 100 == 0) {
LOGGER.warn(
"DownSamplingProcessor: failed to allocate memory for
parsing TsFile {}, retry count is {}, will retry after {} ms.",
((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
retryCount,
delay,
e);
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"DownSamplingProcessor: failed to allocate memory for
parsing TsFile {}, retry count is {}, will retry after {} ms.",
((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
retryCount,
delay,
e);
}
try {
Thread.sleep(delay);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeException("Retry loop interrupted",
interruptedException);
}
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java:
##########
@@ -524,9 +529,34 @@ public void process(
final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector
eventCollector)
throws Exception {
try {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- process(tabletInsertionEvent, eventCollector);
+ final Iterable<TabletInsertionEvent> iterable =
+ tsFileInsertionEvent.toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ int retryCount = 0;
+ while (true) {
+ // If failed due do insufficient memory, retry until success to
avoid race among multiple
+ // processor threads
+ try {
+ process(parsedEvent, eventCollector);
+ break;
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ if (retryCount++ % 100 == 0) {
+ LOGGER.warn(
+ "AggregateProcessor: failed to allocate memory for parsing
TsFile {}, retry count is {}, will keep retrying.",
+ ((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
+ retryCount,
+ e);
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "AggregateProcessor: failed to allocate memory for parsing
TsFile {}, retry count is {}, will keep retrying.",
+ ((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
+ retryCount,
+ e);
+ }
Review Comment:
Introduce a delay or exponential backoff in this retry loop to reduce the
risk of performance degradation under continuous memory failures.
```suggestion
// If failed due to insufficient memory, retry until success to
avoid race among multiple
// processor threads
try {
process(parsedEvent, eventCollector);
break;
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
long delay = Math.min(1000L * (1 << Math.min(retryCount, 10)),
30000L); // Exponential backoff with max delay of 30 seconds
if (retryCount++ % 100 == 0) {
LOGGER.warn(
"AggregateProcessor: failed to allocate memory for parsing
TsFile {}, retry count is {}, will retry after {} ms.",
((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
retryCount,
delay,
e);
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"AggregateProcessor: failed to allocate memory for parsing
TsFile {}, retry count is {}, will retry after {} ms.",
((PipeTsFileInsertionEvent)
tsFileInsertionEvent).getTsFile(),
retryCount,
delay,
e);
}
Thread.sleep(delay);
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java:
##########
@@ -149,9 +150,34 @@ protected boolean executeOnce() throws Exception {
&& ((PipeTsFileInsertionEvent) event).shouldParse4Privilege()) {
try (final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) event) {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- pipeProcessor.process(tabletInsertionEvent,
outputEventCollector);
+ final Iterable<TabletInsertionEvent> iterable =
+ tsFileInsertionEvent.toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator =
iterable.iterator();
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ int retryCount = 0;
+ while (true) {
+ // If failed due do insufficient memory, retry until success
to avoid race among
Review Comment:
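It may be beneficial to add a delay or backoff strategy in this retry loop
to mitigate potential CPU resource exhaustion in a sustained failure scenario.
Note that the suggestion below only declares the delay bounds; the loop must
still sleep for `delay` after each failed attempt and grow it up to `maxDelay`.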
```suggestion
long delay = 100; // Initial delay in milliseconds
final long maxDelay = 5000; // Maximum delay in milliseconds
while (true) {
// If failed due to insufficient memory, retry until
success to avoid race among
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java:
##########
@@ -141,10 +143,36 @@ public void transfer(TsFileInsertionEvent
tsFileInsertionEvent) throws Exception
}
try {
- for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- // Skip report if any tablet events is added
- ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
- transfer(event);
+ final Iterable<TabletInsertionEvent> iterable =
+ tsFileInsertionEvent.toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ int retryCount = 0;
+ while (true) {
+ // If failed due do insufficient memory, retry until success to
avoid race among multiple
Review Comment:
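Consider introducing a delay or exponential backoff within this retry loop
to prevent high CPU load during continuous memory allocation failures.
Note that the suggestion below only corrects the "due do" typo in the code
comment; the delay itself still has to be added to the retry loop.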
```suggestion
// If failed due to insufficient memory, retry until success to
avoid race among multiple
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -689,11 +691,37 @@ public long count(final boolean skipReportOnCommit)
throws IOException {
if (shouldParseTime()) {
try {
- for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
- final PipeRawTabletInsertionEvent rawEvent =
((PipeRawTabletInsertionEvent) event);
- count += rawEvent.count();
- if (skipReportOnCommit) {
- rawEvent.skipReportOnCommit();
+ final Iterable<TabletInsertionEvent> iterable =
toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ int retryCount = 0;
+ while (true) {
+ // If failed due do insufficient memory, retry until success to
avoid race among
Review Comment:
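Consider adding backoff logic in this retry loop to avoid a tight loop that
may lead to excessive CPU consumption if memory issues continue.
Note that the suggestion below only declares the backoff bounds; the loop must
still sleep for `backoffDelay` after each failed attempt and grow it up to
`maxBackoffDelay`.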
```suggestion
long backoffDelay = 100; // Initial backoff delay in milliseconds
final long maxBackoffDelay = 5000; // Maximum backoff delay in
milliseconds
while (true) {
// If failed due to insufficient memory, retry until success to
avoid race among
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org