guozhangwang commented on a change in pull request #9388: URL: https://github.com/apache/kafka/pull/9388#discussion_r501347228
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -83,14 +85,40 @@ this.valueSerde = valueSerde; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; taskId = context.taskId().toString(); initStoreSerde(context); streamsMetrics = (StreamsMetricsImpl) context.metrics(); + registerMetrics(); + final Sensor restoreSensor = + StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + + // register and possibly restore the state from the logs + maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); + } + + @Override + public void init(final StateStoreContext context, + final StateStore root) { + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; + taskId = context.taskId().toString(); + initStoreSerde(context); + streamsMetrics = (StreamsMetricsImpl) context.metrics(); + + registerMetrics(); + final Sensor restoreSensor = Review comment: Actually, shouldn't we remove the `restoreSensor`, since we no longer restore the state upon init? In KIP-444 we no longer have it as a state-store-level metric. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ########## @@ -122,7 +158,7 @@ public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listene @Override public void put(final K key, final V value) { - put(key, value, context.timestamp()); + put(key, value, context != null ? context.timestamp() : 0L); Review comment: Thinking about this a bit more: is `this.context` only null in unit tests? If so, it seems like overkill to make non-testing code cope with testing code.
Could we have the mock class extend InternalProcessorContext as well? ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java ########## @@ -68,26 +70,47 @@ this.time = time; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; initStoreSerde(context); taskId = context.taskId().toString(); streamsMetrics = (StreamsMetricsImpl) context.metrics(); - putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics); + registerMetrics(); final Sensor restoreSensor = StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); // register and possibly restore the state from the logs maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); } + @Override + public void init(final StateStoreContext context, + final StateStore root) { + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; + initStoreSerde(context); + taskId = context.taskId().toString(); + streamsMetrics = (StreamsMetricsImpl) context.metrics(); + + registerMetrics(); + final Sensor restoreSensor = Review comment: Ditto here.
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -83,14 +85,40 @@ this.valueSerde = valueSerde; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; taskId = context.taskId().toString(); initStoreSerde(context); streamsMetrics = (StreamsMetricsImpl) context.metrics(); + registerMetrics(); Review comment: `this.context` seems to be used only in the e2e latency computation, as ``` final long e2eLatency = currentTime - context.timestamp(); ``` In that case we may throw an NPE. Should we augment the condition to ``` if (e2eLatencySensor.shouldRecord() && context != null) ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java ########## @@ -29,7 +29,9 @@ import org.apache.kafka.streams.kstream.internals.FullChangeSerde; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; Review comment: Why not statically import the function directly, as in other classes?
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java ########## @@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context) return (StreamsMetricsImpl) context.metrics(); } + /** + * Should be removed as part of KAFKA-10217 + */ + public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) { + return (StreamsMetricsImpl) context.metrics(); + } + public static String changelogFor(final ProcessorContext context, final String storeName) { return context instanceof InternalProcessorContext ? ((InternalProcessorContext) context).changelogFor(storeName) : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); } + + public static String changelogFor(final StateStoreContext context, final String storeName) { + return context instanceof InternalProcessorContext + ? ((InternalProcessorContext) context).changelogFor(storeName) + : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); + } + + public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) { + if (context instanceof InternalProcessorContext) { + return (InternalProcessorContext) context; + } else { + throw new IllegalArgumentException( + "This component requires internal features of Kafka Streams and must be disabled for unit tests." + ); + } + } Review comment: Thanks! ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -83,14 +85,40 @@ this.valueSerde = valueSerde; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null; taskId = context.taskId().toString(); initStoreSerde(context); streamsMetrics = (StreamsMetricsImpl) context.metrics(); + registerMetrics(); Review comment: NVM, I saw you already did this :) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org