guozhangwang commented on a change in pull request #10861: URL: https://github.com/apache/kafka/pull/10861#discussion_r651983648
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -82,20 +87,23 @@ private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> { private WindowStore<K, V2> otherWindowStore; - private StreamsMetricsImpl metrics; private Sensor droppedRecordsSensor; private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty(); - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); - metrics = (StreamsMetricsImpl) context.metrics(); + final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); otherWindowStore = context.getStateStore(otherWindowName); - if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) { - outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name)); + if (enableSpuriousResultFix Review comment: I'm thinking exactly the opposite :) if we have a bug which would cause us to create a state store, checking it twice may actually mask the bug: we would end up creating the state store, and then on the second check not getting it, so the behavior is still correct, and it'll be hard for us to discover we are creating state stores unnecessarily. If we have a bug and do not create state stores when needed, then we would behave in the old way without the fix; the key point here is that, we only have one decision point to make, and either that decision is correct or buggy, we can get it surfaced quickly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org