mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652355524



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       After updating the code with the `else`, a few tests started to fail. The 
issue is that for left/outer joins we _always_ set the store name (even if the 
feature is disabled); only for inner joins do we get an `Optional.empty()`. 
Thus, we cannot actually verify the `else` case (i.e., that we added a store even 
though we don't need it) at runtime. I guess we need to rely on the added unit 
tests to cover this case instead.
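
To make the problem concrete, here is a minimal, self-contained sketch of why a strict check in the `else` branch cannot work. It is not the actual `KStreamKStreamJoin` code: the `JoinType` enum, the store name `"outer-join-store"`, and the helper methods are hypothetical stand-ins; only `enableSpuriousResultFix` and `outerJoinWindowName` mirror names from the diff.

```java
import java.util.Optional;

// Simplified model of the situation described above; NOT the real Kafka Streams code.
final class OuterJoinStoreCheckSketch {

    // Hypothetical enum; the real code distinguishes join types differently.
    enum JoinType { INNER, LEFT, OUTER }

    // Assumption taken from the comment: left/outer joins always get a store name,
    // regardless of the feature flag; only inner joins get Optional.empty().
    static Optional<String> outerJoinWindowName(final JoinType type) {
        return type == JoinType.INNER
            ? Optional.empty()
            : Optional.of("outer-join-store"); // hypothetical store name
    }

    // Models the `if (enableSpuriousResultFix ...)` branch: the store is only
    // looked up when the fix is enabled.
    static Optional<String> storeToUse(final boolean enableSpuriousResultFix,
                                       final Optional<String> storeName) {
        return enableSpuriousResultFix ? storeName : Optional.empty();
    }

    // Models a strict `else` check ("feature disabled, so no store name expected").
    // Returns true when such a check would fail.
    static boolean strictElseCheckWouldFail(final boolean enableSpuriousResultFix,
                                            final Optional<String> storeName) {
        return !enableSpuriousResultFix && storeName.isPresent();
    }

    public static void main(final String[] args) {
        for (final JoinType type : JoinType.values()) {
            final Optional<String> name = outerJoinWindowName(type);
            System.out.println(type + " with fix disabled: strict else check fails = "
                + strictElseCheckWouldFail(false, name));
        }
    }
}
```

Under these assumptions, the strict check fails for every left/outer join with the feature disabled, which matches the failing tests; covering the case with unit tests avoids needing that runtime check.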





