nicktelford commented on code in PR #14852:
URL: https://github.com/apache/kafka/pull/14852#discussion_r1410543947


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() {
 
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors
-                = Collections.singletonList(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+                dbOptions,
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions)
+        );
+
+        dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+    }
+
+    /**
+     * Open RocksDB while automatically creating any requested column families 
that don't yet exist.
+     */
+    List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,
+                                         final ColumnFamilyDescriptor 
defaultColumnFamilyDescriptor,
+                                         final ColumnFamilyDescriptor... 
columnFamilyDescriptors) {
+        final String absolutePath = dbDir.getAbsolutePath();
+        final List<ColumnFamilyDescriptor> extraDescriptors = 
Arrays.asList(columnFamilyDescriptors);
+        final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 
+ columnFamilyDescriptors.length);
+        allDescriptors.add(defaultColumnFamilyDescriptor);
+        allDescriptors.addAll(extraDescriptors);
 
         try {
-            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors, columnFamilies);
-            dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+            final Options options = new Options(dbOptions, 
defaultColumnFamilyDescriptor.getOptions());
+            final List<byte[]> allExisting = 
RocksDB.listColumnFamilies(options, absolutePath);
+
+            final List<ColumnFamilyDescriptor> existingDescriptors = 
allDescriptors.stream()
+                    .filter(descriptor -> descriptor == 
defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> 
Arrays.equals(existing, descriptor.getName())))
+                    .collect(Collectors.toList());
+            final List<ColumnFamilyDescriptor> toCreate = 
extraDescriptors.stream()
+                    .filter(descriptor -> 
allExisting.stream().noneMatch(existing -> Arrays.equals(existing, 
descriptor.getName())))
+                    .collect(Collectors.toList());
+            final List<ColumnFamilyHandle> existingColumnFamilies = new 
ArrayList<>(existingDescriptors.size());
+            db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, 
existingColumnFamilies);
+            final List<ColumnFamilyHandle> createdColumnFamilies = 
db.createColumnFamilies(toCreate);
+
+            // match up the existing and created ColumnFamilyHandles with the 
existing/created ColumnFamilyDescriptors
+            // so that the order of the resultant List matches the order of 
the openRocksDB arguments
+            final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(allDescriptors.size());
+            int existing = 0;
+            int created = 0;
+            while (existing + created < allDescriptors.size()) {
+                if (existing < existingDescriptors.size() && 
(existingDescriptors.get(existing) == allDescriptors.get(existing + created))) {
+                    columnFamilies.add(existingColumnFamilies.get(existing));
+                    existing++;
+                } else if (created < toCreate.size() && (toCreate.get(created) 
== allDescriptors.get(existing + created))) {
+                    columnFamilies.add(createdColumnFamilies.get(created));
+                    created++;
+                }
+            }

Review Comment:
   Done. I've also refactored the method a bit so that it no longer depends on the 
   `existingDescriptors` and `toCreate` lists, since passing those in would have required 
   the method to take 5 arguments; it now takes only 3.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to