nicktelford commented on code in PR #14852: URL: https://github.com/apache/kafka/pull/14852#discussion_r1410543947
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -278,13 +280,55 @@ private void addValueProvidersToMetricsRecorder() { void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { - final List<ColumnFamilyDescriptor> columnFamilyDescriptors - = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)); - final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); + final List<ColumnFamilyHandle> columnFamilies = openRocksDB( + dbOptions, + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions) + ); + + dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + } + + /** + * Open RocksDB while automatically creating any requested column families that don't yet exist. + */ + List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions, + final ColumnFamilyDescriptor defaultColumnFamilyDescriptor, + final ColumnFamilyDescriptor... 
columnFamilyDescriptors) { + final String absolutePath = dbDir.getAbsolutePath(); + final List<ColumnFamilyDescriptor> extraDescriptors = Arrays.asList(columnFamilyDescriptors); + final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 + columnFamilyDescriptors.length); + allDescriptors.add(defaultColumnFamilyDescriptor); + allDescriptors.addAll(extraDescriptors); try { - db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); - dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); + final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions()); + final List<byte[]> allExisting = RocksDB.listColumnFamilies(options, absolutePath); + + final List<ColumnFamilyDescriptor> existingDescriptors = allDescriptors.stream() + .filter(descriptor -> descriptor == defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName()))) + .collect(Collectors.toList()); + final List<ColumnFamilyDescriptor> toCreate = extraDescriptors.stream() + .filter(descriptor -> allExisting.stream().noneMatch(existing -> Arrays.equals(existing, descriptor.getName()))) + .collect(Collectors.toList()); + final List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size()); + db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies); + final List<ColumnFamilyHandle> createdColumnFamilies = db.createColumnFamilies(toCreate); + + // match up the existing and created ColumnFamilyHandles with the existing/created ColumnFamilyDescriptors + // so that the order of the resultant List matches the order of the openRocksDB arguments + final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(allDescriptors.size()); + int existing = 0; + int created = 0; + while (existing + created < allDescriptors.size()) { + if (existing < existingDescriptors.size() && (existingDescriptors.get(existing) == 
allDescriptors.get(existing + created))) { + columnFamilies.add(existingColumnFamilies.get(existing)); + existing++; + } else if (created < toCreate.size() && (toCreate.get(created) == allDescriptors.get(existing + created))) { + columnFamilies.add(createdColumnFamilies.get(created)); + created++; + } + } Review Comment: Done. I've also refactored the method a bit to avoid depending on the `existingDescriptors` and `toCreate` lists, since that would have required the method to have 5 arguments; it now only has 3. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org