cadonna commented on code in PR #14852:
URL: https://github.com/apache/kafka/pull/14852#discussion_r1410391609


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() {
 
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors
-                = Collections.singletonList(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+                dbOptions,
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions)
+        );
+
+        dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+    }
+
+    /**
+     * Open RocksDB while automatically creating any requested column families 
that don't yet exist.
+     */
+    List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,
+                                         final ColumnFamilyDescriptor 
defaultColumnFamilyDescriptor,
+                                         final ColumnFamilyDescriptor... 
columnFamilyDescriptors) {
+        final String absolutePath = dbDir.getAbsolutePath();
+        final List<ColumnFamilyDescriptor> extraDescriptors = 
Arrays.asList(columnFamilyDescriptors);
+        final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 
+ columnFamilyDescriptors.length);
+        allDescriptors.add(defaultColumnFamilyDescriptor);
+        allDescriptors.addAll(extraDescriptors);
 
         try {
-            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors, columnFamilies);
-            dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+            final Options options = new Options(dbOptions, 
defaultColumnFamilyDescriptor.getOptions());
+            final List<byte[]> allExisting = 
RocksDB.listColumnFamilies(options, absolutePath);
+
+            final List<ColumnFamilyDescriptor> existingDescriptors = 
allDescriptors.stream()
+                    .filter(descriptor -> descriptor == 
defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> 
Arrays.equals(existing, descriptor.getName())))
+                    .collect(Collectors.toList());
+            final List<ColumnFamilyDescriptor> toCreate = 
extraDescriptors.stream()
+                    .filter(descriptor -> 
allExisting.stream().noneMatch(existing -> Arrays.equals(existing, 
descriptor.getName())))
+                    .collect(Collectors.toList());
+            final List<ColumnFamilyHandle> existingColumnFamilies = new 
ArrayList<>(existingDescriptors.size());
+            db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, 
existingColumnFamilies);
+            final List<ColumnFamilyHandle> createdColumnFamilies = 
db.createColumnFamilies(toCreate);
+
+            // match up the existing and created ColumnFamilyHandles with the 
existing/created ColumnFamilyDescriptors
+            // so that the order of the resultant List matches the order of 
the openRocksDB arguments
+            final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(allDescriptors.size());
+            int existing = 0;
+            int created = 0;
+            while (existing + created < allDescriptors.size()) {
+                if (existing < existingDescriptors.size() && 
(existingDescriptors.get(existing) == allDescriptors.get(existing + created))) {
+                    columnFamilies.add(existingColumnFamilies.get(existing));
+                    existing++;
+                } else if (created < toCreate.size() && (toCreate.get(created) 
== allDescriptors.get(existing + created))) {
+                    columnFamilies.add(createdColumnFamilies.get(created));
+                    created++;
+                }

Review Comment:
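   Maybe add an `else` branch that throws an `IllegalStateException` to catch 
bugs: if neither branch matches, `existing + created` never advances and the 
`while` loop never terminates.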



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() {
 
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors
-                = Collections.singletonList(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+                dbOptions,
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions)
+        );
+
+        dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+    }
+
+    /**
+     * Open RocksDB while automatically creating any requested column families 
that don't yet exist.
+     */
+    List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,
+                                         final ColumnFamilyDescriptor 
defaultColumnFamilyDescriptor,
+                                         final ColumnFamilyDescriptor... 
columnFamilyDescriptors) {
+        final String absolutePath = dbDir.getAbsolutePath();
+        final List<ColumnFamilyDescriptor> extraDescriptors = 
Arrays.asList(columnFamilyDescriptors);
+        final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 
+ columnFamilyDescriptors.length);
+        allDescriptors.add(defaultColumnFamilyDescriptor);
+        allDescriptors.addAll(extraDescriptors);
 
         try {
-            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors, columnFamilies);
-            dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+            final Options options = new Options(dbOptions, 
defaultColumnFamilyDescriptor.getOptions());
+            final List<byte[]> allExisting = 
RocksDB.listColumnFamilies(options, absolutePath);
+
+            final List<ColumnFamilyDescriptor> existingDescriptors = 
allDescriptors.stream()
+                    .filter(descriptor -> descriptor == 
defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> 
Arrays.equals(existing, descriptor.getName())))

Review Comment:
   Wouldn't `allExisting.stream().anyMatch(existing -> Arrays.equals(existing, 
descriptor.getName()))` be enough?
   Or is the additional `descriptor == defaultColumnFamilyDescriptor` check 
meant as a performance improvement?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() {
 
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors
-                = Collections.singletonList(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+                dbOptions,
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions)
+        );
+
+        dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+    }
+
+    /**
+     * Open RocksDB while automatically creating any requested column families 
that don't yet exist.
+     */
+    List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,
+                                         final ColumnFamilyDescriptor 
defaultColumnFamilyDescriptor,
+                                         final ColumnFamilyDescriptor... 
columnFamilyDescriptors) {
+        final String absolutePath = dbDir.getAbsolutePath();
+        final List<ColumnFamilyDescriptor> extraDescriptors = 
Arrays.asList(columnFamilyDescriptors);
+        final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 
+ columnFamilyDescriptors.length);
+        allDescriptors.add(defaultColumnFamilyDescriptor);
+        allDescriptors.addAll(extraDescriptors);
 
         try {
-            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors, columnFamilies);
-            dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+            final Options options = new Options(dbOptions, 
defaultColumnFamilyDescriptor.getOptions());
+            final List<byte[]> allExisting = 
RocksDB.listColumnFamilies(options, absolutePath);
+
+            final List<ColumnFamilyDescriptor> existingDescriptors = 
allDescriptors.stream()
+                    .filter(descriptor -> descriptor == 
defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> 
Arrays.equals(existing, descriptor.getName())))
+                    .collect(Collectors.toList());
+            final List<ColumnFamilyDescriptor> toCreate = 
extraDescriptors.stream()
+                    .filter(descriptor -> 
allExisting.stream().noneMatch(existing -> Arrays.equals(existing, 
descriptor.getName())))
+                    .collect(Collectors.toList());
+            final List<ColumnFamilyHandle> existingColumnFamilies = new 
ArrayList<>(existingDescriptors.size());
+            db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, 
existingColumnFamilies);
+            final List<ColumnFamilyHandle> createdColumnFamilies = 
db.createColumnFamilies(toCreate);
+
+            // match up the existing and created ColumnFamilyHandles with the 
existing/created ColumnFamilyDescriptors
+            // so that the order of the resultant List matches the order of 
the openRocksDB arguments
+            final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(allDescriptors.size());
+            int existing = 0;
+            int created = 0;
+            while (existing + created < allDescriptors.size()) {
+                if (existing < existingDescriptors.size() && 
(existingDescriptors.get(existing) == allDescriptors.get(existing + created))) {
+                    columnFamilies.add(existingColumnFamilies.get(existing));
+                    existing++;
+                } else if (created < toCreate.size() && (toCreate.get(created) 
== allDescriptors.get(existing + created))) {
+                    columnFamilies.add(createdColumnFamilies.get(created));
+                    created++;
+                }
+            }

Review Comment:
   Could you please extract this code into a method with a meaningful name, so 
that the inline comment is no longer needed? 
   If that does not work well, then move the comment to the extracted method.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() {
 
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors
-                = Collections.singletonList(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+                dbOptions,
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions)
+        );
+
+        dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+    }
+
+    /**
+     * Open RocksDB while automatically creating any requested column families 
that don't yet exist.
+     */
+    List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,

Review Comment:
   Shouldn't that be `protected` since `RocksDBTimestampedStore` extends 
`RocksDBStore`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to