nicktelford commented on code in PR #15137:
URL: https://github.com/apache/kafka/pull/15137#discussion_r1443144457


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -805,6 +959,194 @@ public void reset() {
         public void close() {
             // nothing to close
         }
+
+        @Override
+        public void maybeRegisterTransactionIterator(final ManagedKeyValueIterator<Bytes, byte[]> iterator) {
+            // never register iterators as transaction iterators, because we have no transaction
+        }
+
+        @Override
+        public void writeOffset(final TopicPartition topicPartition, final Long offset, final WriteBatchInterface batch) throws RocksDBException {
+            final byte[] key = topicPartitionKeyCache.computeIfAbsent(
+                    topicPartition,
+                    tp -> TOPIC_PARTITION_SERIALIZER.serialize(null, tp)
+            );
+            if (offset == null) {
+                batch.delete(offsetsCF, key);
+            } else {
+                final byte[] serializedOffset = OFFSET_SERIALIZER.serialize(null, offset);
+                batch.put(offsetsCF, key, serializedOffset);
+            }
+        }
+
+        @Override
+        public void updatePosition(final Position position,
+                                   final TopicPartition topicPartition,
+                                   final Long offset) throws RocksDBException {
+            final byte[] key = TOPIC_PARTITION_SERIALIZER.serialize(null, topicPartition);
+            if (offset == null) {
+                db.delete(offsetsCF, key);
+            } else {
+                final byte[] value = OFFSET_SERIALIZER.serialize(null, offset);
+                db.put(offsetsCF, key, value);
+                position.withComponent(topicPartition.topic(), topicPartition.partition(), offset);
+            }
+        }
+    }
+
+    static class BatchedDBAccessor implements DBAccessor {
+
+        private final RocksDB db;
+        private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
+        private Position uncommittedPosition = Position.emptyPosition();
+        private long uncommittedBytes;
+
+        private final Map<TopicPartition, byte[]> topicPartitionKeyCache = new HashMap<>();
+
+        private final ColumnFamilyHandle offsetsCF;
+        private final WriteOptions writeOptions;
+        private final ReadOptions defaultReadOptions = new ReadOptions();
+
+        // used to simulate calls from StreamThreads in tests
+        boolean isStreamThreadForTest = false;
+
+        private Set<KeyValueIterator<Bytes, byte[]>> openTransactionIterators = new HashSet<>();
+
+        BatchedDBAccessor(final RocksDB db,
+                          final ColumnFamilyHandle offsetsCF,
+                          final WriteOptions writeOptions) {
+            this.db = db;
+            this.offsetsCF = offsetsCF;
+            this.writeOptions = writeOptions;
+        }
+
+        @Override
+        public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
+            if (Thread.currentThread() instanceof ProcessingThread || isStreamThreadForTest) {

Review Comment:
   We need to do this to ensure that, under `READ_COMMITTED`, interactive query threads don't read from the transaction buffer, since it's not thread-safe.
   
   While it's not ideal to add more to the hot path, `Thread.currentThread()` and `instanceof` checks are both JVM intrinsics, so they should be negligible compared with the cost of querying RocksDB.
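   
   For illustration, here's a minimal sketch of how the rest of this `get()` branch could look, assuming the batch is overlaid via RocksDB's standard `WriteBatchWithIndex#getFromBatchAndDB` read-through API (the method body below is illustrative, not the PR's exact code):
   
   ```java
   @Override
   public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
       if (Thread.currentThread() instanceof ProcessingThread || isStreamThreadForTest) {
           // StreamThread: overlay uncommitted writes from the batch on top of the DB.
           // Safe, because only the owning StreamThread ever touches the batch.
           return batch.getFromBatchAndDB(db, columnFamily, defaultReadOptions, key);
       } else {
           // Interactive-query thread under READ_COMMITTED: bypass the
           // (non-thread-safe) batch and read committed data directly from RocksDB.
           return db.get(columnFamily, defaultReadOptions, key);
       }
   }
   ```
   
   That keeps the thread check as the only extra cost on the query path; everything else is a plain RocksDB read either way.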


