nicktelford commented on code in PR #14852:
URL: https://github.com/apache/kafka/pull/14852#discussion_r1410509927


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() {
 
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors
-                = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+                dbOptions,
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)
+        );
+
+        dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+    }
+
+    /**
+     * Open RocksDB while automatically creating any requested column families that don't yet exist.
+     */
+    List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,
+                                         final ColumnFamilyDescriptor defaultColumnFamilyDescriptor,
+                                         final ColumnFamilyDescriptor... columnFamilyDescriptors) {
+        final String absolutePath = dbDir.getAbsolutePath();
+        final List<ColumnFamilyDescriptor> extraDescriptors = Arrays.asList(columnFamilyDescriptors);
+        final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 + columnFamilyDescriptors.length);
+        allDescriptors.add(defaultColumnFamilyDescriptor);
+        allDescriptors.addAll(extraDescriptors);
 
         try {
-            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies);
-            dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+            final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions());
+            final List<byte[]> allExisting = RocksDB.listColumnFamilies(options, absolutePath);
+
+            final List<ColumnFamilyDescriptor> existingDescriptors = allDescriptors.stream()
+                    .filter(descriptor -> descriptor == defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName())))

Review Comment:
   This check is necessary because when the database is opened for the first time, the default column family doesn't exist yet, so `listColumnFamilies` returns an empty `List`.
   
   Since the default column family will be created when we call `open` later, we don't need to create it explicitly. We can therefore count it among the `existingDescriptors`, even if it doesn't exist yet.
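
To make the reasoning concrete, here is a minimal standalone sketch of the same filter, using plain `byte[]` names instead of `ColumnFamilyDescriptor`s so that it runs without RocksDB on the classpath. The class `ColumnFamilyFilterSketch`, the method `existingNames`, and the `"offsets"` column family name are hypothetical and only for illustration; the `onDisk` list stands in for whatever `listColumnFamilies` would return.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: not part of the PR and not a RocksDB API.
public class ColumnFamilyFilterSketch {

    /**
     * Returns the requested names that can be passed straight to open():
     * the default column family (matched by identity, as in the diff) plus
     * every requested name that is already present on disk.
     */
    public static List<byte[]> existingNames(final byte[] defaultName,
                                             final List<byte[]> requested,
                                             final List<byte[]> onDisk) {
        final List<byte[]> result = new ArrayList<>();
        for (final byte[] name : requested) {
            final boolean isDefault = name == defaultName;
            final boolean found = onDisk.stream().anyMatch(e -> Arrays.equals(e, name));
            if (isDefault || found) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final byte[] defaultName = "default".getBytes(StandardCharsets.UTF_8);
        final byte[] offsets = "offsets".getBytes(StandardCharsets.UTF_8); // assumed extra family
        final List<byte[]> requested = Arrays.asList(defaultName, offsets);

        // First open: nothing exists on disk, so the listing is empty.
        final List<byte[]> firstOpen =
            existingNames(defaultName, requested, Collections.<byte[]>emptyList());
        System.out.println(firstOpen.size()); // prints 1 (only the default)

        // Later open: both families were created previously.
        final List<byte[]> onDisk = Arrays.asList(
            "default".getBytes(StandardCharsets.UTF_8),
            "offsets".getBytes(StandardCharsets.UTF_8));
        System.out.println(existingNames(defaultName, requested, onDisk).size()); // prints 2
    }
}
```

Without the identity check, the first call would return an empty list, and the default column family would be treated as one that still has to be created explicitly.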



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
