cadonna commented on code in PR #14852: URL: https://github.com/apache/kafka/pull/14852#discussion_r1410391609
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() { void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { - final List<ColumnFamilyDescriptor> columnFamilyDescriptors - = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)); - final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); + final List<ColumnFamilyHandle> columnFamilies = openRocksDB( + dbOptions, + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions) + ); + + dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + } + + /** + * Open RocksDB while automatically creating any requested column families that don't yet exist. + */ + List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions, + final ColumnFamilyDescriptor defaultColumnFamilyDescriptor, + final ColumnFamilyDescriptor... 
columnFamilyDescriptors) { + final String absolutePath = dbDir.getAbsolutePath(); + final List<ColumnFamilyDescriptor> extraDescriptors = Arrays.asList(columnFamilyDescriptors); + final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 + columnFamilyDescriptors.length); + allDescriptors.add(defaultColumnFamilyDescriptor); + allDescriptors.addAll(extraDescriptors); try { - db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); - dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions()); + final List<byte[]> allExisting = RocksDB.listColumnFamilies(options, absolutePath); + + final List<ColumnFamilyDescriptor> existingDescriptors = allDescriptors.stream() + .filter(descriptor -> descriptor == defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName()))) + .collect(Collectors.toList()); + final List<ColumnFamilyDescriptor> toCreate = extraDescriptors.stream() + .filter(descriptor -> allExisting.stream().noneMatch(existing -> Arrays.equals(existing, descriptor.getName()))) + .collect(Collectors.toList()); + final List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size()); + db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies); + final List<ColumnFamilyHandle> createdColumnFamilies = db.createColumnFamilies(toCreate); + + // match up the existing and created ColumnFamilyHandles with the existing/created ColumnFamilyDescriptors + // so that the order of the resultant List matches the order of the openRocksDB arguments + final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(allDescriptors.size()); + int existing = 0; + int created = 0; + while (existing + created < allDescriptors.size()) { + if (existing < existingDescriptors.size() && (existingDescriptors.get(existing) == 
allDescriptors.get(existing + created))) { + columnFamilies.add(existingColumnFamilies.get(existing)); + existing++; + } else if (created < toCreate.size() && (toCreate.get(created) == allDescriptors.get(existing + created))) { + columnFamilies.add(createdColumnFamilies.get(created)); + created++; + } Review Comment: Maybe add an `else` branch that throws an `IllegalStateException` to catch bugs. Without it, if neither condition matches, neither `existing` nor `created` is incremented and the loop never terminates. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() { void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { - final List<ColumnFamilyDescriptor> columnFamilyDescriptors - = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)); - final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); + final List<ColumnFamilyHandle> columnFamilies = openRocksDB( + dbOptions, + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions) + ); + + dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + } + + /** + * Open RocksDB while automatically creating any requested column families that don't yet exist. + */ + List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions, + final ColumnFamilyDescriptor defaultColumnFamilyDescriptor, + final ColumnFamilyDescriptor... 
columnFamilyDescriptors) { + final String absolutePath = dbDir.getAbsolutePath(); + final List<ColumnFamilyDescriptor> extraDescriptors = Arrays.asList(columnFamilyDescriptors); + final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 + columnFamilyDescriptors.length); + allDescriptors.add(defaultColumnFamilyDescriptor); + allDescriptors.addAll(extraDescriptors); try { - db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); - dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions()); + final List<byte[]> allExisting = RocksDB.listColumnFamilies(options, absolutePath); + + final List<ColumnFamilyDescriptor> existingDescriptors = allDescriptors.stream() + .filter(descriptor -> descriptor == defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName()))) Review Comment: Wouldn't `allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName()))` be enough on its own? Is the additional `descriptor == defaultColumnFamilyDescriptor` check meant as a performance improvement? 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() { void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { - final List<ColumnFamilyDescriptor> columnFamilyDescriptors - = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)); - final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); + final List<ColumnFamilyHandle> columnFamilies = openRocksDB( + dbOptions, + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions) + ); + + dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + } + + /** + * Open RocksDB while automatically creating any requested column families that don't yet exist. + */ + List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions, + final ColumnFamilyDescriptor defaultColumnFamilyDescriptor, + final ColumnFamilyDescriptor... 
columnFamilyDescriptors) { + final String absolutePath = dbDir.getAbsolutePath(); + final List<ColumnFamilyDescriptor> extraDescriptors = Arrays.asList(columnFamilyDescriptors); + final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 + columnFamilyDescriptors.length); + allDescriptors.add(defaultColumnFamilyDescriptor); + allDescriptors.addAll(extraDescriptors); try { - db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); - dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions()); + final List<byte[]> allExisting = RocksDB.listColumnFamilies(options, absolutePath); + + final List<ColumnFamilyDescriptor> existingDescriptors = allDescriptors.stream() + .filter(descriptor -> descriptor == defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName()))) + .collect(Collectors.toList()); + final List<ColumnFamilyDescriptor> toCreate = extraDescriptors.stream() + .filter(descriptor -> allExisting.stream().noneMatch(existing -> Arrays.equals(existing, descriptor.getName()))) + .collect(Collectors.toList()); + final List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size()); + db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies); + final List<ColumnFamilyHandle> createdColumnFamilies = db.createColumnFamilies(toCreate); + + // match up the existing and created ColumnFamilyHandles with the existing/created ColumnFamilyDescriptors + // so that the order of the resultant List matches the order of the openRocksDB arguments + final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(allDescriptors.size()); + int existing = 0; + int created = 0; + while (existing + created < allDescriptors.size()) { + if (existing < existingDescriptors.size() && (existingDescriptors.get(existing) == 
allDescriptors.get(existing + created))) { + columnFamilies.add(existingColumnFamilies.get(existing)); + existing++; + } else if (created < toCreate.size() && (toCreate.get(created) == allDescriptors.get(existing + created))) { + columnFamilies.add(createdColumnFamilies.get(created)); + created++; + } + } Review Comment: Could you please extract this code into a method with a meaningful name, so that the inline comment is no longer needed? If that does not work well, move the comment to the extracted method instead. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() { void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { - final List<ColumnFamilyDescriptor> columnFamilyDescriptors - = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)); - final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); + final List<ColumnFamilyHandle> columnFamilies = openRocksDB( + dbOptions, + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions) + ); + + dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + } + + /** + * Open RocksDB while automatically creating any requested column families that don't yet exist. + */ + List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions, Review Comment: Shouldn't that be `protected` since `RocksDBTimestampedStore` extends `RocksDBStore`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org