This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b2688478116 [fix][broker] Avoid OOM not triggering PulsarByteBufAllocator
outOfMemoryListener when using ByteBufAllocator.DEFAULT.heapBuffer in
PrometheusMetricsGeneratorUtils (#18747)
b2688478116 is described below
commit b268847811643d4d355cff34783de1c5b80cbeed
Author: lixinyang <[email protected]>
AuthorDate: Thu Dec 8 09:40:32 2022 +0800
[fix][broker] Avoid OOM not triggering PulsarByteBufAllocator
outOfMemoryListener when using ByteBufAllocator.DEFAULT.heapBuffer in
PrometheusMetricsGeneratorUtils (#18747)
Co-authored-by: nicklixinyang <[email protected]>
---
.../broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java | 4 ++--
.../pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java | 4 ++--
.../main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
index 4b0cf76f227..828d9871bb3 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.stats.prometheus;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
@@ -27,6 +26,7 @@ import java.io.OutputStream;
import java.util.Enumeration;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
@@ -38,7 +38,7 @@ public class PrometheusMetricsGeneratorUtils {
public static void generate(String cluster, OutputStream out,
List<PrometheusRawMetricsProvider>
metricsProviders)
throws IOException {
- ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.heapBuffer();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
generateSystemMetrics(stream, cluster);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 5283215feaa..501bfbbb163 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -22,7 +22,6 @@ import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerat
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
@@ -51,6 +50,7 @@ import org.apache.pulsar.broker.stats.WindowWrap;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -138,7 +138,7 @@ public class PrometheusMetricsGenerator {
} catch (IOException e) {
log.error("Generate metrics failed", e);
//return empty buffer if exception happens
- return ByteBufAllocator.DEFAULT.heapBuffer(0);
+ return PulsarByteBufAllocator.DEFAULT.heapBuffer(0);
}
});
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index 29a6c517339..faab404683a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -31,7 +31,6 @@ import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import java.io.IOException;
import java.lang.reflect.Type;
@@ -44,6 +43,7 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
@@ -367,7 +367,7 @@ public final class SchemaUtils {
int dataLength = 4 + keyBytes.length + 4 + valueBytes.length;
byte[] schema = new byte[dataLength];
//record the key value schema respective length
- ByteBuf byteBuf = ByteBufAllocator.DEFAULT.heapBuffer(dataLength);
+ ByteBuf byteBuf =
PulsarByteBufAllocator.DEFAULT.heapBuffer(dataLength);
byteBuf.writeInt(keyBytes.length).writeBytes(keyBytes).writeInt(valueBytes.length).writeBytes(valueBytes);
byteBuf.readBytes(schema);
return schema;