guozhangwang commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501347228



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, 
name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, 
restoreSensor);
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
+        taskId = context.taskId().toString();
+        initStoreSerde(context);
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =

Review comment:
       Actually, I think we should remove the `restoreSensor`, since we no longer 
restore the state upon init? In KIP-444 we no longer have it as a 
state-store-level metric.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -122,7 +158,7 @@ public boolean setFlushListener(final 
CacheFlushListener<Windowed<K>, V> listene
     @Override
     public void put(final K key,
                     final V value) {
-        put(key, value, context.timestamp());
+        put(key, value, context != null ? context.timestamp() : 0L);

Review comment:
       Thinking about this a bit more: is `this.context` only null in unit 
tests? If so, it seems like overkill to make non-testing code cope with 
testing code.
   
   Could we let the mock class extend `InternalProcessorContext` as well?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -68,26 +70,47 @@
         this.time = time;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
         initStoreSerde(context);
         taskId = context.taskId().toString();
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
-        putSensor = StateStoreMetrics.putSensor(threadId, taskId, 
metricsScope, name(), streamsMetrics);
-        fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, 
metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, 
metricsScope, name(), streamsMetrics);
-        removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, 
metricsScope, name(), streamsMetrics);
-        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, 
metricsScope, name(), streamsMetrics);
+        registerMetrics();
         final Sensor restoreSensor =
             StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, 
name(), streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, 
restoreSensor);
     }
 
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
+        initStoreSerde(context);
+        taskId = context.taskId().toString();
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =

Review comment:
       Ditto here: I think the `restoreSensor` should be removed in this store as well.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();

Review comment:
       `this.context` seems to be used only in the e2e latency computation:
   
   ```
   final long e2eLatency =  currentTime - context.timestamp();
   ```
   
   In that case we may throw an NPE when `context` is null. Should we augment the condition as follows?
   
   ```
   if (e2eLatencySensor.shouldRecord() && context != null)
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -29,7 +29,9 @@
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;

Review comment:
       Why not statically import the function directly, as in other classes?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##########
@@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final 
ProcessorContext context)
         return (StreamsMetricsImpl) context.metrics();
     }
 
+    /**
+     * Should be removed as part of KAFKA-10217
+     */
+    public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext 
context) {
+        return (StreamsMetricsImpl) context.metrics();
+    }
+
     public static String changelogFor(final ProcessorContext context, final 
String storeName) {
         return context instanceof InternalProcessorContext
             ? ((InternalProcessorContext) context).changelogFor(storeName)
             : 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
     }
+
+    public static String changelogFor(final StateStoreContext context, final 
String storeName) {
+        return context instanceof InternalProcessorContext
+            ? ((InternalProcessorContext) context).changelogFor(storeName)
+            : 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+    }
+
+    public static InternalProcessorContext asInternalProcessorContext(final 
ProcessorContext context) {
+        if (context instanceof InternalProcessorContext) {
+            return (InternalProcessorContext) context;
+        } else {
+            throw new IllegalArgumentException(
+                "This component requires internal features of Kafka Streams 
and must be disabled for unit tests."
+            );
+        }
+    }

Review comment:
       Thanks!

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();

Review comment:
       Never mind, I see you already did this :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to