mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r644509797



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it 
is not
-     * available (for example, if this method is invoked from the punctuate 
call).
+     * Returns the topic name of the current input record; could be {@code 
null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, 
or while processing a
+     * record that was forwarded by a punctuation callback, the record won't 
have an associated topic.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic 
name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it 
is not
-     * available (for example, if this method is invoked from the punctuate 
call).
+     * Returns the partition id of the current input record; could be {@code 
-1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, 
or while processing a
+     * record that was forwarded by a punctuation callback the record won't 
have an associated partition id.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid 
partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate 
call).
+     * Returns the offset of the current input record; could be {@code -1} if 
it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, 
or while processing a
+     * record that was forwarded by a punctuation callback, the record won't 
have an associated offset.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, 
as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is 
not
-     * available (for example, if this method is invoked from the punctuate 
call).
+     * Returns the headers of the current input record.

Review comment:
       While I think `headers` should never be `null`, they could still be 
empty. Not sure to what extend this must be documented? For punctuation 
"forwards" it seems obvious (similar if one would not set a key or value...), 
and for the new `Record` api it's even more obvious. For the 
`KTable#transformValues` "ValueGetter" case, it's a little different though. 
Might be worth to repeat here?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -43,12 +43,11 @@ public ProcessorRecordContext(final long timestamp,
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       I think, for the internal change, this is actually ok. (I am still ok to 
split this into a separate PR, and keep this on a JavaDocs only PR).

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
##########
@@ -43,16 +43,12 @@
 
     private final Headers headers = new RecordHeaders(new Header[]{new 
RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";

Review comment:
       Just some side cleanup in this test.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
##########
@@ -43,16 +43,12 @@
 
     private final Headers headers = new RecordHeaders(new Header[]{new 
RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";

Review comment:
       Just some additional side cleanup in this test.

##########
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -319,7 +321,7 @@ public void setRecordMetadata(final String topic,
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       This change is a little tricky... But if we consider it a bug-fix, it 
might still be ok?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to