This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f0a3960e3e1 KAFKA-17867 Consider using zero-copy for
PushTelemetryRequest (#17622)
f0a3960e3e1 is described below
commit f0a3960e3e114228bd4d8b220812fc142f832b2b
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Oct 30 20:49:56 2024 +0800
KAFKA-17867 Consider using zero-copy for PushTelemetryRequest (#17622)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../common/requests/PushTelemetryRequest.java | 2 +-
.../internals/ClientTelemetryReporter.java | 5 +++--
.../telemetry/internals/ClientTelemetryUtils.java | 12 +++++------
.../common/message/PushTelemetryRequest.json | 2 +-
.../common/requests/PushTelemetryRequestTest.java | 7 +++---
.../kafka/common/requests/RequestResponseTest.java | 2 +-
.../internals/ClientTelemetryUtilsTest.java | 6 +++---
.../apache/kafka/server/ClientMetricsManager.java | 7 +++---
.../kafka/server/ClientMetricsManagerTest.java | 25 +++++++++++-----------
.../metrics/ClientMetricsReceiverPluginTest.java | 3 ++-
10 files changed, 37 insertions(+), 34 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
index 94e029d4db1..9264de0f59e 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
@@ -88,7 +88,7 @@ public class PushTelemetryRequest extends AbstractRequest {
public ByteBuffer metricsData() {
CompressionType cType =
CompressionType.forId(this.data.compressionType());
return (cType == CompressionType.NONE) ?
- ByteBuffer.wrap(this.data.metrics()) :
ClientTelemetryUtils.decompress(this.data.metrics(), cType);
+ this.data.metrics() :
ClientTelemetryUtils.decompress(this.data.metrics(), cType);
}
public static PushTelemetryRequest parse(ByteBuffer buffer, short version)
{
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 7dfaa2b0e50..91df6b8aac5 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -710,12 +711,12 @@ public class ClientTelemetryReporter implements
MetricsReporter {
}
CompressionType compressionType =
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
- byte[] compressedPayload;
+ ByteBuffer compressedPayload;
try {
compressedPayload = ClientTelemetryUtils.compress(payload,
compressionType);
} catch (IOException e) {
log.info("Failed to compress telemetry payload for
compression: {}, sending uncompressed data", compressionType);
- compressedPayload = payload.toByteArray();
+ compressedPayload = ByteBuffer.wrap(payload.toByteArray());
compressionType = CompressionType.NONE;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index a943f85b38f..3c555afb3b0 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -191,23 +190,22 @@ public class ClientTelemetryUtils {
return CompressionType.NONE;
}
- public static byte[] compress(MetricsData metrics, CompressionType
compressionType) throws IOException {
+ public static ByteBuffer compress(MetricsData metrics, CompressionType
compressionType) throws IOException {
try (ByteBufferOutputStream compressedOut = new
ByteBufferOutputStream(512)) {
Compression compression = Compression.of(compressionType).build();
try (OutputStream out = compression.wrapForOutput(compressedOut,
RecordBatch.CURRENT_MAGIC_VALUE)) {
metrics.writeTo(out);
}
compressedOut.buffer().flip();
- return Utils.toArray(compressedOut.buffer());
+ return compressedOut.buffer();
}
}
- public static ByteBuffer decompress(byte[] metrics, CompressionType
compressionType) {
- ByteBuffer data = ByteBuffer.wrap(metrics);
+ public static ByteBuffer decompress(ByteBuffer metrics, CompressionType
compressionType) {
Compression compression = Compression.of(compressionType).build();
- try (InputStream in = compression.wrapForInput(data,
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+ try (InputStream in = compression.wrapForInput(metrics,
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
- byte[] bytes = new byte[data.capacity() * 2];
+ byte[] bytes = new byte[metrics.limit() * 2];
int nRead;
while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
out.write(bytes, 0, nRead);
diff --git
a/clients/src/main/resources/common/message/PushTelemetryRequest.json
b/clients/src/main/resources/common/message/PushTelemetryRequest.json
index b91cc7d94f7..dd39bbf1ce6 100644
--- a/clients/src/main/resources/common/message/PushTelemetryRequest.json
+++ b/clients/src/main/resources/common/message/PushTelemetryRequest.json
@@ -38,7 +38,7 @@
"about": "Compression codec used to compress the metrics."
},
{
- "name": "Metrics", "type": "bytes", "versions": "0+",
+ "name": "Metrics", "type": "bytes", "versions": "0+", "zeroCopy": true,
"about": "Metrics encoded in OpenTelemetry MetricsData v1 protobuf
format."
}
]
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
index 6617a88d9d5..beed3e49102 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.telemetry.internals.MetricKey;
import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -70,12 +71,12 @@ public class PushTelemetryRequestTest {
}
private PushTelemetryRequest getPushTelemetryRequest(MetricsData
metricsData, CompressionType compressionType) throws IOException {
- byte[] compressedData = ClientTelemetryUtils.compress(metricsData,
compressionType);
+ ByteBuffer compressedData = ClientTelemetryUtils.compress(metricsData,
compressionType);
byte[] data = metricsData.toByteArray();
if (compressionType != CompressionType.NONE) {
- assertTrue(compressedData.length < data.length);
+ assertTrue(compressedData.limit() < data.length);
} else {
- assertArrayEquals(compressedData, data);
+ assertArrayEquals(Utils.toArray(compressedData), data);
}
return new PushTelemetryRequest.Builder(
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a3206b60c9e..5de7f898d43 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -3872,7 +3872,7 @@ public class RequestResponseTest {
.setSubscriptionId(1)
.setTerminating(false)
.setCompressionType(CompressionType.ZSTD.id)
- .setMetrics("test-metrics".getBytes(StandardCharsets.UTF_8))
+
.setMetrics(ByteBuffer.wrap("test-metrics".getBytes(StandardCharsets.UTF_8)))
).build(version);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
index 9f21c6c6fda..41679bed3f7 100644
---
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -134,12 +134,12 @@ public class ClientTelemetryUtilsTest {
public void testCompressDecompress(CompressionType compressionType) throws
IOException {
MetricsData metricsData = getMetricsData();
byte[] raw = metricsData.toByteArray();
- byte[] compressed = ClientTelemetryUtils.compress(metricsData,
compressionType);
+ ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData,
compressionType);
assertNotNull(compressed);
if (compressionType != CompressionType.NONE) {
- assertTrue(compressed.length < raw.length);
+ assertTrue(compressed.limit() < raw.length);
} else {
- assertArrayEquals(raw, compressed);
+ assertArrayEquals(raw, Utils.toArray(compressed));
}
ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed,
compressionType);
assertNotNull(decompressed);
diff --git
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index d3e3c137ac2..ba26c9b4dae 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -59,6 +59,7 @@ import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -210,8 +211,8 @@ public class ClientMetricsManager implements AutoCloseable {
}
// Push the metrics to the external client receiver plugin.
- byte[] metrics = request.data().metrics();
- if (metrics != null && metrics.length > 0) {
+ ByteBuffer metrics = request.data().metrics();
+ if (metrics != null && metrics.limit() > 0) {
try {
long exportTimeStartMs = time.hiResClockMs();
receiverPlugin.exportMetrics(requestContext, request);
@@ -428,7 +429,7 @@ public class ClientMetricsManager implements AutoCloseable {
throw new UnsupportedCompressionTypeException(msg);
}
- if (request.data().metrics() != null &&
request.data().metrics().length > clientTelemetryMaxBytes) {
+ if (request.data().metrics() != null &&
request.data().metrics().limit() > clientTelemetryMaxBytes) {
String msg = String.format("Telemetry request from [%s] is larger
than the maximum allowed size [%s]",
request.data().clientInstanceId(), clientTelemetryMaxBytes);
throw new TelemetryTooLargeException(msg);
diff --git
a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index 7edb286ae1a..4cb4053505b 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
@@ -364,7 +365,7 @@ public class ClientMetricsManagerTest {
.setClientInstanceId(response.data().clientInstanceId())
.setSubscriptionId(response.data().subscriptionId())
.setCompressionType(CompressionType.NONE.id)
-
.setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
PushTelemetryResponse pushResponse =
newClientMetricsManager.processPushTelemetryRequest(
pushRequest, ClientMetricsTestUtils.requestContext());
@@ -559,7 +560,7 @@ public class ClientMetricsManagerTest {
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setCompressionType(CompressionType.NONE.id)
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)),
true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
@@ -603,7 +604,7 @@ public class ClientMetricsManagerTest {
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
-
.setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
PushTelemetryResponse response =
newClientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
@@ -640,7 +641,7 @@ public class ClientMetricsManagerTest {
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setCompressionType(CompressionType.NONE.id)
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)),
true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
@@ -688,7 +689,7 @@ public class ClientMetricsManagerTest {
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)),
true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
@@ -726,7 +727,7 @@ public class ClientMetricsManagerTest {
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)),
true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
@@ -738,7 +739,7 @@ public class ClientMetricsManagerTest {
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8))
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8)))
.setTerminating(true), true).build();
response = clientMetricsManager.processPushTelemetryRequest(
@@ -810,7 +811,7 @@ public class ClientMetricsManagerTest {
PushTelemetryRequest request = new PushTelemetryRequest.Builder(
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8))
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8)))
.setSubscriptionId(1234), true).build();
PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
@@ -908,7 +909,7 @@ public class ClientMetricsManagerTest {
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
- .setMetrics(metrics), true).build();
+ .setMetrics(ByteBuffer.wrap(metrics)),
true).build();
// Set the max bytes 1 to force the error.
PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
@@ -937,7 +938,7 @@ public class ClientMetricsManagerTest {
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setCompressionType(CompressionType.NONE.id)
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)),
true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
CountDownLatch lock = new CountDownLatch(2);
List<PushTelemetryResponse> responses =
Collections.synchronizedList(new ArrayList<>());
@@ -1013,7 +1014,7 @@ public class ClientMetricsManagerTest {
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setCompressionType(CompressionType.NONE.id)
- .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)),
true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
@@ -1103,7 +1104,7 @@ public class ClientMetricsManagerTest {
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setCompressionType(CompressionType.NONE.id)
-
.setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
index 35f9a41b47d..698e8720954 100644
---
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
+++
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,7 +54,7 @@ public class ClientMetricsReceiverPluginTest {
byte[] metrics = "test-metrics".getBytes(StandardCharsets.UTF_8);
clientMetricsReceiverPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
- new PushTelemetryRequest.Builder(new
PushTelemetryRequestData().setMetrics(metrics), true).build());
+ new PushTelemetryRequest.Builder(new
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build());
assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
assertEquals(1, telemetryReceiver.metricsData.size());