cadonna commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r891064878


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##########
@@ -166,4 +171,46 @@ public static String extractThreadId(final String 
fullThreadName) {
         final int index = fullThreadName.indexOf("StreamThread-");
         return fullThreadName.substring(index);
     }
+
+    public static long producerRecordSizeInBytes(final ProducerRecord<byte[], 
byte[]> record) {
+        return recordSizeInBytes(
+            record.key().length,
+            record.value() == null ? 0 : record.value().length,
+            record.topic(),
+            record.headers()
+        );
+    }
+
+    public static long consumerRecordSizeInBytes(final ConsumerRecord<byte[], 
byte[]> record) {
+        return recordSizeInBytes(
+            record.serializedKeySize(),
+            record.serializedValueSize(),
+            record.topic(),
+            record.headers()
+        );
+    }
+
+    public static long recordSizeInBytes(final long keyBytes,

Review Comment:
   nit: Could you make this private? 



##########
streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java:
##########
@@ -46,13 +47,15 @@ public <K, V> void send(final String topic,
                             final Integer partition,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
-                            final Serializer<V> valueSerializer) {
+                            final Serializer<V> valueSerializer,
+                            final String processorNodeId,
+                            final InternalProcessorContext<Void, Void> 
context) {
         collected.add(new ProducerRecord<>(topic,
-            partition,
-            timestamp,
-            key,
-            value,
-            headers));
+                                           partition,
+                                           timestamp,
+                                           key,
+                                           value,
+                                           headers));

Review Comment:
   nit: The indentation was actually right since it is a method call and not a 
method declaration.
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -192,6 +217,28 @@ public <K, V> void send(final String topic,
                 } else {
                     log.warn("Received offset={} in produce response for {}", 
metadata.offset(), tp);
                 }
+
+                if (!topic.endsWith("-changelog")) {
+                    // we may not have created a sensor yet if the node uses 
dynamic topic routing

Review Comment:
   This comment is a bit misleading here. AFAIU it refers to the `else`-branch. 
Please move it or remove it. I think you know my preference 🙂.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to