guozhangwang commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r651983648
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1>
{
private WindowStore<K, V2> otherWindowStore;
- private StreamsMetricsImpl metrics;
private Sensor droppedRecordsSensor;
private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>>
outerJoinWindowStore = Optional.empty();
- @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
- metrics = (StreamsMetricsImpl) context.metrics();
+ final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
otherWindowStore = context.getStateStore(otherWindowName);
- if
(StreamsConfig.InternalConfig.getBoolean(context().appConfigs(),
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
- outerJoinWindowStore = outerJoinWindowName.map(name ->
context.getStateStore(name));
+ if (enableSpuriousResultFix
Review comment:
I'm thinking exactly the opposite :) if we have a bug which would cause
us to create a state store, checking it twice may actually mask the bug: we
would end up creating the state store, and then on the second check not getting
it, so the behavior is still correct, and it'll be hard for us to discover we
are creating state stores unnecessarily.
If we have a bug and do not create state stores when needed, then we would
behave in the old way without the fix; the key point here is that, we only have
one decision point to make, and either that decision is correct or buggy, we
can get it surfaced quickly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]