vcrfxia commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1101915894
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. 
This + * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may + * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during + * restoration. + * <p> + * The structure of the internals of this write buffer mirrors the structure of the + * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the + * segment stores is buffered in a separate object -- specifically, a map. + */ +public class RocksDBVersionedStoreRestoreWriteBuffer { + + private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class); + + // write buffer for latest value store. value type is Optional in order to track tombstones + // which must be written to the underlying store. + private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer; + // map from segment id to write buffer. segments are stored in reverse-sorted order, + // so getReverseSegments() is more efficient + private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer; + private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient; + private final RocksDBVersionedStoreRestoreClient restoreClient; + + /** + * Creates a new write buffer. 
+ * @param dbClient client for reading from and writing to the underlying persistent store + */ + RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) { + this.dbClient = Objects.requireNonNull(dbClient); + + this.latestValueWriteBuffer = new HashMap<>(); + // store in reverse-sorted order, to make getReverseSegments() more efficient + this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x)); + this.restoreClient = new RocksDBVersionedStoreRestoreClient(); + } + + /** + * @return client for writing to (and reading from) the write buffer + */ + VersionedStoreClient<?> getClient() { + return restoreClient; + } + + /** + * Flushes the contents of the write buffer into the persistent store, and clears the write + * buffer in the process. + * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch} + */ + void flush() throws RocksDBException { + + // flush segments first, as this is consistent with the store always writing to + // older segments/stores before later ones + try (final WriteBatch segmentsBatch = new WriteBatch()) { + final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE); + if (allSegments.size() > 0) { + // collect entries into write batch + for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) { + final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment(); + for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) { + dbSegment.addToBatch( + new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()), + segmentsBatch); + } + } + + // write to db. 
all the logical segments share the same physical store, + // so we can use any segment to perform the write + allSegments.get(0).dbSegment().write(segmentsBatch); + } + } catch (final RocksDBException e) { + log.error("Error restoring batch to RocksDBVersionedStore segments store."); + throw e; + } + segmentsWriteBuffer.clear(); + + // flush latest value store + try (final WriteBatch latestValueBatch = new WriteBatch()) { + // collect entries into write batch + for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) { + final byte[] value = latestValueEntry.getValue().orElse(null); + dbClient.addToLatestValueBatch( + new KeyValue<>(latestValueEntry.getKey().get(), value), + latestValueBatch); + } + + // write to db + dbClient.writeLatestValues(latestValueBatch); + } catch (final RocksDBException e) { + log.error("Error restoring batch to RocksDBVersionedStore latest value store."); + throw e; + } + latestValueWriteBuffer.clear(); + } + + /** + * The object representation of the write buffer corresponding to a single segment store. + * Contains the write buffer itself (a simple hash map) and also a reference to the underlying + * persistent segment store. 
+ */ + private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment { + + private final long id; + private final Map<Bytes, byte[]> data; + private final LogicalKeyValueSegment dbSegment; + + WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) { + this.dbSegment = Objects.requireNonNull(dbSegment); + this.id = dbSegment.id(); + this.data = new HashMap<>(); + + // register segment with segments store + segmentsWriteBuffer.put(id, this); + } + + LogicalKeyValueSegment dbSegment() { + return dbSegment; + } + + @Override + public long id() { + return id; + } + + @Override + public void put(final Bytes key, final byte[] value) { + // all writes go to the write buffer + data.put(key, value); + } + + @Override + public byte[] get(final Bytes key) { + final byte[] bufferValue = data.get(key); + if (bufferValue != null) { + return bufferValue; + } + return dbSegment.get(key); + } + + Set<Entry<Bytes, byte[]>> getAll() { Review Comment: Minor optimization to not need to copy the map. (If we were to return a map, I don't think the return value should be modifiable.) If you prefer copying the map for readability we can do that too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org