nicktelford commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2854137171


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import java.util.Map;
+
+/**
+ * Abstract base class for all ColumnFamilyAccessor.
+ * Provides common logic for committing and retrieving offsets,
+ * while delegating specific commit behavior to subclasses.
+ */
+abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamilyAccessor {
+
+    private final ColumnFamilyHandle offsetColumnFamilyHandle;
+    private final StringSerializer stringSerializer = new StringSerializer();
+    private final Serdes.LongSerde longSerde = new Serdes.LongSerde();
+
+    AbstractColumnFamilyAccessor(final ColumnFamilyHandle 
offsetColumnFamilyHandle) {
+        this.offsetColumnFamilyHandle = offsetColumnFamilyHandle;
+    }
+
+    @Override
+    public final void commit(final RocksDBStore.DBAccessor accessor, final 
Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
+        this.commit(accessor);
+        for (final Map.Entry<TopicPartition, Long> entry : 
changelogOffsets.entrySet()) {
+            final TopicPartition tp = entry.getKey();
+            final Long offset = entry.getValue();
+            final byte[] key = stringSerializer.serialize(null, tp.toString());
+            final byte[] value = longSerde.serializer().serialize(null, 
offset);
+            accessor.put(offsetColumnFamilyHandle, key, value);
+        }
+        accessor.flush(offsetColumnFamilyHandle);

Review Comment:
   We should never explicitly flush memtables.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java:
##########
@@ -236,8 +237,7 @@ public long approximateNumEntries(final DBAccessor accessor)
     }
 
     @Override
-    public void commit(final DBAccessor accessor,
-                       final Map<TopicPartition, Long> changelogOffsets) 
throws RocksDBException {
+    public void commit(final DBAccessor accessor) throws RocksDBException {
         accessor.flush(oldColumnFamily, newColumnFamily);
     }

Review Comment:
   One of the primary goals of KIP-1035 is to avoid having to explicitly 
`flush` RocksDB, because doing so creates unnecessarily small tables on-disk, 
which then have to be compacted.
   
   We can completely remove this method, but only if we set 
`dbOptions.setAtomicFlush(true)` in `openDB` (anywhere below the 
`configSetter.setConfig` line is fine, so that users can't accidentally 
override it).
   
   Atomic Flush ensures that RocksDB will always flush all Column Families 
atomically, even when doing automatic flushes in the background (which is what 
we want).
   
   This ensures that the records in the data column families will always 
correspond to the offsets in the offsets column family.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java:
##########
@@ -222,6 +223,21 @@ public void 
shouldAddValueProvidersWithStatisticsToInjectedMetricsRecorderWhenRe
         verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), 
notNull(), notNull());
     }
 
+    @Test
+    public void shouldCommitOffsets() {
+        final TopicPartition tp0 = new TopicPartition("topic-0", 0);
+        final TopicPartition tp1 = new TopicPartition("topic-1", 0);
+        final Map<TopicPartition, Long> offsetsToCommit = Map.of(tp0, 100L, 
tp1, 200L);
+        rocksDBStore = getRocksDBStore();
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.commit(offsetsToCommit);
+        rocksDBStore.close();
+        rocksDBStore.init(context, rocksDBStore);
+        assertEquals(100L, rocksDBStore.committedOffset(tp0));
+        assertEquals(200L, rocksDBStore.committedOffset(tp1));
+        rocksDBStore.close();
+    }
+

Review Comment:
   As per my above comment: I don't think we should actually enable using these 
offsets until KIP-892 lands. So perhaps here we should have a test that 
verifies that offsets _aren't_ used/stored? 🙁 



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+abstract class AbstractColumnFamilyAccessorTest {
+
+    @Mock
+    protected ColumnFamilyHandle offsetsCF;
+
+    @Mock
+    protected RocksDBStore.DBAccessor dbAccessor;
+
+    protected AbstractColumnFamilyAccessor accessor;
+
+    abstract AbstractColumnFamilyAccessor createColumnFamilyAccessor();
+    private final LongSerializer offsetSerializer = new LongSerializer();
+    private final StringSerializer topicSerializer = new StringSerializer();
+
+
+    @BeforeEach
+    public void setUp() {
+        accessor = createColumnFamilyAccessor();
+    }
+
+    @Test
+    public void shouldCommitOffsets() throws RocksDBException {
+        final TopicPartition tp0 = new TopicPartition("testTopic", 0);
+        final TopicPartition tp1 = new TopicPartition("testTopic", 1);
+        final Map<TopicPartition, Long> changelogOffsets = Map.of(tp0, 10L, 
tp1, 20L);
+        accessor.commit(dbAccessor, changelogOffsets);
+        verify(dbAccessor).flush(offsetsCF);
+        verify(dbAccessor, times(2)).flush(any(ColumnFamilyHandle[].class));
+        verify(dbAccessor).put(eq(offsetsCF), 
eq(topicSerializer.serialize(null, tp0.toString())), 
eq(offsetSerializer.serialize(null, 10L)));
+        verify(dbAccessor).put(eq(offsetsCF), 
eq(topicSerializer.serialize(null, tp1.toString())), 
eq(offsetSerializer.serialize(null, 20L)));

Review Comment:
   I think a better test here would be to verify that reading offsets using 
`accessor.getCommittedOffset` returns the offsets written using 
`accessor.commit`. That way, you're not leaning so heavily on mocking the 
internal implementation details.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -667,6 +636,21 @@ private boolean isOverflowing(final long value) {
         return value < 0;
     }
 
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        try {
+            return cfAccessor.getCommitedOffset(dbAccessor, partition);
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while getting committed 
offset for partition " + partition, e);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public boolean managesOffsets() {
+        return true;

Review Comment:
   I think we'll need to keep this as `false` until KIP-892 lands, because 
until we're able to buffer writes between commits, there's no way to guarantee 
that the committed offsets reflect the records written to the database.
   
   To elaborate: between commits, new records are written to RocksDB. We can't 
guarantee when those records will be written to disk by RocksDB (due to 
background flushes), so if the application crashes between commits, some of the 
records on disk might be newer than the most recently written offsets; this is 
a problem even with atomic flush.
   
   I still think we want this code, but we can't actually _use_ it until 
KIP-892 lands, sadly â˜šī¸ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to