gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1591293880
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -184,20 +146,34 @@ public void process(final Record<K, V1> record) {
// This condition below allows us to process the
out-of-order records without the need
// to hold it in the temporary outer store
if (!outerJoinStore.isPresent() || timeTo <
sharedTimeTracker.streamTime) {
-
context().forward(record.withValue(joiner.apply(record.key(), record.value(),
null)));
+ context().forward(
+ record.withValue(joiner.apply(record.key(),
record.value(), null)));
Review Comment:
nit: revert this formatting-only change
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1,
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements
ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
- private final String otherWindowName;
+ private final boolean outer;
+ private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther,
? extends VOut> joiner;
private final long joinBeforeMs;
private final long joinAfterMs;
private final long joinGraceMs;
+ private final String otherWindowName;
+ private final TimeTrackerSupplier sharedTimeTrackerSupplier;
private final boolean enableSpuriousResultFix;
+ private final Optional<String> outerJoinWindowName;
private final long windowsBeforeMs;
private final long windowsAfterMs;
- private final boolean outer;
- private final boolean isLeftSide;
- private final Optional<String> outerJoinWindowName;
- private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ?
extends VOut> joiner;
-
- private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
- KStreamKStreamJoin(final boolean isLeftSide,
- final String otherWindowName,
- final JoinWindowsInternal windows,
- final ValueJoinerWithKey<? super K, ? super V1, ? super
V2, ? extends VOut> joiner,
- final boolean outer,
- final Optional<String> outerJoinWindowName,
- final TimeTrackerSupplier sharedTimeTrackerSupplier) {
- this.isLeftSide = isLeftSide;
+ KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier
sharedTimeTrackerSupplier,
+ final Optional<String> outerJoinWindowName, final long
joinBeforeMs,
+ final long joinAfterMs, final JoinWindowsInternal
windows, final boolean outer,
+ final ValueJoinerWithKey<? super K, ? super VThis, ?
super VOther, ? extends VOut> joiner) {
Review Comment:
Revert the re-ordering here that isn't necessary, so it's easy to see the
signature change for `joiner` and the removal of `isLeftSide`.
We should also put each constructor argument on its own line to follow the
previous style.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1,
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements
ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
- private final String otherWindowName;
+ private final boolean outer;
+ private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther,
? extends VOut> joiner;
private final long joinBeforeMs;
private final long joinAfterMs;
private final long joinGraceMs;
+ private final String otherWindowName;
+ private final TimeTrackerSupplier sharedTimeTrackerSupplier;
private final boolean enableSpuriousResultFix;
+ private final Optional<String> outerJoinWindowName;
private final long windowsBeforeMs;
private final long windowsAfterMs;
- private final boolean outer;
- private final boolean isLeftSide;
- private final Optional<String> outerJoinWindowName;
- private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ?
extends VOut> joiner;
-
- private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
- KStreamKStreamJoin(final boolean isLeftSide,
- final String otherWindowName,
- final JoinWindowsInternal windows,
- final ValueJoinerWithKey<? super K, ? super V1, ? super
V2, ? extends VOut> joiner,
- final boolean outer,
- final Optional<String> outerJoinWindowName,
- final TimeTrackerSupplier sharedTimeTrackerSupplier) {
- this.isLeftSide = isLeftSide;
+ KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier
sharedTimeTrackerSupplier,
+ final Optional<String> outerJoinWindowName, final long
joinBeforeMs,
+ final long joinAfterMs, final JoinWindowsInternal
windows, final boolean outer,
+ final ValueJoinerWithKey<? super K, ? super VThis, ?
super VOther, ? extends VOut> joiner) {
this.otherWindowName = otherWindowName;
- if (isLeftSide) {
- this.joinBeforeMs = windows.beforeMs;
- this.joinAfterMs = windows.afterMs;
- } else {
- this.joinBeforeMs = windows.afterMs;
- this.joinAfterMs = windows.beforeMs;
- }
- this.windowsAfterMs = windows.afterMs;
- this.windowsBeforeMs = windows.beforeMs;
- this.joinGraceMs = windows.gracePeriodMs();
+ this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
- this.joiner = joiner;
- this.outer = outer;
this.outerJoinWindowName = outerJoinWindowName;
- this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
- }
-
- @Override
- public Processor<K, V1, K, VOut> get() {
- return new KStreamKStreamJoinProcessor();
+ this.joinBeforeMs = joinBeforeMs;
+ this.joinAfterMs = joinAfterMs;
+ this.joinGraceMs = windows.gracePeriodMs();
+ this.windowsBeforeMs = windows.beforeMs;
+ this.windowsAfterMs = windows.afterMs;
+ this.outer = outer;
+ this.joiner = joiner;
Review Comment:
nit: Reorder this to make the diff smaller.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1,
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements
ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
- private final String otherWindowName;
+ private final boolean outer;
+ private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther,
? extends VOut> joiner;
private final long joinBeforeMs;
private final long joinAfterMs;
private final long joinGraceMs;
+ private final String otherWindowName;
+ private final TimeTrackerSupplier sharedTimeTrackerSupplier;
private final boolean enableSpuriousResultFix;
+ private final Optional<String> outerJoinWindowName;
private final long windowsBeforeMs;
private final long windowsAfterMs;
- private final boolean outer;
- private final boolean isLeftSide;
- private final Optional<String> outerJoinWindowName;
- private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ?
extends VOut> joiner;
-
- private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
- KStreamKStreamJoin(final boolean isLeftSide,
- final String otherWindowName,
- final JoinWindowsInternal windows,
- final ValueJoinerWithKey<? super K, ? super V1, ? super
V2, ? extends VOut> joiner,
- final boolean outer,
- final Optional<String> outerJoinWindowName,
- final TimeTrackerSupplier sharedTimeTrackerSupplier) {
- this.isLeftSide = isLeftSide;
+ KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier
sharedTimeTrackerSupplier,
+ final Optional<String> outerJoinWindowName, final long
joinBeforeMs,
+ final long joinAfterMs, final JoinWindowsInternal
windows, final boolean outer,
+ final ValueJoinerWithKey<? super K, ? super VThis, ?
super VOther, ? extends VOut> joiner) {
this.otherWindowName = otherWindowName;
- if (isLeftSide) {
- this.joinBeforeMs = windows.beforeMs;
- this.joinAfterMs = windows.afterMs;
- } else {
- this.joinBeforeMs = windows.afterMs;
- this.joinAfterMs = windows.beforeMs;
- }
- this.windowsAfterMs = windows.afterMs;
- this.windowsBeforeMs = windows.beforeMs;
- this.joinGraceMs = windows.gracePeriodMs();
+ this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
- this.joiner = joiner;
- this.outer = outer;
this.outerJoinWindowName = outerJoinWindowName;
- this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
- }
-
- @Override
- public Processor<K, V1, K, VOut> get() {
- return new KStreamKStreamJoinProcessor();
+ this.joinBeforeMs = joinBeforeMs;
+ this.joinAfterMs = joinAfterMs;
+ this.joinGraceMs = windows.gracePeriodMs();
+ this.windowsBeforeMs = windows.beforeMs;
+ this.windowsAfterMs = windows.afterMs;
+ this.outer = outer;
+ this.joiner = joiner;
}
- private class KStreamKStreamJoinProcessor extends ContextualProcessor<K,
V1, K, VOut> {
- private WindowStore<K, V2> otherWindowStore;
- private Sensor droppedRecordsSensor;
- private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>,
LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
+ protected abstract class KStreamKStreamJoinProcessor extends
ContextualProcessor<K, VThis, K, VOut> {
private InternalProcessorContext<K, VOut> internalProcessorContext;
+ private Sensor droppedRecordsSensor;
+
private TimeTracker sharedTimeTracker;
+ private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>,
LeftOrRightValue<VL, VR>>> outerJoinStore =
+ Optional.empty();
+ private WindowStore<K, VOther> otherWindowStore;
@Override
public void init(final ProcessorContext<K, VOut> context) {
super.init(context);
internalProcessorContext = (InternalProcessorContext<K, VOut>)
context;
-
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
- droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
+ droppedRecordsSensor =
+ droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
otherWindowStore = context.getStateStore(otherWindowName);
sharedTimeTracker =
sharedTimeTrackerSupplier.get(context.taskId());
if (enableSpuriousResultFix) {
outerJoinStore =
outerJoinWindowName.map(context::getStateStore);
sharedTimeTracker.setEmitInterval(
- StreamsConfig.InternalConfig.getLong(
- context.appConfigs(),
-
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
- 1000L
- )
+ StreamsConfig.InternalConfig.getLong(
+ context.appConfigs(),
+
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
+ 1000L
+ )
);
Review Comment:
nit: revert the whitespace-only changes here
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -184,20 +146,34 @@ public void process(final Record<K, V1> record) {
// This condition below allows us to process the
out-of-order records without the need
// to hold it in the temporary outer store
if (!outerJoinStore.isPresent() || timeTo <
sharedTimeTracker.streamTime) {
-
context().forward(record.withValue(joiner.apply(record.key(), record.value(),
null)));
+ context().forward(
+ record.withValue(joiner.apply(record.key(),
record.value(), null)));
} else {
- sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
- outerJoinStore.ifPresent(store -> store.put(
- TimestampedKeyAndJoinSide.make(isLeftSide,
record.key(), inputRecordTimestamp),
- LeftOrRightValue.make(isLeftSide,
record.value())));
+ sharedTimeTracker.updatedMinTime(recordTimestamp);
+ putInOuterJoinStore(record, recordTimestamp);
}
}
}
}
+ @Override
+ public void close() {
+ sharedTimeTrackerSupplier.remove(context().taskId());
+ }
Review Comment:
nit: move this down near the bottom for a smaller diff
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########
@@ -89,8 +74,8 @@ public V2 getRightValue() {
@Override
public String toString() {
return "<"
- + ((leftValue != null) ? "left," + leftValue : "right," +
rightValue)
- + ">";
+ + ((leftValue != null) ? "left," + leftValue : "right," +
rightValue)
+ + ">";
Review Comment:
nit: revert this indent change
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1,
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements
ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
- private final String otherWindowName;
+ private final boolean outer;
+ private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther,
? extends VOut> joiner;
private final long joinBeforeMs;
private final long joinAfterMs;
private final long joinGraceMs;
+ private final String otherWindowName;
+ private final TimeTrackerSupplier sharedTimeTrackerSupplier;
private final boolean enableSpuriousResultFix;
+ private final Optional<String> outerJoinWindowName;
private final long windowsBeforeMs;
private final long windowsAfterMs;
- private final boolean outer;
- private final boolean isLeftSide;
- private final Optional<String> outerJoinWindowName;
- private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ?
extends VOut> joiner;
-
- private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
- KStreamKStreamJoin(final boolean isLeftSide,
- final String otherWindowName,
- final JoinWindowsInternal windows,
- final ValueJoinerWithKey<? super K, ? super V1, ? super
V2, ? extends VOut> joiner,
- final boolean outer,
- final Optional<String> outerJoinWindowName,
- final TimeTrackerSupplier sharedTimeTrackerSupplier) {
- this.isLeftSide = isLeftSide;
+ KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier
sharedTimeTrackerSupplier,
+ final Optional<String> outerJoinWindowName, final long
joinBeforeMs,
+ final long joinAfterMs, final JoinWindowsInternal
windows, final boolean outer,
+ final ValueJoinerWithKey<? super K, ? super VThis, ?
super VOther, ? extends VOut> joiner) {
this.otherWindowName = otherWindowName;
- if (isLeftSide) {
- this.joinBeforeMs = windows.beforeMs;
- this.joinAfterMs = windows.afterMs;
- } else {
- this.joinBeforeMs = windows.afterMs;
- this.joinAfterMs = windows.beforeMs;
- }
- this.windowsAfterMs = windows.afterMs;
- this.windowsBeforeMs = windows.beforeMs;
- this.joinGraceMs = windows.gracePeriodMs();
+ this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
- this.joiner = joiner;
- this.outer = outer;
this.outerJoinWindowName = outerJoinWindowName;
- this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
- }
-
- @Override
- public Processor<K, V1, K, VOut> get() {
- return new KStreamKStreamJoinProcessor();
+ this.joinBeforeMs = joinBeforeMs;
+ this.joinAfterMs = joinAfterMs;
+ this.joinGraceMs = windows.gracePeriodMs();
+ this.windowsBeforeMs = windows.beforeMs;
+ this.windowsAfterMs = windows.afterMs;
+ this.outer = outer;
+ this.joiner = joiner;
}
- private class KStreamKStreamJoinProcessor extends ContextualProcessor<K,
V1, K, VOut> {
- private WindowStore<K, V2> otherWindowStore;
- private Sensor droppedRecordsSensor;
- private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>,
LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
+ protected abstract class KStreamKStreamJoinProcessor extends
ContextualProcessor<K, VThis, K, VOut> {
private InternalProcessorContext<K, VOut> internalProcessorContext;
+ private Sensor droppedRecordsSensor;
+
private TimeTracker sharedTimeTracker;
+ private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>,
LeftOrRightValue<VL, VR>>> outerJoinStore =
+ Optional.empty();
+ private WindowStore<K, VOther> otherWindowStore;
Review Comment:
nit: reorder this to make the diff smaller.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########
@@ -103,7 +88,7 @@ public boolean equals(final Object o) {
}
final LeftOrRightValue<?, ?> that = (LeftOrRightValue<?, ?>) o;
return Objects.equals(leftValue, that.leftValue) &&
- Objects.equals(rightValue, that.rightValue);
+ Objects.equals(rightValue, that.rightValue);
Review Comment:
nit: revert this indent change
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]