nicktelford commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2854235081


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -667,6 +636,21 @@ private boolean isOverflowing(final long value) {
         return value < 0;
     }
 
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        try {
+            return cfAccessor.getCommitedOffset(dbAccessor, partition);
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while getting committed offset for partition " + partition, e);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public boolean managesOffsets() {
+        return true;

Review Comment:
   ~I think we'll need to keep this as `false` until KIP-892 lands, because 
until we're able to buffer writes between commits, there's no way to guarantee 
that the committed offsets reflect the records written to the database.~
   
   ~To elaborate: between commits, new records are written to RocksDB. We can't 
guarantee when those records will be written to disk by RocksDB (due to 
background flushes), so if the application crashes between commits, some of the 
records on disk might be newer than the most recently written offsets; this is 
a problem even with atomic flush.~
   
   ~I still think we want this code, but we can't actually _use_ it until 
KIP-892 lands, sadly~
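
To make the concern described above concrete, here is a toy Java model. It is not Kafka Streams or RocksDB code: every name in it (`OffsetDriftSketch`, `backgroundFlush`, `newestRecordOnDisk`, and so on) is hypothetical. It assumes the committed offset is persisted only at commit time, while a background flush may persist records at any point between commits.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names exist in Kafka Streams or RocksDB.
public class OffsetDriftSketch {
    // Changelog offsets of records still in memory (lost on crash).
    private final List<Long> memtable = new ArrayList<>();
    // Changelog offsets of records that have reached disk.
    private final List<Long> disk = new ArrayList<>();
    // Offset persisted by the last commit; -1 means "nothing committed yet".
    private long committedOffsetOnDisk = -1L;

    // A record arrives between commits and is buffered in memory.
    public void write(final long changelogOffset) {
        memtable.add(changelogOffset);
    }

    // Assumption: the storage engine may flush at any time, without
    // updating the committed offset.
    public void backgroundFlush() {
        disk.addAll(memtable);
        memtable.clear();
    }

    // A commit flushes pending records and then persists the offset.
    public void commit(final long offset) {
        backgroundFlush();
        committedOffsetOnDisk = offset;
    }

    // A crash discards whatever was still in memory.
    public void crash() {
        memtable.clear();
    }

    public long committedOffset() {
        return committedOffsetOnDisk;
    }

    public long newestRecordOnDisk() {
        return disk.isEmpty() ? -1L : disk.get(disk.size() - 1);
    }

    public static void main(final String[] args) {
        final OffsetDriftSketch store = new OffsetDriftSketch();
        store.write(0L);
        store.write(1L);
        store.write(2L);
        store.commit(2L);        // offsets and records agree here
        store.write(3L);
        store.write(4L);
        store.backgroundFlush(); // records 3 and 4 reach disk, offset does not
        store.crash();
        System.out.println("committed offset:      " + store.committedOffset());
        System.out.println("newest record on disk: " + store.newestRecordOnDisk());
    }
}
```

In this model the disk ends up holding records newer than the committed offset, which is the mismatch the comment describes. Buffering writes between commits would close the gap only if the buffered records and the offset reach disk together.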



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
