vvcephei commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425900580



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
##########
@@ -19,13 +19,18 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 import java.util.Map;
 
 public interface RecordCollector {
 
+    BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
+    ByteArraySerializer BYTE_ARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
+

Review comment:
       It doesn't seem like these need to be defined here, since they're only 
used outside of this interface.
   
   They only have two independent usages, and it doesn't seem that important to 
de-duplicate the instances. Can we just copy them into separate constants in 
the classes that need them?
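   
   A rough sketch of the shape being suggested, using stand-in types so it 
compiles on its own. `IdentitySerializer`, `Collector`, `ChangelogWriter`, and 
`SinkWriter` are hypothetical names, not classes from this PR. Each user keeps 
a private constant, and the shared interface exposes none:

```java
// Hypothetical stand-in for a stateless serializer such as ByteArraySerializer.
final class IdentitySerializer {
    byte[] serialize(final String topic, final byte[] data) {
        return data;
    }
}

// The interface stays free of constants: fields declared in an interface are
// implicitly public static final, so they would become part of its API.
interface Collector {
    int send(String topic, byte[] value);
}

// Hypothetical first user: owns its own private serializer instance.
final class ChangelogWriter {
    private static final IdentitySerializer VALUE_SERIALIZER = new IdentitySerializer();

    int write(final Collector collector, final String topic, final byte[] value) {
        return collector.send(topic, VALUE_SERIALIZER.serialize(topic, value));
    }
}

// Hypothetical second, independent user with its own private constant.
final class SinkWriter {
    private static final IdentitySerializer VALUE_SERIALIZER = new IdentitySerializer();

    int write(final Collector collector, final String topic, final byte[] value) {
        return collector.send(topic, VALUE_SERIALIZER.serialize(topic, value));
    }
}
```

   The cost is one extra small object per class, which seems acceptable for 
stateless serializers.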

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -218,4 +230,16 @@ public void initialize() {
     public void uninitialize() {
         initialized = false;
     }
+
+    @Override
+    public TaskType taskType() {
+        return stateManager.taskType();
+    }

Review comment:
       Looks like this doesn't need to be defaulted here. If the logic doesn't 
apply to all the implementing classes, it's better not to define it in the 
abstract class.

##########
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -116,4 +119,20 @@ public void register(final StateStore store, final StateRestoreCallback stateRes
     public StateRestoreCallback stateRestoreCallback(final String storeName) {
         return restoreCallbacks.get(storeName);
     }
+
+    @Override
+    public TaskType taskType() {
+        return taskType;
+    }
+
+    public void setTaskType(final TaskType newType) {
+        taskType = newType;
+    }

Review comment:
       Looks like this is unused.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
##########
@@ -113,24 +114,26 @@ public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
     @SuppressWarnings("deprecation")
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true);
+
         inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, value);
-        store.put(bytesKey, value);
 
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
-        assertThat(collector.collected().size(), equalTo(2));
-        assertThat(collector.collected().get(0).key(), equalTo(key1));
-        assertThat(collector.collected().get(0).value(), equalTo(value));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(0L));
-        assertThat(collector.collected().get(1).key(), equalTo(key2));
-        assertThat(collector.collected().get(1).value(), equalTo(value));
-        assertThat(collector.collected().get(1).timestamp(), equalTo(0L));
 
-        EasyMock.verify(inner);
+        EasyMock.reset(context);
+        EasyMock.expect(context.timestamp()).andStubReturn(0L);
+        context.logChange(store.name(), key1, value, 0L);
+        context.logChange(store.name(), key2, value, 0L);

Review comment:
       We don't need `expectLastCall()` on these (and everywhere else)?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -218,4 +230,16 @@ public void initialize() {
     public void uninitialize() {
         initialized = false;
     }
+
+    @Override
+    public TaskType taskType() {
+        return stateManager.taskType();
+    }
+
+    void throwUnsupportedOperationExceptionIfStandby(final String operationName) {
+        if (taskType() == TaskType.STANDBY) {
+            throw new UnsupportedOperationException(
+                "this should not happen: " + operationName + "() is not supported in standby tasks.");
+        }
+    }

Review comment:
       It seems like an abstraction error to have something like this in an 
abstract class. Much better to just move all the implementations that need it 
to the concrete classes.
   
   For example, it's unclear whether the logic that's protected by this method 
should include global tasks or not. I.e., was it intended to "throw if not 
Active" (and we just forgot that there are also global tasks), or "throw if not 
Active or Global"? I'm not asking you to answer this question; I'm pointing out 
that putting this in the abstract class makes the code ambiguous. Even if the 
code is all correct right now, it's dangerous for maintenance because it would 
be easy to make the mistake of forgetting about global tasks at any point in 
the future and introducing a bug.
   
   OTOH, if all this logic gets pushed into the implementations, then the 
ProcessorContextImpl can assert that it only gets Active or Standby, and it can 
safely use this method, while the GlobalContext can take care of itself.
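   
   To illustrate, here is a minimal self-contained sketch of that layout. All 
of the `Sketch*` names are hypothetical stand-ins rather than the real Streams 
classes, and the `forward` behaviour is only illustrative. The abstract class 
declares the contract, the task-level context rejects anything other than 
Active or Standby up front, and the global context has no standby check:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the task type enum referenced in the diff (hypothetical local copy).
enum SketchTaskType { ACTIVE, STANDBY, GLOBAL }

// The abstract class only declares the contract; it holds no task-type checks.
abstract class SketchAbstractContext {
    protected final List<String> forwarded = new ArrayList<>();

    abstract SketchTaskType taskType();

    abstract void forward(String record);
}

// Plays the role of ProcessorContextImpl: accepts only ACTIVE or STANDBY.
final class SketchProcessorContext extends SketchAbstractContext {
    private final SketchTaskType taskType;

    SketchProcessorContext(final SketchTaskType taskType) {
        if (taskType != SketchTaskType.ACTIVE && taskType != SketchTaskType.STANDBY) {
            throw new IllegalArgumentException("unexpected task type: " + taskType);
        }
        this.taskType = taskType;
    }

    @Override
    SketchTaskType taskType() {
        return taskType;
    }

    @Override
    void forward(final String record) {
        throwIfStandby("forward");
        forwarded.add(record);
    }

    // Safe here: the constructor guarantees "not STANDBY" means ACTIVE.
    private void throwIfStandby(final String operationName) {
        if (taskType == SketchTaskType.STANDBY) {
            throw new UnsupportedOperationException(
                operationName + "() is not supported in standby tasks.");
        }
    }
}

// Plays the role of the global context: it decides its own rules.
final class SketchGlobalContext extends SketchAbstractContext {
    @Override
    SketchTaskType taskType() {
        return SketchTaskType.GLOBAL;
    }

    @Override
    void forward(final String record) {
        forwarded.add(record);
    }
}
```

   With this split, the question of whether global tasks are covered by the 
standby check cannot come up, because the check lives in a class that never 
sees a global task.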

##########
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -75,6 +75,7 @@
     private final List<CapturedForward> capturedForwards = new LinkedList<>();
     private boolean committed = false;
 
+

Review comment:
       Not sure about this change ;) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

