This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 844c125 MINOR: Optimize the OrderedBytes#upperRange for not all query
cases (#11181)
844c125 is described below
commit 844c1259a9e2ea9caf2635213c4c0a144f6b758f
Author: Luke Chen <[email protected]>
AuthorDate: Fri Aug 27 05:37:34 2021 +0800
MINOR: Optimize the OrderedBytes#upperRange for not all query cases (#11181)
Currently, the OrderedBytes#upperRange method checks the key bytes one by one
to see whether each byte value is >= the first timestamp byte value, so that it
can skip the remaining key bytes, because we know compareTo will always return 0
or 1. However, in most cases the first timestamp byte is 0. More specifically,
upperRange is called for both the window store and the session store.
For the former, the suffix is timestamp, Long.MAX_VALUE, and for the latter the
suffix is Long.MAX_VALUE, times [...]
This PR optimizes the "not all" query cases: when the first timestamp byte is 0,
it no longer checks the key bytes one by one (every byte, read as an unsigned
integer, is always >= 0). Instead, it puts all key bytes and the suffix into the
buffer directly. As a result, the final byte array copy is also avoided.
Reviewers: Guozhang Wang <[email protected]>
---
.../streams/state/internals/OrderedBytes.java | 36 ++++++++++++++--------
.../state/internals/WindowKeySchemaTest.java | 2 +-
2 files changed, 25 insertions(+), 13 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
index cd6b6ad..561f24c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
@@ -32,21 +32,33 @@ class OrderedBytes {
static Bytes upperRange(final Bytes key, final byte[] maxSuffix) {
final byte[] bytes = key.get();
final ByteBuffer rangeEnd = ByteBuffer.allocate(bytes.length +
maxSuffix.length);
+ final int firstTimestampByte = maxSuffix[0] & 0xFF;
- int i = 0;
- while (i < bytes.length && (
- i < MIN_KEY_LENGTH // assumes keys are at least one byte long
- || (bytes[i] & 0xFF) >= (maxSuffix[0] & 0xFF)
- )) {
- rangeEnd.put(bytes[i++]);
- }
+ // if firstTimestampByte is 0, we'll put all key bytes into range
result because `(bytes[i] & 0xFF) >= firstTimestampByte`
+ // will always be true (this is a byte to unsigned int conversion
comparison)
+ if (firstTimestampByte == 0) {
+ return Bytes.wrap(
+ rangeEnd
+ .put(bytes)
+ .put(maxSuffix)
+ .array()
+ );
+ } else {
+ int i = 0;
+ while (i < bytes.length && (
+ i < MIN_KEY_LENGTH // assumes keys are at least one byte long
+ || (bytes[i] & 0xFF) >= firstTimestampByte
+ )) {
+ rangeEnd.put(bytes[i++]);
+ }
- rangeEnd.put(maxSuffix);
- rangeEnd.flip();
+ rangeEnd.put(maxSuffix);
+ rangeEnd.flip();
- final byte[] res = new byte[rangeEnd.remaining()];
- ByteBuffer.wrap(res).put(rangeEnd);
- return Bytes.wrap(res);
+ final byte[] res = new byte[rangeEnd.remaining()];
+ ByteBuffer.wrap(res).put(rangeEnd);
+ return Bytes.wrap(res);
+ }
}
static Bytes lowerRange(final Bytes key, final byte[] minSuffix) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 8f0ca83..dc88410 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -128,7 +128,7 @@ public class WindowKeySchemaTest {
final Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]
{0xC, 0xC, 0x9}), 0x0AffffffffffffffL);
assertThat(
- "shorter key with max timestamp should be in range",
+ "shorter key with customized timestamp should be in range",
upper.compareTo(
WindowKeySchema.toStoreKeyBinary(
new byte[] {0xC, 0xC},