vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1085990051


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical 
store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix 
derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, 
Segment {
+    private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = 
Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new 
PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes 
keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical 
segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical 
segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();

Review Comment:
   Sure, I don't feel strongly, so I made the change. Besides guarding against 
closing a segment that was never opened, the `open` flag also guarded 
against closing the same segment twice. I've inlined `closeOpenIterators()` and 
accounted for this by clearing `openIterators` after it's copied.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical 
store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix 
derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, 
Segment {
+    private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = 
Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new 
PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {

Review Comment:
   Ack, see above. (Addressed in the latest commit.)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical 
store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix 
derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, 
Segment {
+    private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = 
Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new 
PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes 
keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical 
segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical 
segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", 
iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;

Review Comment:
   Ack, addressed in the latest commit.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical 
store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix 
derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, 
Segment {
+    private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = 
Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new 
PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes 
keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical 
segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical 
segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", 
iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes 
from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries 
for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final 
WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done 
as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the 
physical store.
+     * The key for the physical store is the raw key prepended with a 
fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : 
Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, 
rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);
+        }
+    }
+
+    /**
+     * Converts a {@link KeyValueIterator} which returns keys with prefixes to 
one which
+     * returns un-prefixed keys.
+     */
+    private static class StrippedPrefixKeyValueIteratorAdapter implements 
KeyValueIterator<Bytes, byte[]> {
+
+        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
+        private final Function<Bytes, Bytes> keyStripper;
+
+        StrippedPrefixKeyValueIteratorAdapter(final KeyValueIterator<Bytes, 
byte[]> iteratorWithKeyPrefixes,
+                                              final Function<Bytes, Bytes> 
keyStripper) {
+            this.iteratorWithKeyPrefixes = iteratorWithKeyPrefixes;
+            this.keyStripper = keyStripper;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iteratorWithKeyPrefixes.hasNext();
+        }
+
+        @Override
+        public KeyValue<Bytes, byte[]> next() {
+            final KeyValue<Bytes, byte[]> next = 
iteratorWithKeyPrefixes.next();
+            return new KeyValue<>(keyStripper.apply(next.key), next.value);
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            return keyStripper.apply(iteratorWithKeyPrefixes.peekNextKey());
+        }
+
+        @Override
+        public void remove() {
+            iteratorWithKeyPrefixes.remove();
+        }
+
+        @Override
+        public void close() {
+            iteratorWithKeyPrefixes.close();
+        }
+    }
+
+    private static Bytes serializeLongToBytes(final long l) {
+        return Bytes.wrap(ByteBuffer.allocate(Long.BYTES).putLong(l).array());

Review Comment:
   Nice eye. Fixed.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical 
store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix 
derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, 
Segment {
+    private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = 
Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new 
PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes 
keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical 
segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical 
segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", 
iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes 
from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries 
for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final 
WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done 
as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the 
physical store.
+     * The key for the physical store is the raw key prepended with a 
fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : 
Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, 
rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);

Review Comment:
   `wrap()` just creates the new object (after performing a null check), so it's 
very lightweight. 
   
   It's more convenient to keep `prefix` as `byte[]` than `Bytes` because all 
the other operations require `byte[]` rather than `Bytes`. If we really wanted to, 
we could keep both (one copy as `byte[]` and another as `Bytes`), but that feels 
like overkill.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical 
store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix 
derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, 
Segment {
+    private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = 
Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new 
PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes 
keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical 
segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical 
segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", 
iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes 
from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries 
for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final 
WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done 
as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the 
physical store.
+     * The key for the physical store is the raw key prepended with a 
fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {

Review Comment:
   Yup, updated. Originally I was going to make this class more general (i.e., not necessarily prefix-based), so I left the names generic, but I think that's overkill for now since we have only one use case. Agreed that switching to the simpler names improves readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to