frankvicky commented on code in PR #21451:
URL: https://github.com/apache/kafka/pull/21451#discussion_r2798730468


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.Objects;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+/**
+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence
+ * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ *
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes, byte[]&gt;,
+ * hence we use {@link Serde}s to convert from &lt;K, ValueTimestampHeaders&lt;V&gt;&gt; to &lt;Bytes, byte[]&gt;.
+ *
+ * @param <K> key type
+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
+ */
+public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
+    extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+    MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner,
+                                               final String metricScope,
+                                               final Time time,
+                                               final Serde<K> keySerde,
+                                               final Serde<ValueTimestampHeaders<V>> valueSerde) {
+        super(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde,
+                                                                        final SerdeGetter getter) {
+        if (valueSerde == null) {
+            return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde());
+        } else {
+            return super.prepareValueSerdeForStore(valueSerde, getter);
+        }
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    @Override
+    public void put(final K key,
+                    final ValueTimestampHeaders<V> value) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            Headers headers = value.headers();

Review Comment:
   `final`
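
   A sketch of `put(...)` with the requested `final` applied to the local (everything else as in the diff):

   ```java
   @Override
   public void put(final K key,
                   final ValueTimestampHeaders<V> value) {
       Objects.requireNonNull(key, "key cannot be null");
       try {
           // the local is never reassigned, so it can be declared final
           final Headers headers = value.headers();
           maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), serdes.rawValue(value, headers)), time, putSensor);
           maybeRecordE2ELatency();
       } catch (final ProcessorStateException e) {
           final String message = String.format(e.getMessage(), key, value);
           throw new ProcessorStateException(message, e);
       }
   }
   ```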



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.Objects;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+/**
+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence
+ * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ *
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes, byte[]&gt;,
+ * hence we use {@link Serde}s to convert from &lt;K, ValueTimestampHeaders&lt;V&gt;&gt; to &lt;Bytes, byte[]&gt;.
+ *
+ * @param <K> key type
+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
+ */
+public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
+    extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+    MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner,
+                                               final String metricScope,
+                                               final Time time,
+                                               final Serde<K> keySerde,
+                                               final Serde<ValueTimestampHeaders<V>> valueSerde) {
+        super(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde,
+                                                                        final SerdeGetter getter) {
+        if (valueSerde == null) {
+            return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde());
+        } else {
+            return super.prepareValueSerdeForStore(valueSerde, getter);
+        }
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    @Override
+    public void put(final K key,
+                    final ValueTimestampHeaders<V> value) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            Headers headers = value.headers();
+            maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), serdes.rawValue(value, headers)), time, putSensor);
+            maybeRecordE2ELatency();
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, value);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    protected ValueTimestampHeaders<V> outerValue(final byte[] value) {
+        Headers headers = ValueTimestampHeadersDeserializer.headers(value);

Review Comment:
   `final`
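
   Likewise, a sketch of the visible part of `outerValue(...)` with `final` applied (only the lines shown in this hunk; the rest of the method stays as in the PR):

   ```java
   protected ValueTimestampHeaders<V> outerValue(final byte[] value) {
       // same change as above: the local is never reassigned
       final Headers headers = ValueTimestampHeadersDeserializer.headers(value);
       // ... remainder of the method unchanged
   }
   ```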



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java:
##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.KeyValueIteratorStub;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class MeteredTimestampedKeyValueStoreWithHeadersTest {
+    private static final String APPLICATION_ID = "test-app";
+    private static final String STORE_NAME = "store-name";
+    private static final String STORE_TYPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String CHANGELOG_TOPIC = "changelog-topic-name";
+    private static final String THREAD_ID_TAG_KEY = "thread-id";
+    private static final String KEY = "key";
+    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
+    private static final RecordHeaders HEADERS = makeHeaders();
+    private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS =
+        ValueTimestampHeaders.make("value", 97L, HEADERS);
+    private static final byte[] VALUE_TIMESTAMP_HEADERS_BYTES = serializeValueTimestampHeaders(VALUE_TIMESTAMP_HEADERS);
+    private final String threadId = Thread.currentThread().getName();
+    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
+    @Mock
+    private KeyValueStore<Bytes, byte[]> inner;
+    @Mock
+    private InternalProcessorContext<?, ?> context;
+    private MockTime mockTime;
+
+    private static final Map<String, Object> CONFIGS =
+        mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, APPLICATION_ID));
+
+    private MeteredTimestampedKeyValueStoreWithHeaders<String, String> metered;
+    private final KeyValue<Bytes, byte[]> byteKeyValueTimestampHeadersPair = KeyValue.pair(KEY_BYTES,
+        VALUE_TIMESTAMP_HEADERS_BYTES
+    );
+    private final Metrics metrics = new Metrics();
+    private Map<String, String> tags;
+
+    private void setUpWithoutContext() {
+        mockTime = new MockTime();
+        metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            "scope",
+            mockTime,
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.String())
+        );
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry(THREAD_ID_TAG_KEY, threadId),
+            mkEntry("task-id", taskId.toString()),
+            mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
+        );
+    }
+
+    private void setUp() {
+        setUpWithoutContext();
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.metrics())
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
+        when(context.taskId()).thenReturn(taskId);
+        when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.appConfigs()).thenReturn(CONFIGS);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void setUpWithExpectSerdes() {
+        setUp();
+        when(context.keySerde()).thenReturn((Serde) Serdes.String());
+        when(context.valueSerde()).thenReturn((Serde) Serdes.Long());
+    }
+
+    private void init() {
+        metered.init(context, metered);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        setUp();
+        final MeteredTimestampedKeyValueStoreWithHeaders<String, String> outer = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.String())
+        );
+        doNothing().when(inner).init(context, outer);
+        outer.init(context, outer);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        setUp();
+        doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        setUp();
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @Test
+    public void testMetrics() {
+        setUp();
+        init();
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format(
+            "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
+            STORE_LEVEL_GROUP,
+            THREAD_ID_TAG_KEY,
+            threadId,
+            taskId,
+            STORE_TYPE,
+            STORE_NAME
+        )));
+    }
+
+    @Test
+    public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
+        setUp();
+        doNothing().when(inner).put(any(Bytes.class), any(byte[].class));
+        init();
+
+        metered.put(KEY, VALUE_TIMESTAMP_HEADERS);
+
+        final KafkaMetric metric = metric("put-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
+        setUp();
+        when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        init();
+
+        assertEquals(VALUE_TIMESTAMP_HEADERS, metered.get(KEY));
+
+        final KafkaMetric metric = metric("get-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
+        setUp();
+        when(inner.putIfAbsent(any(Bytes.class), any(byte[].class))).thenReturn(null);
+        init();
+
+        metered.putIfAbsent(KEY, VALUE_TIMESTAMP_HEADERS);
+
+        final KafkaMetric metric = metric("put-if-absent-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
+        setUp();
+        doNothing().when(inner).putAll(any(List.class));
+        init();
+
+        metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE_TIMESTAMP_HEADERS)));
+
+        final KafkaMetric metric = metric("put-all-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
+        setUp();
+        when(inner.delete(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        init();
+
+        metered.delete(KEY);
+
+        final KafkaMetric metric = metric("delete-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
+        setUp();
+        when(inner.range(any(Bytes.class), any(Bytes.class))).thenReturn(
+            new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator()));
+        init();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY);
+        assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        final KafkaMetric metric = metric("range-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
+        setUp();
+        when(inner.all())
+            .thenReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator()));
+        init();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all();
+        assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
+        assertFalse(iterator.hasNext());
+        iterator.close();

Review Comment:
   ditto
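
   Presumably the same try-with-resources change suggested for the range test; a sketch for the `all()` case under that assumption:

   ```java
   try (final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all()) {
       assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
       assertFalse(iterator.hasNext());
   }
   ```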



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java:
##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.KeyValueIteratorStub;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class MeteredTimestampedKeyValueStoreWithHeadersTest {
+    private static final String APPLICATION_ID = "test-app";
+    private static final String STORE_NAME = "store-name";
+    private static final String STORE_TYPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String CHANGELOG_TOPIC = "changelog-topic-name";
+    private static final String THREAD_ID_TAG_KEY = "thread-id";
+    private static final String KEY = "key";
+    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
+    private static final RecordHeaders HEADERS = makeHeaders();
+    private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS =
+        ValueTimestampHeaders.make("value", 97L, HEADERS);
+    private static final byte[] VALUE_TIMESTAMP_HEADERS_BYTES = serializeValueTimestampHeaders(VALUE_TIMESTAMP_HEADERS);
+    private final String threadId = Thread.currentThread().getName();
+    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
+    @Mock
+    private KeyValueStore<Bytes, byte[]> inner;
+    @Mock
+    private InternalProcessorContext<?, ?> context;
+    private MockTime mockTime;
+
+    private static final Map<String, Object> CONFIGS =
+        mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, APPLICATION_ID));
+
+    private MeteredTimestampedKeyValueStoreWithHeaders<String, String> metered;
+    private final KeyValue<Bytes, byte[]> byteKeyValueTimestampHeadersPair = KeyValue.pair(KEY_BYTES,
+        VALUE_TIMESTAMP_HEADERS_BYTES
+    );
+    private final Metrics metrics = new Metrics();
+    private Map<String, String> tags;
+
+    private void setUpWithoutContext() {
+        mockTime = new MockTime();
+        metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            "scope",
+            mockTime,
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.String())
+        );
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry(THREAD_ID_TAG_KEY, threadId),
+            mkEntry("task-id", taskId.toString()),
+            mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
+        );
+    }
+
+    private void setUp() {
+        setUpWithoutContext();
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.metrics())
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
+        when(context.taskId()).thenReturn(taskId);
+        when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.appConfigs()).thenReturn(CONFIGS);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void setUpWithExpectSerdes() {
+        setUp();
+        when(context.keySerde()).thenReturn((Serde) Serdes.String());
+        when(context.valueSerde()).thenReturn((Serde) Serdes.Long());
+    }
+
+    private void init() {
+        metered.init(context, metered);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        setUp();
+        final MeteredTimestampedKeyValueStoreWithHeaders<String, String> outer = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueTimestampHeadersSerde<>(Serdes.String())
+        );
+        doNothing().when(inner).init(context, outer);
+        outer.init(context, outer);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        setUp();
+        doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        setUp();
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @Test
+    public void testMetrics() {
+        setUp();
+        init();
+        final JmxReporter reporter = new JmxReporter();
+        final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+        reporter.contextChange(metricsContext);
+
+        metrics.addReporter(reporter);
+        assertTrue(reporter.containsMbean(String.format(
+            "kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
+            STORE_LEVEL_GROUP,
+            THREAD_ID_TAG_KEY,
+            threadId,
+            taskId,
+            STORE_TYPE,
+            STORE_NAME
+        )));
+    }
+
+    @Test
+    public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
+        setUp();
+        doNothing().when(inner).put(any(Bytes.class), any(byte[].class));
+        init();
+
+        metered.put(KEY, VALUE_TIMESTAMP_HEADERS);
+
+        final KafkaMetric metric = metric("put-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
+        setUp();
+        when(inner.get(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        init();
+
+        assertEquals(VALUE_TIMESTAMP_HEADERS, metered.get(KEY));
+
+        final KafkaMetric metric = metric("get-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
+        setUp();
+        when(inner.putIfAbsent(any(Bytes.class), any(byte[].class))).thenReturn(null);
+        init();
+
+        metered.putIfAbsent(KEY, VALUE_TIMESTAMP_HEADERS);
+
+        final KafkaMetric metric = metric("put-if-absent-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
+        setUp();
+        doNothing().when(inner).putAll(any(List.class));
+        init();
+
+        metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE_TIMESTAMP_HEADERS)));
+
+        final KafkaMetric metric = metric("put-all-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
+        setUp();
+        when(inner.delete(any(Bytes.class))).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        init();
+
+        metered.delete(KEY);
+
+        final KafkaMetric metric = metric("delete-rate");
+        assertTrue((Double) metric.metricValue() > 0);
+    }
+
+    @Test
+    public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
+        setUp();
+        when(inner.range(any(Bytes.class), any(Bytes.class))).thenReturn(
+            new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampHeadersPair).iterator()));
+        init();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY);
+        assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
+        assertFalse(iterator.hasNext());
+        iterator.close();

Review Comment:
   ```suggestion
           try (final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY)) {
               assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
               assertFalse(iterator.hasNext());
           }
   ```
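
   The try-with-resources form closes the iterator even if one of the assertions fails, whereas the explicit `close()` call would be skipped on an assertion error.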



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
