nicktelford commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2879506037


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
     }
 
     @Override
-    public void close() {
+    public void open(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
+        final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
+        if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+            // If the status key is not present, we initialize it to "OPEN"
+            accessor.put(offsetColumnFamilyHandle, statusKey, openState);
+            // Store the new status on disk
+            accessor.flush(offsetColumnFamilyHandle);
+            open.set(true);
+        } else {
+            throw new RocksDBException("Invalid state");
+        }
+    }
+
+    @Override
+    public void close(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
+        accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+        accessor.flush(offsetColumnFamilyHandle);

Review Comment:
   Again, is this `flush` necessary? Doesn't RocksDB guarantee that it will
flush any unwritten data to disk on close?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
     }
 
     @Override
-    public void close() {
+    public void open(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
+        final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
+        if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+            // If the status key is not present, we initialize it to "OPEN"
+            accessor.put(offsetColumnFamilyHandle, statusKey, openState);
+            // Store the new status on disk
+            accessor.flush(offsetColumnFamilyHandle);

Review Comment:
   I don't think an explicit `flush` is necessary here. Because the status
flag is written to the offsets CF, if the status flag hasn't been flushed to
disk, then no offsets written to the CF after the store was opened have been
flushed either. And because we use Atomic Flush, no _records_ can have been
flushed if the status flag has not been flushed.
   
   Conversely, Atomic Flush guarantees that if any records or offsets written
since the store was opened have been flushed to disk, for any reason, then the
status flag has been flushed too.
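
   To make the argument concrete, here is a toy model of the invariant. It is
not RocksDB: `AtomicFlushModel` and its methods are hypothetical, it assumes
the WAL is disabled, and it assumes every column family becomes durable
together in a single flush.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only (hypothetical class, not the RocksDB API).
// Writes are buffered in memory and become durable only through flushAll(),
// which persists every column family's pending writes together.
final class AtomicFlushModel {
    private final Map<String, String> pending = new HashMap<>();
    private final Map<String, String> durable = new HashMap<>();

    void put(final String columnFamily, final String key, final String value) {
        pending.put(columnFamily + "/" + key, value);
    }

    // Atomic flush: all buffered writes across all column families persist at once.
    void flushAll() {
        durable.putAll(pending);
        pending.clear();
    }

    // Simulates a crash: buffered writes are lost; returns a copy of what survived.
    Map<String, String> crash() {
        pending.clear();
        return new HashMap<>(durable);
    }
}
```

   In this model the status flag is written before any offsets or records, so
any flush that persists an offset or a record also persists the flag, and a
crash before the first flush loses all of them together.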



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
     }
 
     @Override
-    public void close() {
+    public void open(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
+        final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
+        if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+            // If the status key is not present, we initialize it to "OPEN"
+            accessor.put(offsetColumnFamilyHandle, statusKey, openState);
+            // Store the new status on disk
+            accessor.flush(offsetColumnFamilyHandle);
+            open.set(true);
+        } else {
+            throw new RocksDBException("Invalid state");

Review Comment:
   Can we be a bit more descriptive with this error? Perhaps indicate that this 
store was "dirty" and, under EOS, needs to be wiped.
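
   A minimal sketch of the kind of message I have in mind. `StoreStatusCheck`,
`describeProblem` and the exact wording are hypothetical, and `CLOSED` assumes
the closed state is the 8-byte big-endian encoding of `0L`.

```java
import java.util.Arrays;

// Hypothetical stand-in for the status check in open(); names are illustrative only.
final class StoreStatusCheck {
    // Assumption: the "closed" status is 0L serialized as 8 big-endian bytes.
    static final byte[] CLOSED = {0, 0, 0, 0, 0, 0, 0, 0};

    // Returns null when the store may be opened, otherwise a descriptive error message.
    static String describeProblem(final String storeName, final byte[] statusValue) {
        if (statusValue == null || Arrays.equals(statusValue, CLOSED)) {
            return null;
        }
        return "State store " + storeName + " was not closed cleanly (status="
            + Arrays.toString(statusValue) + "). Its local state is dirty and, under "
            + "exactly-once semantics, must be wiped and restored from the changelog.";
    }
}
```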



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -35,6 +37,10 @@ abstract class AbstractColumnFamilyAccessor implements RocksDBStore.ColumnFamily
     private final ColumnFamilyHandle offsetColumnFamilyHandle;
     private final StringSerializer stringSerializer = new StringSerializer();
     private final Serdes.LongSerde longSerde = new Serdes.LongSerde();
+    private final byte[] statusKey = stringSerializer.serialize(null, "status");
+    private final byte[] openState = longSerde.serializer().serialize(null, 1L);
+    private final byte[] closedState = longSerde.serializer().serialize(null, 0L);
+    private final AtomicBoolean open = new AtomicBoolean(false);

Review Comment:
   This was previously a `volatile boolean`. Is it necessary to use an
`AtomicBoolean` instead?
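
   For reference, a small sketch of the distinction I'm asking about. The
`OpenFlags` class and its method names are hypothetical: a `volatile boolean`
is enough when the flag is only assigned and read, while `AtomicBoolean` is
only needed for an atomic check-then-set.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not the code under review.
final class OpenFlags {
    // Sufficient when writers only assign and readers only read:
    // volatile guarantees the latest write is visible to other threads.
    private volatile boolean open = false;

    // Only needed when "check the current value and set it" must be one atomic step.
    private final AtomicBoolean openedOnce = new AtomicBoolean(false);

    void markOpen() {
        open = true;
    }

    boolean isOpen() {
        return open;
    }

    // Returns true for exactly one caller, even under contention.
    boolean tryOpen() {
        return openedOnce.compareAndSet(false, true);
    }
}
```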



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java:
##########
@@ -356,6 +356,7 @@ public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig() {}
 
         public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
             options.setTableFormatConfig(new PlainTableConfig());
+            options.useFixedLengthPrefixExtractor(1);

Review Comment:
   What does `useFixedLengthPrefixExtractor(1)` do here, and why does this
test need it? 🤔



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
     }
 
     @Override
-    public void close() {
+    public void open(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
+        final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
+        if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {

Review Comment:
   IIRC, we only care about the "status" flag under EOS. Under ALOS, even if 
the store was previously opened, we want to use the committed offsets as our 
starting point and restore forwards from there.
   
   One wrinkle here is that `RocksDBStore` is not _currently_ aware of the 
processing mode, but it can be determined via `stateStoreContext.appConfigs()`.
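
   A rough sketch of how that lookup could work, assuming the map returned by
`appConfigs()` carries the `processing.guarantee` setting as a string. The
`ProcessingMode` helper is hypothetical, and it uses the literal config key
rather than the `StreamsConfig` constant so that it stands alone.

```java
import java.util.Map;

// Hypothetical helper; in the store, the map would come from stateStoreContext.appConfigs().
final class ProcessingMode {
    // Literal key of the Kafka Streams processing guarantee setting.
    static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";

    // Treats any "exactly_once*" value as EOS; a missing value is treated as ALOS.
    static boolean isExactlyOnce(final Map<String, ?> appConfigs) {
        final Object value = appConfigs.get(PROCESSING_GUARANTEE_CONFIG);
        return value != null && value.toString().startsWith("exactly_once");
    }
}
```

   With something like this, the status check in `open()` could be skipped
entirely when `isExactlyOnce(...)` returns `false`.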



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -245,7 +245,11 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
         setupStatistics(configs, dbOptions);
         openRocksDB(dbOptions, columnFamilyOptions);
         dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
-        open = true;
+        try {
+            cfAccessor.open(dbAccessor);
+        } catch (final Throwable fatal) {
+            throw new ProcessorStateException(fatal);

Review Comment:
   Should we perhaps provide an error message with a bit more context here, 
like the `name()` of this StateStore?
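
   Something along these lines, as a sketch only. `StoreOpenException` is a
hypothetical stand-in for `ProcessorStateException` so the example compiles on
its own, and `OpenWithContext` and `ThrowingAction` are illustrative names.

```java
// Hypothetical stand-in for ProcessorStateException, so the sketch is self-contained.
final class StoreOpenException extends RuntimeException {
    StoreOpenException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

final class OpenWithContext {
    // Illustrative functional interface for an open step that may throw a checked exception.
    interface ThrowingAction {
        void run() throws Exception;
    }

    // Wraps any failure with the store name while keeping the original cause.
    static void open(final String storeName, final ThrowingAction action) {
        try {
            action.run();
        } catch (final Exception fatal) {
            throw new StoreOpenException("Error opening store " + storeName, fatal);
        }
    }
}
```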



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.