This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2d36bfbcb5d IGNITE-26154 Reduce `byte[]` allocation rate (#6375)
2d36bfbcb5d is described below
commit 2d36bfbcb5d1f4820965e2ebcff3a5d69eb546c1
Author: Ivan Bessonov <[email protected]>
AuthorDate: Thu Aug 7 13:00:03 2025 +0300
IGNITE-26154 Reduce `byte[]` allocation rate (#6375)
---
.../metastorage/server/WatchProcessor.java | 11 +++++-----
.../network/TrackableNetworkMessageHandler.java | 11 +++++-----
.../internal/network/DefaultMessagingService.java | 24 +++++++++++++---------
.../apache/ignite/raft/jraft/core/Replicator.java | 5 ++++-
4 files changed, 29 insertions(+), 22 deletions(-)
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 795e07c5204..d42b8ab145b 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -74,6 +74,8 @@ import org.jetbrains.annotations.VisibleForTesting;
* will not get notified of a new revision until all Watches have finished
processing a previous revision.
*/
public class WatchProcessor implements ManuallyCloseable {
+ private final boolean longHandlingLoggingEnabled =
getBoolean(IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED, false);
+
/** Reads an entry from the storage using a given key and revision. */
@FunctionalInterface
public interface EntryReader {
@@ -257,7 +259,7 @@ public class WatchProcessor implements ManuallyCloseable {
return enqueue(() -> {
List<WatchAndEvents> watchAndEvents =
collectWatchesAndEvents(filteredUpdatedEntries, newRevision);
- long startTimeNanos = System.nanoTime();
+ long startTimeNanos = longHandlingLoggingEnabled ?
System.nanoTime() : 0;
CompletableFuture<Void> notifyWatchesFuture =
performWatchesNotifications(watchAndEvents, newRevision, time);
@@ -328,8 +330,8 @@ public class WatchProcessor implements ManuallyCloseable {
return allOf(notifyWatchFutures);
}
- private static void maybeLogLongProcessing(List<Entry> updatedEntries,
List<WatchAndEvents> watchAndEvents, long startTimeNanos) {
- if (!getBoolean(IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED,
false)) {
+ private void maybeLogLongProcessing(List<Entry> updatedEntries,
List<WatchAndEvents> watchAndEvents, long startTimeNanos) {
+ if (!longHandlingLoggingEnabled) {
return;
}
@@ -369,7 +371,6 @@ public class WatchProcessor implements ManuallyCloseable {
}
var watchAndEvents = new ArrayList<WatchAndEvents>();
- boolean timeBagEnabled =
getBoolean(IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED, false);
for (Watch watch : watches) {
List<EntryEvent> events = List.of();
@@ -391,7 +392,7 @@ public class WatchProcessor implements ManuallyCloseable {
}
if (!events.isEmpty()) {
- watchAndEvents.add(new WatchAndEvents(watch, events,
TimeBag.createTimeBag(timeBagEnabled, false)));
+ watchAndEvents.add(new WatchAndEvents(watch, events,
TimeBag.createTimeBag(longHandlingLoggingEnabled, false)));
}
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/TrackableNetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/TrackableNetworkMessageHandler.java
index 7cca18fd417..3777c0fcb6a 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/TrackableNetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/TrackableNetworkMessageHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.network;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED;
import static
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,8 @@ public class TrackableNetworkMessageHandler implements
NetworkMessageHandler {
*/
private static final int MESSAGING_PROCESSING_LOG_THRESHOLD_MILLIS = 5;
+ private final boolean longHandlingLoggingEnabled =
IgniteSystemProperties.getBoolean(LONG_HANDLING_LOGGING_ENABLED, false);
+
private final NetworkMessageHandler targetHandler;
TrackableNetworkMessageHandler(NetworkMessageHandler targetHandler) {
@@ -45,11 +48,11 @@ public class TrackableNetworkMessageHandler implements
NetworkMessageHandler {
@Override
public void onReceived(NetworkMessage message, ClusterNode sender,
@Nullable Long correlationId) {
- long startTimeNanos = System.nanoTime();
+ long startTimeNanos = longHandlingLoggingEnabled ? System.nanoTime() :
0;
targetHandler.onReceived(message, sender, correlationId);
- if (longHandlingLoggingEnabled() && isNetworkThread()) {
+ if (longHandlingLoggingEnabled && isNetworkThread()) {
maybeLogLongProcessing(message, startTimeNanos);
}
}
@@ -58,10 +61,6 @@ public class TrackableNetworkMessageHandler implements
NetworkMessageHandler {
return Thread.currentThread() instanceof IgniteMessageServiceThread;
}
- private static boolean longHandlingLoggingEnabled() {
- return
IgniteSystemProperties.getBoolean(IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED,
false);
- }
-
private static void maybeLogLongProcessing(NetworkMessage message, long
startTimeNanos) {
long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
- startTimeNanos);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index e19be5419ef..a35acc7995e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -81,6 +81,8 @@ import org.jetbrains.annotations.TestOnly;
public class DefaultMessagingService extends AbstractMessagingService {
private static final IgniteLogger LOG =
Loggers.forClass(DefaultMessagingService.class);
+ private final boolean longHandlingLoggingEnabled =
IgniteSystemProperties.getBoolean(LONG_HANDLING_LOGGING_ENABLED, false);
+
/** Network messages factory. */
private final NetworkMessagesFactory factory;
@@ -486,22 +488,24 @@ public class DefaultMessagingService extends
AbstractMessagingService {
Long finalCorrelationId = correlationId;
firstHandlerExecutor.execute(() -> {
- long startedNanos = System.nanoTime();
+ long startedNanos = longHandlingLoggingEnabled ? System.nanoTime()
: 0;
try {
handleStartingWithFirstHandler(payload, finalCorrelationId,
inNetworkObject, firstHandlerContext, handlerContexts);
} catch (Throwable e) {
handleAndRethrowIfError(inNetworkObject, e);
} finally {
- long tookMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
-
- if (tookMillis > 100 &&
IgniteSystemProperties.getBoolean(LONG_HANDLING_LOGGING_ENABLED, false)) {
- LOG.warn(
- "Processing of {} from {} took {} ms",
- LOG.isDebugEnabled() && includeSensitive() ?
message : message.toStringForLightLogging(),
- inNetworkObject.sender(),
- tookMillis
- );
+ if (longHandlingLoggingEnabled && LOG.isWarnEnabled()) {
+ long tookMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
+
+ if (tookMillis > 100) {
+ LOG.warn(
+ "Processing of {} from {} took {} ms",
+ LOG.isDebugEnabled() && includeSensitive() ?
message : message.toStringForLightLogging(),
+ inNetworkObject.sender(),
+ tookMillis
+ );
+ }
}
}
});
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 631c4b75a25..cd15efb2141 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -129,6 +129,8 @@ public class Replicator implements ThreadId.OnError {
private final String metricName;
+ private final String inflightsCountMetricName;
+
/** This set is used only for logging. */
private final Set<PeerId> deadPeers = ConcurrentHashMap.newKeySet();
@@ -170,6 +172,7 @@ public class Replicator implements ThreadId.OnError {
this.raftOptions = raftOptions;
this.rpcService = replicatorOptions.getRaftRpcService();
this.metricName = getReplicatorMetricName(replicatorOptions);
+ this.inflightsCountMetricName = name(this.metricName,
"replicate-inflights-count");
setState(State.Created);
}
@@ -553,7 +556,7 @@ public class Replicator implements ThreadId.OnError {
final int seq, final Future<Message> rpcInfly) {
this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq,
rpcInfly);
this.inflights.add(this.rpcInFly);
- this.nodeMetrics.recordSize(name(this.metricName,
"replicate-inflights-count"), this.inflights.size());
+ this.nodeMetrics.recordSize(inflightsCountMetricName,
this.inflights.size());
}
/**