mjsax commented on a change in pull request #10861: URL: https://github.com/apache/kafka/pull/10861#discussion_r652355524
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -82,20 +87,23 @@ private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> { private WindowStore<K, V2> otherWindowStore; - private StreamsMetricsImpl metrics; private Sensor droppedRecordsSensor; private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty(); - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); - metrics = (StreamsMetricsImpl) context.metrics(); + final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); otherWindowStore = context.getStateStore(otherWindowName); - if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) { - outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name)); + if (enableSpuriousResultFix Review comment: After update the code with the `else` a few tests started to fail. The issue is, that for left/outer join we _always_ set the store name (even if the feature is disabled...) -- only for inner join, we get an `Optinal.empty()`. Thus, we can actually not verify the `else` case (ie, we added a store even if we don't need it) at runtime. I guess we need to rely on the added unit tests instead to cover this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org