mjsax commented on code in PR #21451: URL: https://github.com/apache/kafka/pull/21451#discussion_r2801600600
########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueTimestampHeaders; + +import java.util.Objects; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +/** + * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence + * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality. + * + * The inner {@link KeyValueStore} of this class is of type <Bytes, byte[]>, + * hence we use {@link Serde}s to convert from <K, ValueTimestampHeaders<V>> to <Bytes, byte[]>. + * + * @param <K> key type + * @param <V> value type (wrapped in {@link ValueTimestampHeaders}) + */ +public class MeteredTimestampedKeyValueStoreWithHeaders<K, V> + extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>> + implements TimestampedKeyValueStoreWithHeaders<K, V> { + + MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner, + final String metricScope, + final Time time, + final Serde<K> keySerde, + final Serde<ValueTimestampHeaders<V>> valueSerde) { + super(inner, metricScope, time, keySerde, valueSerde); + } + + @SuppressWarnings("unchecked") + @Override + protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde, + final SerdeGetter getter) { + if (valueSerde == null) { + return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde()); + } else { + return super.prepareValueSerdeForStore(valueSerde, getter); + } + } + + @Override + public ValueTimestampHeaders<V> get(final K key) { + Objects.requireNonNull(key, "key cannot be null"); + try { + return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor); + } catch (final ProcessorStateException e) { + final String message = String.format(e.getMessage(), key); + throw new ProcessorStateException(message, e); + } + } + + @Override + public void put(final K key, + final ValueTimestampHeaders<V> value) { + Objects.requireNonNull(key, "key cannot be null"); + try { + final Headers headers = value.headers(); + maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), serdes.rawValue(value, headers)), time, putSensor); + maybeRecordE2ELatency(); + } catch (final ProcessorStateException e) { + final String message = String.format(e.getMessage(), key, value); + throw new ProcessorStateException(message, e); + } + } + + protected ValueTimestampHeaders<V> outerValue(final byte[] value) { Review Comment: ```suggestion @Override protected ValueTimestampHeaders<V> outerValue(final byte[] value) { ``` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueTimestampHeaders; + +import java.util.Objects; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +/** + * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence + * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality. + * + * The inner {@link KeyValueStore} of this class is of type <Bytes, byte[]>, + * hence we use {@link Serde}s to convert from <K, ValueTimestampHeaders<V>> to <Bytes, byte[]>. + * + * @param <K> key type + * @param <V> value type (wrapped in {@link ValueTimestampHeaders}) + */ +public class MeteredTimestampedKeyValueStoreWithHeaders<K, V> + extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>> + implements TimestampedKeyValueStoreWithHeaders<K, V> { + + MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner, + final String metricScope, + final Time time, + final Serde<K> keySerde, + final Serde<ValueTimestampHeaders<V>> valueSerde) { + super(inner, metricScope, time, keySerde, valueSerde); + } + + @SuppressWarnings("unchecked") + @Override + protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde, + final SerdeGetter getter) { + if (valueSerde == null) { + return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde()); + } else { + return super.prepareValueSerdeForStore(valueSerde, getter); + } + } + + @Override + public ValueTimestampHeaders<V> get(final K key) { + Objects.requireNonNull(key, "key cannot be null"); + try { + return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor); Review Comment: I think we should not call the old `keyByte(key)` but the new `keyBytes(key, new RecordHeaders())` (cf my other commend on `keyBytes` below)? But this change should go into `MeteredKeyValueStore`? And if we push this change into `MeteredKeyValueStore` we might not need to override `get()` at all any longer? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueTimestampHeaders; + +import java.util.Objects; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +/** + * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence + * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality. + * + * The inner {@link KeyValueStore} of this class is of type <Bytes, byte[]>, + * hence we use {@link Serde}s to convert from <K, ValueTimestampHeaders<V>> to <Bytes, byte[]>. + * + * @param <K> key type + * @param <V> value type (wrapped in {@link ValueTimestampHeaders}) + */ +public class MeteredTimestampedKeyValueStoreWithHeaders<K, V> + extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>> + implements TimestampedKeyValueStoreWithHeaders<K, V> { + + MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner, + final String metricScope, + final Time time, + final Serde<K> keySerde, + final Serde<ValueTimestampHeaders<V>> valueSerde) { + super(inner, metricScope, time, keySerde, valueSerde); + } + + @SuppressWarnings("unchecked") + @Override + protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde, + final SerdeGetter getter) { + if (valueSerde == null) { + return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde()); + } else { + return super.prepareValueSerdeForStore(valueSerde, getter); + } + } + + @Override + public ValueTimestampHeaders<V> get(final K key) { + Objects.requireNonNull(key, "key cannot be null"); + try { + return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor); + } catch (final ProcessorStateException e) { + final String message = String.format(e.getMessage(), key); + throw new ProcessorStateException(message, e); + } + } + + @Override + public void put(final K key, + final ValueTimestampHeaders<V> value) { + Objects.requireNonNull(key, "key cannot be null"); + try { + final Headers headers = value.headers(); + maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), serdes.rawValue(value, headers)), time, putSensor); + maybeRecordE2ELatency(); + } catch (final ProcessorStateException e) { + final String message = String.format(e.getMessage(), key, value); + throw new ProcessorStateException(message, e); + } + } + + protected ValueTimestampHeaders<V> outerValue(final byte[] value) { + final Headers headers = ValueTimestampHeadersDeserializer.headers(value); + return value != null ? serdes.valueFrom(value, headers) : null; + } + + protected Bytes keyBytes(final K key, final Headers headers) { Review Comment: Do we really need this? It seems, we need to update `MeteredKeyValueStore` and add `Headers` parameter to `keyBytest()` there and also update the implementation to call `serdes.rawKey(key, headers)` The previous PR updating `StateSerdes` only added a TODO about this -- we should not get into a more messy intermediate state. -- Or does @frankvicky already work on PR to clean this up? Wondering about PR conflicts/race conditions. ########## streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java: ########## @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.ValueTimestampHeaders; +import org.apache.kafka.test.KeyValueIteratorStub; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +public class MeteredTimestampedKeyValueStoreWithHeadersTest { + private static final String APPLICATION_ID = "test-app"; + private static final String STORE_NAME = "store-name"; + private static final String STORE_TYPE = "scope"; + private static final String STORE_LEVEL_GROUP = "stream-state-metrics"; + private static final String CHANGELOG_TOPIC = "changelog-topic-name"; + private static final String THREAD_ID_TAG_KEY = "thread-id"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + private static final RecordHeaders HEADERS = makeHeaders(); + private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS = + ValueTimestampHeaders.make("value", 97L, HEADERS); + private static final byte[] VALUE_TIMESTAMP_HEADERS_BYTES = serializeValueTimestampHeaders(VALUE_TIMESTAMP_HEADERS); + private final String threadId = Thread.currentThread().getName(); + private final TaskId taskId = new TaskId(0, 0, "My-Topology"); + @Mock + private KeyValueStore<Bytes, byte[]> inner; + @Mock + private InternalProcessorContext<?, ?> context; + private MockTime mockTime; + + private static final Map<String, Object> CONFIGS = + mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, APPLICATION_ID)); + + private MeteredTimestampedKeyValueStoreWithHeaders<String, String> metered; + private final KeyValue<Bytes, byte[]> byteKeyValueTimestampHeadersPair = KeyValue.pair(KEY_BYTES, + VALUE_TIMESTAMP_HEADERS_BYTES + ); + private final Metrics metrics = new Metrics(); + private Map<String, String> tags; + + private void setUpWithoutContext() { + mockTime = new MockTime(); + metered = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + "scope", + mockTime, + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String()) + ); + metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); + tags = mkMap( + mkEntry(THREAD_ID_TAG_KEY, threadId), + mkEntry("task-id", taskId.toString()), + mkEntry(STORE_TYPE + "-state-id", STORE_NAME) + ); + } + + private void setUp() { + setUpWithoutContext(); + when(context.applicationId()).thenReturn(APPLICATION_ID); + when(context.metrics()) + .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); + when(context.taskId()).thenReturn(taskId); + when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); + when(inner.name()).thenReturn(STORE_NAME); + when(context.appConfigs()).thenReturn(CONFIGS); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void setUpWithExpectSerdes() { + setUp(); + when(context.keySerde()).thenReturn((Serde) Serdes.String()); + when(context.valueSerde()).thenReturn((Serde) Serdes.Long()); + } + + private void init() { + metered.init(context, metered); + } + + @Test + public void shouldDelegateInit() { + setUp(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, String> outer = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String()) + ); + doNothing().when(inner).init(context, outer); + outer.init(context, outer); + } + + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + setUp(); + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + setUp(); + final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName()); + when(context.changelogFor(STORE_NAME)).thenReturn(null); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + @Test + public void testMetrics() { + setUp(); + init(); + final JmxReporter reporter = new JmxReporter(); + final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams"); + reporter.contextChange(metricsContext); + + metrics.addReporter(reporter); + assertTrue(reporter.containsMbean(String.format( + "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", + STORE_LEVEL_GROUP, + THREAD_ID_TAG_KEY, + threadId, + taskId, + STORE_TYPE, + STORE_NAME + ))); + } + + @Test + public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { + setUp(); + doNothing().when(inner).put(any(Bytes.class), any(byte[].class)); + init(); + + metered.put(KEY, VALUE_TIMESTAMP_HEADERS); + + final KafkaMetric metric = metric("put-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetBytesFromInnerStoreAndReturnGetMetric() { + setUp(); + when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES); + init(); + + assertEquals(VALUE_TIMESTAMP_HEADERS, metered.get(KEY)); + + final KafkaMetric metric = metric("get-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() { + setUp(); + when(inner.putIfAbsent(any(Bytes.class), any(byte[].class))).thenReturn(null); + init(); + + metered.putIfAbsent(KEY, VALUE_TIMESTAMP_HEADERS); + + final KafkaMetric metric = metric("put-if-absent-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldPutAllToInnerStoreAndRecordPutAllMetric() { + setUp(); + doNothing().when(inner).putAll(any(List.class)); + init(); + + metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE_TIMESTAMP_HEADERS))); + + final KafkaMetric metric = metric("put-all-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() { + setUp(); + when(inner.delete(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES); + init(); + + metered.delete(KEY); + + final KafkaMetric metric = metric("delete-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() { + setUp(); + when(inner.range(any(Bytes.class), any(Bytes.class))).thenReturn( + new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator())); + init(); + + try(final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY)) { + assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value); + assertFalse(iterator.hasNext()); + } + + final KafkaMetric metric = metric("range-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetAllFromInnerStoreAndRecordAllMetric() { + setUp(); + when(inner.all()) + .thenReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator())); + init(); + + try(final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all()) { + assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value); + assertFalse(iterator.hasNext()); + } + + final KafkaMetric metric = metric(new MetricName("all-rate", STORE_LEVEL_GROUP, "", tags)); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldCommitInnerWhenCommitTimeRecords() { + setUp(); + doNothing().when(inner).commit(Map.of()); + init(); + + metered.commit(Map.of()); + + final KafkaMetric metric = metric("flush-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetFlushListenerOnWrappedCachingStore() { + setUpWithoutContext(); + final CachedKeyValueStore cachedKeyValueStore = mock(CachedKeyValueStore.class); + + when(cachedKeyValueStore.setFlushListener(any(CacheFlushListener.class), eq(false))).thenReturn(true); + + metered = new MeteredTimestampedKeyValueStoreWithHeaders<>( + cachedKeyValueStore, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String())); + assertTrue(metered.setFlushListener(null, false)); + } + + @Test + public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { + setUpWithoutContext(); + assertFalse(metered.setFlushListener(null, false)); + } + + @Test + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() { + setUpWithExpectSerdes(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + null, + null + ); + store.init(context, inner); + + try { + store.put("key", ValueTimestampHeaders.make(42L, 60000, new RecordHeaders())); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + throw new AssertionError( + "Serdes are not correctly set from processor context.", exception); + } else { + throw exception; + } + } + } + + @Test + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() { + setUp(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.Long()) + ); + store.init(context, inner); + + try { + store.put("key", ValueTimestampHeaders.make(42L, 60000, new RecordHeaders())); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + fail("Serdes are not correctly set from constructor parameters."); + } + throw exception; + } + } + + @SuppressWarnings("unused") + @Test + public void shouldTrackOpenIteratorsMetric() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); + assertNotNull(openIteratorsMetric); + + assertEquals(0L, (Long) openIteratorsMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + assertEquals(1L, (Long) openIteratorsMetric.metricValue()); + } + + assertEquals(0L, (Long) openIteratorsMetric.metricValue()); + } + + @SuppressWarnings("unused") + @Test + public void shouldTimeIteratorDuration() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric iteratorDurationAvgMetric = metric("iterator-duration-avg"); + final KafkaMetric iteratorDurationMaxMetric = metric("iterator-duration-max"); + assertNotNull(iteratorDurationAvgMetric); + assertNotNull(iteratorDurationMaxMetric); + + assertEquals(Double.NaN, (Double) iteratorDurationAvgMetric.metricValue()); + assertEquals(Double.NaN, (Double) iteratorDurationMaxMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + // nothing to do, just close immediately + mockTime.sleep(2); + } + + assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationAvgMetric.metricValue()); + assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationMaxMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all()) { + // nothing to do, just close immediately + mockTime.sleep(3); + } + + assertEquals(2.5 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationAvgMetric.metricValue()); + assertEquals(3.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationMaxMetric.metricValue()); + } + + @SuppressWarnings("unused") + @Test + public void shouldTrackOldestOpenIteratorTimestamp() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertNotNull(oldestIteratorTimestampMetric); + + KeyValueIterator<String, ValueTimestampHeaders<String>> second = null; + final long secondTimestamp; + try { + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + + final long oldestTimestamp = mockTime.milliseconds(); + assertEquals(oldestTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + mockTime.sleep(100); + + // open a second iterator before closing the first to test that we still produce the first iterator's timestamp + second = metered.all(); + secondTimestamp = mockTime.milliseconds(); + assertEquals(oldestTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + mockTime.sleep(100); + } + + // now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator + assertEquals(secondTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + } finally { + if (second != null) { + second.close(); + } + } + // now that all iterators are closed, the metric should be zero + assertEquals(0L, (Long) oldestIteratorTimestampMetric.metricValue()); + } + + private static RecordHeaders makeHeaders() { + RecordHeaders headers = new RecordHeaders(); + headers.add("header-key", "header-value".getBytes()); + return headers; + } + + private static byte[] serializeValueTimestampHeaders(ValueTimestampHeaders<String> vth) { + ValueTimestampHeadersSerializer<String> serializer = new ValueTimestampHeadersSerializer<>(Serdes.String().serializer()); + return serializer.serialize("topic", vth); + } + + @SuppressWarnings("unchecked") + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { + final Serde<String> keySerde = mock(Serde.class); + final Serializer<String> keySerializer = mock(Serializer.class); + final Serde<ValueTimestampHeaders<String>> valueSerde = mock(Serde.class); + final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = mock(Deserializer.class); + final Serializer<ValueTimestampHeaders<String>> valueSerializer = mock(Serializer.class); + when(keySerde.serializer()).thenReturn(keySerializer); + // Mock both 2-arg and 3-arg versions of serialize - use lenient to avoid strict stubbing errors + lenient().when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes()); + lenient().when(keySerializer.serialize(eq(topic), any(RecordHeaders.class), eq(KEY))).thenReturn(KEY.getBytes()); + when(valueSerde.deserializer()).thenReturn(valueDeserializer); + // Mock both 2-arg and 3-arg versions of deserialize - use lenient to avoid strict stubbing errors Review Comment: Same ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueTimestampHeaders; + +import java.util.Objects; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +/** + * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence + * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality. + * + * The inner {@link KeyValueStore} of this class is of type <Bytes, byte[]>, + * hence we use {@link Serde}s to convert from <K, ValueTimestampHeaders<V>> to <Bytes, byte[]>. + * + * @param <K> key type + * @param <V> value type (wrapped in {@link ValueTimestampHeaders}) + */ +public class MeteredTimestampedKeyValueStoreWithHeaders<K, V> Review Comment: Do we also need to overwrite `query()` and `position()` as long as this new store does not support IQv2? Otherwise, we might try to execute `KeyQuery` and `RangeQuery` defined in `MeteredKyeValueStore` but crash on it as the byte format does not match? Or would we get `KeyQuery` and `RangeQuery` for free and it would just work? Might be worth to verify (and if it works, add a test in a follow up PR, when we have all pieces together)? If we are unsure about it, let's file a ticket and cycle back on this question later. ########## streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java: ########## @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.ValueTimestampHeaders; +import org.apache.kafka.test.KeyValueIteratorStub; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +public class MeteredTimestampedKeyValueStoreWithHeadersTest { + private static final String APPLICATION_ID = "test-app"; + private static final String STORE_NAME = "store-name"; + private static final String STORE_TYPE = "scope"; + private static final String STORE_LEVEL_GROUP = "stream-state-metrics"; + private static final String CHANGELOG_TOPIC = "changelog-topic-name"; + private static final String THREAD_ID_TAG_KEY = "thread-id"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + private static final RecordHeaders HEADERS = makeHeaders(); + private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS = + ValueTimestampHeaders.make("value", 97L, HEADERS); + private static final byte[] VALUE_TIMESTAMP_HEADERS_BYTES = serializeValueTimestampHeaders(VALUE_TIMESTAMP_HEADERS); + private final String threadId = Thread.currentThread().getName(); + private final TaskId taskId = new TaskId(0, 0, "My-Topology"); + @Mock + private KeyValueStore<Bytes, byte[]> inner; + @Mock + private InternalProcessorContext<?, ?> context; + private MockTime mockTime; + + private static final Map<String, Object> CONFIGS = + mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, APPLICATION_ID)); + + private MeteredTimestampedKeyValueStoreWithHeaders<String, String> metered; + private final KeyValue<Bytes, byte[]> byteKeyValueTimestampHeadersPair = KeyValue.pair(KEY_BYTES, + VALUE_TIMESTAMP_HEADERS_BYTES + ); + private final Metrics metrics = new Metrics(); + private Map<String, String> tags; + + private void setUpWithoutContext() { + mockTime = new MockTime(); + metered = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + "scope", + mockTime, + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String()) + ); + metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); + tags = mkMap( + mkEntry(THREAD_ID_TAG_KEY, threadId), + mkEntry("task-id", taskId.toString()), + mkEntry(STORE_TYPE + "-state-id", STORE_NAME) + ); + } + + private void setUp() { + setUpWithoutContext(); + when(context.applicationId()).thenReturn(APPLICATION_ID); + when(context.metrics()) + .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); + when(context.taskId()).thenReturn(taskId); + when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); + when(inner.name()).thenReturn(STORE_NAME); + when(context.appConfigs()).thenReturn(CONFIGS); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void setUpWithExpectSerdes() { + setUp(); + when(context.keySerde()).thenReturn((Serde) Serdes.String()); + when(context.valueSerde()).thenReturn((Serde) Serdes.Long()); + } + + private void init() { + metered.init(context, metered); + } + + @Test + public void shouldDelegateInit() { + setUp(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, String> outer = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String()) + ); + doNothing().when(inner).init(context, outer); + outer.init(context, outer); + } + + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + setUp(); + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + setUp(); + final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName()); + when(context.changelogFor(STORE_NAME)).thenReturn(null); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + @Test + public void testMetrics() { + setUp(); + init(); + final JmxReporter reporter = new JmxReporter(); + final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams"); + reporter.contextChange(metricsContext); + + metrics.addReporter(reporter); + assertTrue(reporter.containsMbean(String.format( + "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", + STORE_LEVEL_GROUP, + THREAD_ID_TAG_KEY, + threadId, + taskId, + STORE_TYPE, + STORE_NAME + ))); + } + + @Test + public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { + setUp(); + doNothing().when(inner).put(any(Bytes.class), any(byte[].class)); + init(); + + metered.put(KEY, VALUE_TIMESTAMP_HEADERS); + + final KafkaMetric metric = metric("put-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetBytesFromInnerStoreAndReturnGetMetric() { + setUp(); + when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES); + init(); + + assertEquals(VALUE_TIMESTAMP_HEADERS, metered.get(KEY)); + + final KafkaMetric metric = metric("get-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() { + setUp(); + when(inner.putIfAbsent(any(Bytes.class), any(byte[].class))).thenReturn(null); + init(); + + metered.putIfAbsent(KEY, VALUE_TIMESTAMP_HEADERS); + + final KafkaMetric metric = metric("put-if-absent-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldPutAllToInnerStoreAndRecordPutAllMetric() { + setUp(); + doNothing().when(inner).putAll(any(List.class)); + init(); + + metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE_TIMESTAMP_HEADERS))); + + final KafkaMetric metric = metric("put-all-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() { + setUp(); + when(inner.delete(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES); + init(); + + metered.delete(KEY); + + final KafkaMetric metric = metric("delete-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() { + setUp(); + when(inner.range(any(Bytes.class), any(Bytes.class))).thenReturn( + new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator())); + init(); + + try(final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY)) { + assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value); + assertFalse(iterator.hasNext()); + } + + final KafkaMetric metric = metric("range-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetAllFromInnerStoreAndRecordAllMetric() { + setUp(); + when(inner.all()) + .thenReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator())); + init(); + + try(final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all()) { + assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value); + assertFalse(iterator.hasNext()); + } + + final KafkaMetric metric = metric(new MetricName("all-rate", STORE_LEVEL_GROUP, "", tags)); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldCommitInnerWhenCommitTimeRecords() { + setUp(); + doNothing().when(inner).commit(Map.of()); + init(); + + metered.commit(Map.of()); + + final KafkaMetric metric = metric("flush-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetFlushListenerOnWrappedCachingStore() { + setUpWithoutContext(); + final CachedKeyValueStore cachedKeyValueStore = mock(CachedKeyValueStore.class); + + when(cachedKeyValueStore.setFlushListener(any(CacheFlushListener.class), eq(false))).thenReturn(true); + + metered = new MeteredTimestampedKeyValueStoreWithHeaders<>( + cachedKeyValueStore, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String())); + assertTrue(metered.setFlushListener(null, false)); + } + + @Test + public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { + setUpWithoutContext(); + assertFalse(metered.setFlushListener(null, false)); + } + + @Test + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() { + setUpWithExpectSerdes(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + null, + null + ); + store.init(context, inner); + + try { + store.put("key", ValueTimestampHeaders.make(42L, 60000, new RecordHeaders())); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + throw new AssertionError( + "Serdes are not correctly set from processor context.", exception); + } else { + throw exception; + } + } + } + + @Test + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() { + setUp(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.Long()) + ); + store.init(context, inner); + + try { + store.put("key", ValueTimestampHeaders.make(42L, 60000, new RecordHeaders())); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + fail("Serdes are not correctly set from constructor parameters."); + } + throw exception; + } + } + + @SuppressWarnings("unused") + @Test + public void shouldTrackOpenIteratorsMetric() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); + assertNotNull(openIteratorsMetric); + + assertEquals(0L, (Long) openIteratorsMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + assertEquals(1L, (Long) openIteratorsMetric.metricValue()); + } + + assertEquals(0L, (Long) openIteratorsMetric.metricValue()); + } + + @SuppressWarnings("unused") + @Test + public void shouldTimeIteratorDuration() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric iteratorDurationAvgMetric = metric("iterator-duration-avg"); + final KafkaMetric iteratorDurationMaxMetric = metric("iterator-duration-max"); + assertNotNull(iteratorDurationAvgMetric); + assertNotNull(iteratorDurationMaxMetric); + + assertEquals(Double.NaN, (Double) iteratorDurationAvgMetric.metricValue()); + assertEquals(Double.NaN, (Double) iteratorDurationMaxMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + // nothing to do, just close immediately + mockTime.sleep(2); + } + + assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationAvgMetric.metricValue()); + assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationMaxMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all()) { + // nothing to do, just close immediately + mockTime.sleep(3); + } + + assertEquals(2.5 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationAvgMetric.metricValue()); + assertEquals(3.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationMaxMetric.metricValue()); + } + + @SuppressWarnings("unused") + @Test + public void shouldTrackOldestOpenIteratorTimestamp() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertNotNull(oldestIteratorTimestampMetric); + + KeyValueIterator<String, ValueTimestampHeaders<String>> second = null; + final long secondTimestamp; + try { + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + + final long oldestTimestamp = mockTime.milliseconds(); + assertEquals(oldestTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + mockTime.sleep(100); + + // open a second iterator before closing the first to test that we still produce the first iterator's timestamp + second = metered.all(); + secondTimestamp = mockTime.milliseconds(); + assertEquals(oldestTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + mockTime.sleep(100); + } + + // now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator + assertEquals(secondTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + } finally { + if (second != null) { + second.close(); + } + } + // now that all iterators are closed, the metric should be zero + assertEquals(0L, (Long) oldestIteratorTimestampMetric.metricValue()); + } + + private static RecordHeaders makeHeaders() { + RecordHeaders headers = new RecordHeaders(); + headers.add("header-key", "header-value".getBytes()); + return headers; + } + + private static byte[] serializeValueTimestampHeaders(ValueTimestampHeaders<String> vth) { + ValueTimestampHeadersSerializer<String> serializer = new ValueTimestampHeadersSerializer<>(Serdes.String().serializer()); + return serializer.serialize("topic", vth); + } + + @SuppressWarnings("unchecked") + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { + final Serde<String> keySerde = mock(Serde.class); + final Serializer<String> keySerializer = mock(Serializer.class); + final Serde<ValueTimestampHeaders<String>> valueSerde = mock(Serde.class); + final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = mock(Deserializer.class); + final Serializer<ValueTimestampHeaders<String>> valueSerializer = mock(Serializer.class); + when(keySerde.serializer()).thenReturn(keySerializer); + // Mock both 2-arg and 3-arg versions of serialize - use lenient to avoid strict stubbing errors + lenient().when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes()); + lenient().when(keySerializer.serialize(eq(topic), any(RecordHeaders.class), eq(KEY))).thenReturn(KEY.getBytes()); + when(valueSerde.deserializer()).thenReturn(valueDeserializer); + // Mock both 2-arg and 3-arg versions of deserialize - use lenient to avoid strict stubbing errors + lenient().when(valueDeserializer.deserialize(eq(topic), any(byte[].class))).thenReturn(VALUE_TIMESTAMP_HEADERS); + lenient().when(valueDeserializer.deserialize(eq(topic), any(RecordHeaders.class), any(byte[].class))).thenReturn(VALUE_TIMESTAMP_HEADERS); + when(valueSerde.serializer()).thenReturn(valueSerializer); + // Mock both 2-arg and 3-arg versions of serialize - use lenient to avoid strict stubbing errors Review Comment: again ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueTimestampHeaders; + +import java.util.Objects; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +/** + * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence + * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality. + * + * The inner {@link KeyValueStore} of this class is of type <Bytes, byte[]>, + * hence we use {@link Serde}s to convert from <K, ValueTimestampHeaders<V>> to <Bytes, byte[]>. + * + * @param <K> key type + * @param <V> value type (wrapped in {@link ValueTimestampHeaders}) + */ +public class MeteredTimestampedKeyValueStoreWithHeaders<K, V> + extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>> + implements TimestampedKeyValueStoreWithHeaders<K, V> { + + MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner, + final String metricScope, + final Time time, + final Serde<K> keySerde, + final Serde<ValueTimestampHeaders<V>> valueSerde) { + super(inner, metricScope, time, keySerde, valueSerde); + } + + @SuppressWarnings("unchecked") + @Override + protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde, + final SerdeGetter getter) { + if (valueSerde == null) { + return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde()); + } else { + return super.prepareValueSerdeForStore(valueSerde, getter); + } + } + + @Override + public ValueTimestampHeaders<V> get(final K key) { + Objects.requireNonNull(key, "key cannot be null"); + try { + return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor); + } catch (final ProcessorStateException e) { + final String message = String.format(e.getMessage(), key); + throw new ProcessorStateException(message, e); + } + } + + @Override + public void put(final K key, + final ValueTimestampHeaders<V> value) { + Objects.requireNonNull(key, "key cannot be null"); + try { + final Headers headers = value.headers(); Review Comment: ```suggestion final Headers headers = value != null ? value.headers() : new RecordHeaders(); ``` ########## streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java: ########## @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.ValueTimestampHeaders; +import org.apache.kafka.test.KeyValueIteratorStub; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +public class MeteredTimestampedKeyValueStoreWithHeadersTest { + private static final String APPLICATION_ID = "test-app"; + private static final String STORE_NAME = "store-name"; + private static final String STORE_TYPE = "scope"; + private static final String STORE_LEVEL_GROUP = "stream-state-metrics"; + private static final String CHANGELOG_TOPIC = "changelog-topic-name"; + private static final String THREAD_ID_TAG_KEY = "thread-id"; + private static final String KEY = "key"; + private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes()); + private static final RecordHeaders HEADERS = makeHeaders(); + private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS = + ValueTimestampHeaders.make("value", 97L, HEADERS); + private static final byte[] VALUE_TIMESTAMP_HEADERS_BYTES = serializeValueTimestampHeaders(VALUE_TIMESTAMP_HEADERS); + private final String threadId = Thread.currentThread().getName(); + private final TaskId taskId = new TaskId(0, 0, "My-Topology"); + @Mock + private KeyValueStore<Bytes, byte[]> inner; + @Mock + private InternalProcessorContext<?, ?> context; + private MockTime mockTime; + + private static final Map<String, Object> CONFIGS = + mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, APPLICATION_ID)); + + private MeteredTimestampedKeyValueStoreWithHeaders<String, String> metered; + private final KeyValue<Bytes, byte[]> byteKeyValueTimestampHeadersPair = KeyValue.pair(KEY_BYTES, + VALUE_TIMESTAMP_HEADERS_BYTES + ); + private final Metrics metrics = new Metrics(); + private Map<String, String> tags; + + private void setUpWithoutContext() { + mockTime = new MockTime(); + metered = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + "scope", + mockTime, + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String()) + ); + metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); + tags = mkMap( + mkEntry(THREAD_ID_TAG_KEY, threadId), + mkEntry("task-id", taskId.toString()), + mkEntry(STORE_TYPE + "-state-id", STORE_NAME) + ); + } + + private void setUp() { + setUpWithoutContext(); + when(context.applicationId()).thenReturn(APPLICATION_ID); + when(context.metrics()) + .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); + when(context.taskId()).thenReturn(taskId); + when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); + when(inner.name()).thenReturn(STORE_NAME); + when(context.appConfigs()).thenReturn(CONFIGS); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void setUpWithExpectSerdes() { + setUp(); + when(context.keySerde()).thenReturn((Serde) Serdes.String()); + when(context.valueSerde()).thenReturn((Serde) Serdes.Long()); + } + + private void init() { + metered.init(context, metered); + } + + @Test + public void shouldDelegateInit() { + setUp(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, String> outer = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String()) + ); + doNothing().when(inner).init(context, outer); + outer.init(context, outer); + } + + @Test + public void shouldPassChangelogTopicNameToStateStoreSerde() { + setUp(); + doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); + } + + @Test + public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { + setUp(); + final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName()); + when(context.changelogFor(STORE_NAME)).thenReturn(null); + doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); + } + + @Test + public void testMetrics() { + setUp(); + init(); + final JmxReporter reporter = new JmxReporter(); + final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams"); + reporter.contextChange(metricsContext); + + metrics.addReporter(reporter); + assertTrue(reporter.containsMbean(String.format( + "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", + STORE_LEVEL_GROUP, + THREAD_ID_TAG_KEY, + threadId, + taskId, + STORE_TYPE, + STORE_NAME + ))); + } + + @Test + public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { + setUp(); + doNothing().when(inner).put(any(Bytes.class), any(byte[].class)); + init(); + + metered.put(KEY, VALUE_TIMESTAMP_HEADERS); + + final KafkaMetric metric = metric("put-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetBytesFromInnerStoreAndReturnGetMetric() { + setUp(); + when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES); + init(); + + assertEquals(VALUE_TIMESTAMP_HEADERS, metered.get(KEY)); + + final KafkaMetric metric = metric("get-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() { + setUp(); + when(inner.putIfAbsent(any(Bytes.class), any(byte[].class))).thenReturn(null); + init(); + + metered.putIfAbsent(KEY, VALUE_TIMESTAMP_HEADERS); + + final KafkaMetric metric = metric("put-if-absent-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldPutAllToInnerStoreAndRecordPutAllMetric() { + setUp(); + doNothing().when(inner).putAll(any(List.class)); + init(); + + metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE_TIMESTAMP_HEADERS))); + + final KafkaMetric metric = metric("put-all-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() { + setUp(); + when(inner.delete(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES); + init(); + + metered.delete(KEY); + + final KafkaMetric metric = metric("delete-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() { + setUp(); + when(inner.range(any(Bytes.class), any(Bytes.class))).thenReturn( + new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator())); + init(); + + try(final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY)) { + assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value); + assertFalse(iterator.hasNext()); + } + + final KafkaMetric metric = metric("range-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldGetAllFromInnerStoreAndRecordAllMetric() { + setUp(); + when(inner.all()) + .thenReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator())); + init(); + + try(final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all()) { + assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value); + assertFalse(iterator.hasNext()); + } + + final KafkaMetric metric = metric(new MetricName("all-rate", STORE_LEVEL_GROUP, "", tags)); + assertTrue((Double) metric.metricValue() > 0); + } + + @Test + public void shouldCommitInnerWhenCommitTimeRecords() { + setUp(); + doNothing().when(inner).commit(Map.of()); + init(); + + metered.commit(Map.of()); + + final KafkaMetric metric = metric("flush-rate"); + assertTrue((Double) metric.metricValue() > 0); + } + + private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetFlushListenerOnWrappedCachingStore() { + setUpWithoutContext(); + final CachedKeyValueStore cachedKeyValueStore = mock(CachedKeyValueStore.class); + + when(cachedKeyValueStore.setFlushListener(any(CacheFlushListener.class), eq(false))).thenReturn(true); + + metered = new MeteredTimestampedKeyValueStoreWithHeaders<>( + cachedKeyValueStore, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.String())); + assertTrue(metered.setFlushListener(null, false)); + } + + @Test + public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { + setUpWithoutContext(); + assertFalse(metered.setFlushListener(null, false)); + } + + @Test + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() { + setUpWithExpectSerdes(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + null, + null + ); + store.init(context, inner); + + try { + store.put("key", ValueTimestampHeaders.make(42L, 60000, new RecordHeaders())); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + throw new AssertionError( + "Serdes are not correctly set from processor context.", exception); + } else { + throw exception; + } + } + } + + @Test + public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() { + setUp(); + final MeteredTimestampedKeyValueStoreWithHeaders<String, Long> store = new MeteredTimestampedKeyValueStoreWithHeaders<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueTimestampHeadersSerde<>(Serdes.Long()) + ); + store.init(context, inner); + + try { + store.put("key", ValueTimestampHeaders.make(42L, 60000, new RecordHeaders())); + } catch (final StreamsException exception) { + if (exception.getCause() instanceof ClassCastException) { + fail("Serdes are not correctly set from constructor parameters."); + } + throw exception; + } + } + + @SuppressWarnings("unused") + @Test + public void shouldTrackOpenIteratorsMetric() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); + assertNotNull(openIteratorsMetric); + + assertEquals(0L, (Long) openIteratorsMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + assertEquals(1L, (Long) openIteratorsMetric.metricValue()); + } + + assertEquals(0L, (Long) openIteratorsMetric.metricValue()); + } + + @SuppressWarnings("unused") + @Test + public void shouldTimeIteratorDuration() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric iteratorDurationAvgMetric = metric("iterator-duration-avg"); + final KafkaMetric iteratorDurationMaxMetric = metric("iterator-duration-max"); + assertNotNull(iteratorDurationAvgMetric); + assertNotNull(iteratorDurationMaxMetric); + + assertEquals(Double.NaN, (Double) iteratorDurationAvgMetric.metricValue()); + assertEquals(Double.NaN, (Double) iteratorDurationMaxMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + // nothing to do, just close immediately + mockTime.sleep(2); + } + + assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationAvgMetric.metricValue()); + assertEquals(2.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationMaxMetric.metricValue()); + + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all()) { + // nothing to do, just close immediately + mockTime.sleep(3); + } + + assertEquals(2.5 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationAvgMetric.metricValue()); + assertEquals(3.0 * TimeUnit.MILLISECONDS.toNanos(1), (double) iteratorDurationMaxMetric.metricValue()); + } + + @SuppressWarnings("unused") + @Test + public void shouldTrackOldestOpenIteratorTimestamp() { + setUp(); + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertNotNull(oldestIteratorTimestampMetric); + + KeyValueIterator<String, ValueTimestampHeaders<String>> second = null; + final long secondTimestamp; + try { + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> unused = metered.all()) { + + final long oldestTimestamp = mockTime.milliseconds(); + assertEquals(oldestTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + mockTime.sleep(100); + + // open a second iterator before closing the first to test that we still produce the first iterator's timestamp + second = metered.all(); + secondTimestamp = mockTime.milliseconds(); + assertEquals(oldestTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + mockTime.sleep(100); + } + + // now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator + assertEquals(secondTimestamp, (Long) oldestIteratorTimestampMetric.metricValue()); + } finally { + if (second != null) { + second.close(); + } + } + // now that all iterators are closed, the metric should be zero + assertEquals(0L, (Long) oldestIteratorTimestampMetric.metricValue()); + } + + private static RecordHeaders makeHeaders() { + RecordHeaders headers = new RecordHeaders(); + headers.add("header-key", "header-value".getBytes()); + return headers; + } + + private static byte[] serializeValueTimestampHeaders(ValueTimestampHeaders<String> vth) { + ValueTimestampHeadersSerializer<String> serializer = new ValueTimestampHeadersSerializer<>(Serdes.String().serializer()); + return serializer.serialize("topic", vth); + } + + @SuppressWarnings("unchecked") + private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { + final Serde<String> keySerde = mock(Serde.class); + final Serializer<String> keySerializer = mock(Serializer.class); + final Serde<ValueTimestampHeaders<String>> valueSerde = mock(Serde.class); + final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = mock(Deserializer.class); + final Serializer<ValueTimestampHeaders<String>> valueSerializer = mock(Serializer.class); + when(keySerde.serializer()).thenReturn(keySerializer); + // Mock both 2-arg and 3-arg versions of serialize - use lenient to avoid strict stubbing errors Review Comment: Why do we need the 2-arg one -- it seems we should use it, and if we do, it smells like a bug to me? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
