reswqa commented on code in PR #28402: URL: https://github.com/apache/flink/pull/28402#discussion_r3450986280
########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.StringUtils; + +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.CompressionType; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static 
org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link ArchiveStorage} implementation backed by RocksDB. + * + * <p>All archived data is stored as key-value pairs in a single RocksDB instance, avoiding the + * problem of numerous small files. The key is the request path (e.g. {@code + * /jobs/xxx/config.json}), and the value is a JSON string. + */ +public class RocksDBArchiveStorage implements ArchiveStorage<String> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBArchiveStorage.class); + + private final RocksDB db; + + private Options dbOptions; + + private WriteOptions writeOptions; + + private final ArrayList<AutoCloseable> handlesToClose; Review Comment: ```suggestion private final List<AutoCloseable> handlesToClose; ``` ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.StringUtils; + +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.CompressionType; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link ArchiveStorage} implementation backed by RocksDB. + * + * <p>All archived data is stored as key-value pairs in a single RocksDB instance, avoiding the + * problem of numerous small files. The key is the request path (e.g. {@code + * /jobs/xxx/config.json}), and the value is a JSON string. + */ +public class RocksDBArchiveStorage implements ArchiveStorage<String> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBArchiveStorage.class); + + private final RocksDB db; + + private Options dbOptions; + + private WriteOptions writeOptions; + + private final ArrayList<AutoCloseable> handlesToClose; + + /** + * The temporary directory that holds the extracted RocksDB native library; cleaned up on {@link + * #close()}. + */ + @Nullable private File nativeLibDir; + + /** + * Creates a new {@link RocksDBArchiveStorage} instance with default RocksDB options. 
+ * + * @param dbPath the RocksDB database directory path + * @throws IOException if the RocksDB database cannot be opened + */ + public RocksDBArchiveStorage(File dbPath) throws IOException { + this(dbPath, new Configuration()); + } + + /** + * Creates a new {@link RocksDBArchiveStorage} instance. + * + * @param dbPath the RocksDB database directory path + * @param config the configuration used to read RocksDB related options (see {@link + * HistoryServerOptions}) + * @throws IOException if the RocksDB native library cannot be loaded or the database cannot be + * opened + */ + public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws IOException { + checkNotNull(dbPath, "dbPath cannot be null"); + checkNotNull(config, "config cannot be null"); + this.handlesToClose = new ArrayList<>(); + String rocksDBNativeLibDir = + config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR); + checkNotNull(rocksDBNativeLibDir, "rocksDBNativeLibDir cannot be null"); + this.nativeLibDir = new File(rocksDBNativeLibDir); + + try { + loadRocksDBLibrary(this.nativeLibDir); + loadRocksDBConfiguration(); + this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath()); + handlesToClose.add(db); + } catch (Throwable t) { + close(); + throw new IOException("Failed to initialize RocksDBArchiveStorage", t); + } + } + + @Override + public boolean exists(String key) throws IOException { + return db.keyExists(key.getBytes(UTF_8)); + } + + @Nullable + @Override + public String getEntry(String key) throws IOException { + try { + byte[] value = db.get(key.getBytes(UTF_8)); + if (value == null) { + return null; + } + return new String(value, UTF_8); + } catch (RocksDBException e) { + throw new IOException("Failed to get key: " + key, e); + } + } + + @Override + public void putArchiveContent(String key, String value) throws IOException { + try { + db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8)); + } catch (RocksDBException e) { + throw new 
IOException("Failed to put key: " + key, e); + } + } + + @Override + public void delete(String key) throws IOException { + try { + db.delete(writeOptions, key.getBytes(UTF_8)); + } catch (RocksDBException e) { + throw new IOException("Failed to delete key: " + key, e); + } + } + + @Override + public void deleteEntriesByPrefix(String keyPrefix) throws IOException { + if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) { + return; + } + + // Delete all keys that start with the given prefix + byte[] startKey = keyPrefix.getBytes(UTF_8); + byte[] endKey = computeNextKey(startKey); + if (endKey == null) { + throw new IOException("Failed to compute next key for prefix: " + keyPrefix); + } + + try { + db.deleteRange(writeOptions, startKey, endKey); + } catch (RocksDBException e) { + throw new IOException("Failed to delete prefix: " + keyPrefix, e); + } + } + + @Override + public List<String> getEntriesByPrefix(String prefix) throws IOException { + List<String> result = new ArrayList<>(); + if (StringUtils.isNullOrWhitespaceOnly(prefix)) { + return result; + } + + byte[] prefixBytes = prefix.getBytes(UTF_8); + byte[] upperBound = computeNextKey(prefixBytes); + if (upperBound == null) { + throw new IOException("Failed to compute next key for prefix: " + prefix); + } + + try (Slice upperBoundSlice = new Slice(upperBound); + ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBoundSlice); + RocksIterator iterator = db.newIterator(readOptions)) { + for (iterator.seek(prefixBytes); iterator.isValid(); iterator.next()) { + result.add(new String(iterator.value(), UTF_8)); + } + try { + iterator.status(); + } catch (RocksDBException e) { + throw new IOException( + String.format("Error iterating over RocksDB, prefix: %s", prefix), e); + } + } + + return result; + } + + /** + * Compute the next key in lexicographic order. 
+ * + * @param key the current key + * @return the next key, or null if no such key exists + */ + @Nullable + static byte[] computeNextKey(byte[] key) { Review Comment: This method shouldn't be package-private. ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java: ########## @@ -464,21 +440,27 @@ void updateOverview() { * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews. */ void updateJobOverview() { - try (JsonGenerator gen = - jacksonFactory.createGenerator( - HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) { - File[] overviews = new File(webOverviewDir.getPath()).listFiles(); - if (overviews != null) { - Collection<JobDetails> allJobs = new ArrayList<>(overviews.length); - for (File overview : overviews) { - MultipleJobsDetails subJobs = - mapper.readValue(overview, MultipleJobsDetails.class); - allJobs.addAll(subJobs.getJobs()); + try { + Collection<JobDetails> allJobs = new ArrayList<>(); + List<Entry> overviews = archiveStorage.getEntriesByPrefix(JOB_OVERVIEWS_KEY_PREFIX); + for (Entry overview : overviews) { + MultipleJobsDetails subJobs; + // Parse file directly to avoid loading the archive into string. Review Comment: ```suggestion // We treated File as a special case, mainly as a performance trade-off to avoid the overhead of loading the archive into a string. ``` ditto for `ApplicationArchiveFetcher`. ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.StringUtils; + +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.CompressionType; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link ArchiveStorage} implementation backed by RocksDB. + * + * <p>All archived data is stored as key-value pairs in a single RocksDB instance, avoiding the + * problem of numerous small files. The key is the request path (e.g. {@code + * /jobs/xxx/config.json}), and the value is a JSON string. 
+ */ +public class RocksDBArchiveStorage implements ArchiveStorage<String> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBArchiveStorage.class); + + private final RocksDB db; + + private Options dbOptions; + + private WriteOptions writeOptions; + + private final ArrayList<AutoCloseable> handlesToClose; + + /** + * The temporary directory that holds the extracted RocksDB native library; cleaned up on {@link + * #close()}. + */ + @Nullable private File nativeLibDir; + + /** + * Creates a new {@link RocksDBArchiveStorage} instance with default RocksDB options. + * + * @param dbPath the RocksDB database directory path + * @throws IOException if the RocksDB database cannot be opened + */ + public RocksDBArchiveStorage(File dbPath) throws IOException { + this(dbPath, new Configuration()); + } + + /** + * Creates a new {@link RocksDBArchiveStorage} instance. + * + * @param dbPath the RocksDB database directory path + * @param config the configuration used to read RocksDB related options (see {@link + * HistoryServerOptions}) + * @throws IOException if the RocksDB native library cannot be loaded or the database cannot be + * opened + */ + public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws IOException { + checkNotNull(dbPath, "dbPath cannot be null"); + checkNotNull(config, "config cannot be null"); + this.handlesToClose = new ArrayList<>(); + String rocksDBNativeLibDir = + config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR); + checkNotNull(rocksDBNativeLibDir, "rocksDBNativeLibDir cannot be null"); + this.nativeLibDir = new File(rocksDBNativeLibDir); + + try { + loadRocksDBLibrary(this.nativeLibDir); + loadRocksDBConfiguration(); + this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath()); + handlesToClose.add(db); + } catch (Throwable t) { + close(); + throw new IOException("Failed to initialize RocksDBArchiveStorage", t); + } + } + + @Override + public boolean exists(String key) throws IOException 
{ + return db.keyExists(key.getBytes(UTF_8)); + } + + @Nullable + @Override + public String getEntry(String key) throws IOException { + try { + byte[] value = db.get(key.getBytes(UTF_8)); + if (value == null) { + return null; + } + return new String(value, UTF_8); + } catch (RocksDBException e) { + throw new IOException("Failed to get key: " + key, e); + } + } + + @Override + public void putArchiveContent(String key, String value) throws IOException { + try { + db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8)); + } catch (RocksDBException e) { + throw new IOException("Failed to put key: " + key, e); + } + } + + @Override + public void delete(String key) throws IOException { + try { + db.delete(writeOptions, key.getBytes(UTF_8)); + } catch (RocksDBException e) { + throw new IOException("Failed to delete key: " + key, e); + } + } + + @Override + public void deleteEntriesByPrefix(String keyPrefix) throws IOException { + if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) { + return; + } + + // Delete all keys that start with the given prefix + byte[] startKey = keyPrefix.getBytes(UTF_8); + byte[] endKey = computeNextKey(startKey); + if (endKey == null) { + throw new IOException("Failed to compute next key for prefix: " + keyPrefix); + } + + try { + db.deleteRange(writeOptions, startKey, endKey); + } catch (RocksDBException e) { + throw new IOException("Failed to delete prefix: " + keyPrefix, e); + } + } + + @Override + public List<String> getEntriesByPrefix(String prefix) throws IOException { + List<String> result = new ArrayList<>(); + if (StringUtils.isNullOrWhitespaceOnly(prefix)) { + return result; + } + + byte[] prefixBytes = prefix.getBytes(UTF_8); + byte[] upperBound = computeNextKey(prefixBytes); + if (upperBound == null) { + throw new IOException("Failed to compute next key for prefix: " + prefix); + } + + try (Slice upperBoundSlice = new Slice(upperBound); + ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBoundSlice); + 
RocksIterator iterator = db.newIterator(readOptions)) { + for (iterator.seek(prefixBytes); iterator.isValid(); iterator.next()) { + result.add(new String(iterator.value(), UTF_8)); + } + try { + iterator.status(); + } catch (RocksDBException e) { + throw new IOException( + String.format("Error iterating over RocksDB, prefix: %s", prefix), e); + } + } + + return result; + } + + /** + * Compute the next key in lexicographic order. + * + * @param key the current key + * @return the next key, or null if no such key exists + */ + @Nullable + static byte[] computeNextKey(byte[] key) { + // Find the last byte that is not 0xFF; everything after it can be dropped. + int lastIncrementableIndex = -1; + for (int i = key.length - 1; i >= 0; i--) { + if ((key[i] & 0xFF) != 0xFF) { + lastIncrementableIndex = i; + break; + } + } + if (lastIncrementableIndex < 0) { + // All bytes are 0xFF. + return null; + } + byte[] nextKey = new byte[lastIncrementableIndex + 1]; + System.arraycopy(key, 0, nextKey, 0, lastIncrementableIndex + 1); + nextKey[lastIncrementableIndex]++; + return nextKey; + } + + @Override + public String readArchiveContent(String entry) throws IOException { + return entry; + } + + @Override + public void close() { + if (db != null) { + db.close(); + } + + handlesToClose.forEach(IOUtils::closeQuietly); Review Comment: `handlesToClose` also include `db`. Why should we double-close it? I suggest we just close all stuff in `handlesToClose` in reverse order. In that case, `db` will be the first to be shut down. ########## flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/AbstractHistoryServerHandlerTest.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.router.Router; +import org.apache.flink.runtime.webmonitor.testutils.HttpUtils; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; + +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Common HTTP-level tests for {@link AbstractHistoryServerHandler} subclasses. New subclasses can + * be exercised by adding an entry to {@link #handlerFactories()}. + */ +class AbstractHistoryServerHandlerTest { Review Comment: 1. We can use `@ExtendWith(ParameterizedTestExtension.class)` to avoid the complex syntax of parameterized testing in JUnit 5. You can refer to `ParquetFileSystemITCase`. 2. Use `@BeforeEach` and `@AfterEach` to eliminate the use of the `runWithTestBody` closure.
########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.StringUtils; + +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.CompressionType; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static 
org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link ArchiveStorage} implementation backed by RocksDB. + * + * <p>All archived data is stored as key-value pairs in a single RocksDB instance, avoiding the + * problem of numerous small files. The key is the request path (e.g. {@code + * /jobs/xxx/config.json}), and the value is a JSON string. + */ +public class RocksDBArchiveStorage implements ArchiveStorage<String> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBArchiveStorage.class); + + private final RocksDB db; + + private Options dbOptions; + + private WriteOptions writeOptions; + + private final ArrayList<AutoCloseable> handlesToClose; + + /** + * The temporary directory that holds the extracted RocksDB native library; cleaned up on {@link + * #close()}. + */ + @Nullable private File nativeLibDir; Review Comment: This could be a final field. Also, why is this variable `Nullable`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
