[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229930816 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() < minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} Review Comment: I need to swap those lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.or
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229930160 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() < minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction<>(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bufferSize - computeRecordSize(keyValue.key, bufferValue); +} +if (numRecords == 0) { +minTimes
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229836485 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() < minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction<>(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bufferSize - computeRecordSize(keyValue.key, bufferValue); +} +if (numRecords == 0) { +minTimes
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229836485 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() < minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction<>(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bufferSize - computeRecordSize(keyValue.key, bufferValue); +} +if (numRecords == 0) { +minTimes
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229824269 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { Review Comment: I think so but I can't prove it. If not we might have to deal with slight missordering until we can find a solution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229732758 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +public class RocksDbTimeOrderedKeyValueBytesStoreSupplier { Review Comment: oops I changed RocksDBKeyValueBytesStoreSupplier Instead, got this one as well ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); Review Comment: Well my thought was that we already have the store to be initialized from the constructor so we shouldn't risk a different store being passed in. We could use the root I suppose but I see no reason too. ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1228430026 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +if (wrapped().observedStreamTime - gracePeriod > minTimestamp) { +return; +} + +V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bu
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1228419948 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { Review Comment: I don't mind obscuring the implementation and setting those values as default instead of exposing them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1228360963 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +if (wrapped().observedStreamTime - gracePeriod > minTimestamp) { +return; +} + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSiz
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1226969927 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +if (wrapped().observedStreamTime - gracePeriod > minTimestamp) { +return; +} + +V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bu
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1226940321 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +if (wrapped().observedStreamTime - gracePeriod > minTimestamp) { +return; +} + +V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bu
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1226962980 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { Review Comment: a significant amount. the reason we were able to do this as we could was because the time ordered store implementation was already done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1226940321 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +if (wrapped().observedStreamTime - gracePeriod > minTimestamp) { +return; +} + +V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bu
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1223326402 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { Review Comment: The segments are not particularly critical. What I needed was the time ordered part and that had been implemented with the segment store ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private FullChangeSerde valueSerde; +private final String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.M
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1223321590 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; Review Comment: @vcrfxia @cadonna I made the changes on https://github.com/apache/kafka/pull/13830. I will rebase this one once that is merged -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1222332377 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class RocksDBTimeOrderedKeyValueBufferTest { +public RocksDBTimeOrderedKeyValueBuffer buffer; +@Mock +public SerdeGetter serdeGetter; +public InternalProcessorContext context; +public StreamsMetricsImpl streamsMetrics; +@Mock +public Sensor sensor; +public long offset; + +@Before +public void setUp() { +when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); +when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); +final Metrics metrics = new Metrics(); +offset = 0; +streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); +context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); +} + +public void createJoin(final Duration grace) { +final RocksDBTimeOrderedKeyValueSegmentedBytesStore store = new RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing", 100).get(); Review Comment: For testing this was mostly just me being and picking a number I was sure would work. putting it as the grace period or higher would be the correct thing not in a test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1220212149 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; +private String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod; +minTimestamp = Long.MAX_VALUE; +numRec = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue = null; + +if (predicate.get()) { +final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod.toMillis()); +if (iterator.hasNext()) { +keyValue = iterator.next(); +} +if (keyValue == null) { +if (numRecords() == 0) { +minTimestamp = Long.MAX_VALUE; +} +return; +} +BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); + +Change value = valueSerde.deserializeParts( +topic, +new Change<>(bufferValue.newValue(), bufferValue.oldValue()) +); +while (keyValue != null && predicate.get() && wrapped().observedStreamTime - gracePeriod.toMillis() >= minTimestamp()) { +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +}
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1220161797 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private FullChangeSerde valueSerde; +private final String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod); +try { Review Comment: I always forget that exists. Good call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1219946644 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; Review Comment: I agree for the most part. The issue comes with https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java I was running into problems fixing it and the PR kept ballooning. I might just have been doing something wrong. WHen We come back to fix the serialization error I would rather break it out into a second PR as it was getting messy -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1214823173 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; Review Comment: ah yeah, that makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1214821877 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java: ## @@ -38,13 +35,12 @@ public RocksDBTimeOrderedKeyValueSegmentedBytesStore get() { name, metricsScope(), retentionPeriod, -Math.max(retentionPeriod / 2, 60_000L), -withIndex +Math.max(retentionPeriod / 2, 60_000L) ); } public String metricsScope() { -return "rocksdb-session"; +return "rocksdb-buffer"; Review Comment: Sure that is fine with me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1213589269 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.rocksdb.WriteBatch; + +/** + * A RocksDB backed time-ordered segmented bytes store for window key schema. + */ +public class RocksDBTimeOrderedKeyValueSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { + +RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final boolean withIndex) { +super(name, metricsScope, retention, segmentInterval, new TimeFirstWindowKeySchema(), +Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : null)); +} + +@Override +protected KeyValue getIndexKeyValue(final Bytes baseKey, final byte[] baseValue) { +throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore"); +} + +@Override +Map getWriteBatches(final Collection> records) { Review Comment: correct. I haven't finished the restoration logic yet ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde;
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1213589269 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.rocksdb.WriteBatch; + +/** + * A RocksDB backed time-ordered segmented bytes store for window key schema. + */ +public class RocksDBTimeOrderedKeyValueSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { + +RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final boolean withIndex) { +super(name, metricsScope, retention, segmentInterval, new TimeFirstWindowKeySchema(), +Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : null)); +} + +@Override +protected KeyValue getIndexKeyValue(final Bytes baseKey, final byte[] baseValue) { +throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore"); +} + +@Override +Map getWriteBatches(final Collection> records) { Review Comment: correct. I haven't finished the restoration logic yet (haven't got to that yet) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1213395601 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; +private String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod; +minTimestamp = Long.MAX_VALUE; +numRec = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue = null; + +if (predicate.get()) { +final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod.toMillis()); Review Comment: Good catch, completely skipped my mind ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord