Chuckame commented on code in PR #16218:
URL: https://github.com/apache/kafka/pull/16218#discussion_r1823572135
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java:
##########
@@ -111,34 +110,26 @@ public void process(final Record<KO, Change<VO>> record) {
                 return;
             }
-            final Bytes prefixBytes = keySchema.prefixBytes(record.key());
+            final Bytes foreignKeyBytes = keySchema.prefixBytes(record.key());
             //Perform the prefixScan and propagate the results
             try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
-                     subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {
+                     subscriptionStore.prefixScan(foreignKeyBytes, new BytesSerializer())) {
Review Comment:
Personally, I prefer the native prefixScan over a range "hack", to benefit
from the RocksDB native implementation: it uses the index and filters
differently, and may load different parts of the index into memory.
I'm not sure a range using your increment would work: the range is
inclusive, so we cannot predict the maximum value before the increment.
Take the keys `AAA -> ZZZ` (where we only have A-Z letters). If we prefixScan
for `A`, the inclusive end should be the last value before `BAA`, which is
`AZZ`. Your increment is supposed to compute a range from `A` to `AZ`, but it
will only take values up to `AZ`, while `AZZ` is greater than `AZ`.
Comparatively: `A < AB < AXZ < AZ < AZA < AZZ`
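A minimal sketch of that ordering, assuming plain Java strings can stand in
for the store's byte keys (for A-Z letters the natural String order matches
a byte-wise lexicographic order). `KeyOrderSketch` and `sorted` are
hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class KeyOrderSketch {
    // Sorts keys in natural (lexicographic) String order, used here as a
    // stand-in for the byte order of the store (an assumption of this sketch).
    static List<String> sorted(String... keys) {
        List<String> list = new ArrayList<>(Arrays.asList(keys));
        Collections.sort(list);
        return list;
    }

    public static void main(String[] args) {
        // prints [A, AB, AXZ, AZ, AZA, AZZ]
        System.out.println(sorted("AZZ", "AB", "AZ", "A", "AZA", "AXZ"));
    }
}
```

A key that extends `AZ` (such as `AZA` or `AZZ`) sorts after `AZ`, so an
inclusive upper bound of `AZ` would cut those entries off.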
I think the easiest solution is to exclude the entry when its key equals the
incremented bytes, and to do that check only for the last key rather than for
every entry, since a range is ordered:
```diff
     //Perform the prefixScan and propagate the results
     try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
              subscriptionStore.range(foreignKeyBytes, Bytes.increment(foreignKeyBytes))) {
         while (prefixScanResults.hasNext()) {
             final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
             // have to check the prefix on the last key because the range end is inclusive :(
-            if (prefixEquals(next.key.get(), foreignKeyBytes.get())) {
+            if (prefixScanResults.hasNext() || prefixEquals(next.key.get(), foreignKeyBytes.get())) {
                 final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
                 context().forward(
                     record.withKey(combinedKey.primaryKey())
                         .withValue(new SubscriptionResponseWrapper<>(
                             next.value.value().hash(),
                             record.value().newValue,
                             next.value.value().primaryPartition()))
                 );
             }
         }
     }
```
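The last-key check can be tried outside Kafka with a rough sketch like the
one below. It assumes a `TreeMap` with String keys as a stand-in for the
subscription store, and a hypothetical `increment` helper that only bumps the
last character (no carry handling, unlike a real byte increment); none of
these names come from the PR:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class LastKeyCheckSketch {
    // Hypothetical stand-in for an increment on the prefix: bumps the last
    // character only. Assumes a non-empty prefix and no overflow.
    static String increment(String prefix) {
        char last = prefix.charAt(prefix.length() - 1);
        return prefix.substring(0, prefix.length() - 1) + (char) (last + 1);
    }

    // Scans the inclusive range [prefix, increment(prefix)] and keeps only
    // keys that carry the prefix, checking the prefix on the last key only.
    static List<String> scan(NavigableMap<String, String> store, String prefix) {
        List<String> matches = new ArrayList<>();
        Iterator<String> it =
            store.subMap(prefix, true, increment(prefix), true).keySet().iterator();
        while (it.hasNext()) {
            String key = it.next();
            // The range is ordered, so only the final key can be the
            // inclusive upper bound itself.
            if (it.hasNext() || key.startsWith(prefix)) {
                matches.add(key);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (String k : new String[] {"A", "AB", "AZZ", "B", "BA"}) {
            store.put(k, "value-" + k);
        }
        // prints [A, AB, AZZ]
        System.out.println(scan(store, "A"));
    }
}
```

Here the key `B` falls inside the inclusive range for prefix `A`, but it is
the last entry returned and fails the prefix check, so it is dropped.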