[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880923#comment-16880923 ]
Justin Fetherolf commented on KAFKA-8630: ----------------------------------------- The `ProcessorContext` interface already offers a `metrics()` method, and `InternalProcessorContext` for some reason overrides it to return a `StreamsMetricsImpl` rather than `StreamsMetrics`. The bit of poking around in the code I've done this evening it seems to be kind of a mixed bag of stores needing just this `metrics()` method (and thus not needing to do the cast) and needing the other methods that `InternalProcessorContext` offers. Is the possibly pre-existing KIP you're thinking of [KIP-448|https://cwiki.apache.org/confluence/x/SAeZBg]? > Unit testing a streams processor with a WindowStore throws a > ClassCastException > ------------------------------------------------------------------------------- > > Key: KAFKA-8630 > URL: https://issues.apache.org/jira/browse/KAFKA-8630 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils > Affects Versions: 2.3.0 > Reporter: Justin Fetherolf > Priority: Major > > I was attempting to write a unit test for a class implementing the > {{Processor}} interface that contained a {{WindowStore}}, but running the > test fails with a {{ClassCastException}} coming out of > {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to > {{InternalProcessorContext}}. > Minimal code to reproduce: > {code:java} > package com.cantgetthistowork; > import org.apache.kafka.streams.processor.Processor; > import org.apache.kafka.streams.processor.ProcessorContext; > import org.apache.kafka.streams.state.WindowStore; > public class InMemWindowProcessor implements Processor<String, String> { > private ProcessorContext context; > private WindowStore<String, String> windowStore; > @Override > public void init(ProcessorContext context) { > this.context = context; > windowStore = (WindowStore<String, String>) > context.getStateStore("my-win-store"); > } > @Override > public void process(String key, String value) { > } > @Override > public void close() { > } > } > {code} > {code:java} > package com.cantgetthistowork; > import java.time.Duration; > import java.time.Instant; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.streams.processor.MockProcessorContext; > import org.apache.kafka.streams.state.Stores; > import org.apache.kafka.streams.state.WindowStore; > import org.junit.Before; > import org.junit.Test; > public class InMemWindowProcessorTest { > InMemWindowProcessor processor = null; > MockProcessorContext context = null; > @Before > public void setup() { > processor = new InMemWindowProcessor(); > context = new MockProcessorContext(); > WindowStore<String, String> store = > Stores.windowStoreBuilder( > Stores.inMemoryWindowStore( > "my-win-store", > Duration.ofMinutes(10), > Duration.ofSeconds(10), > false > ), > Serdes.String(), > Serdes.String() > ) > .withLoggingDisabled() > .build(); > store.init(context, store); > context.register(store, null); > processor.init(context); > } > @Test > public void testThings() { > Instant baseTime = Instant.now(); > context.setTimestamp(baseTime.toEpochMilli()); > context.setTopic("topic-name"); > processor.process("key1", "value1"); > } > } > {code} > > I was trying this with maven, with mvn --version outputting: > {noformat} > Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; > 2017-04-03T13:39:06-06:00) > Maven home: ~/opt/apache-maven-3.5.0 > Java version: 1.8.0_212, vendor: Oracle Corporation > Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: > "unix"{noformat} > And finally the stack trace: > {noformat} > ------------------------------------------------------- > T E S T S > ------------------------------------------------------- > Running com.cantgetthistowork.InMemWindowProcessorTest > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< > FAILURE! > testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: > 0.05 sec <<< ERROR! > java.lang.ClassCastException: > org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to > org.apache.kafka.streams.processor.internals.InternalProcessorContext > at > org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90) > at > com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189) > at > org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165) > at > org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75) > Results : > Tests in error: > testThings(com.cantgetthistowork.InMemWindowProcessorTest): > org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to > org.apache.kafka.streams.processor.internals.InternalProcessorContext > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)