Justin Fetherolf created KAFKA-8630: ---------------------------------------
Summary: Unit testing a streams processor with a WindowStore throws a ClassCastException Key: KAFKA-8630 URL: https://issues.apache.org/jira/browse/KAFKA-8630 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.3.0 Reporter: Justin Fetherolf I was attempting to write a unit test for a class implementing the {{Processor}} interface that contained a {{WindowStore}}, but running the test fails with a {{ClassCastException}} thrown from {{InMemoryWindowStore.init}}, which attempts to cast {{MockProcessorContext}} to {{InternalProcessorContext}}. Minimal code to reproduce: {code:java} package com.cantgetthistowork; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.WindowStore; public class InMemWindowProcessor implements Processor<String, String> { private ProcessorContext context; private WindowStore<String, String> windowStore; @Override public void init(ProcessorContext context) { this.context = context; windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store"); } @Override public void process(String key, String value) { } @Override public void close() { } } {code} {code:java} package com.cantgetthistowork; import java.time.Duration; import java.time.Instant; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.junit.Before; import org.junit.Test; public class InMemWindowProcessorTest { InMemWindowProcessor processor = null; MockProcessorContext context = null; @Before public void setup() { processor = new InMemWindowProcessor(); context = new MockProcessorContext(); WindowStore<String, String> store = Stores.windowStoreBuilder( Stores.inMemoryWindowStore( "my-win-store", Duration.ofMinutes(10), Duration.ofSeconds(10), false ), Serdes.String(), Serdes.String() ) 
.withLoggingDisabled() .build(); store.init(context, store); context.register(store, null); processor.init(context); } @Test public void testThings() { Instant baseTime = Instant.now(); context.setTimestamp(baseTime.toEpochMilli()); context.setTopic("topic-name"); processor.process("key1", "value1"); } } {code} I was trying this with maven, with mvn --version outputting: {noformat} Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00) Maven home: ~/opt/apache-maven-3.5.0 Java version: 1.8.0_212, vendor: Oracle Corporation Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"{noformat} And finally the stack trace: {noformat} ------------------------------------------------------- T E S T S ------------------------------------------------------- Running com.cantgetthistowork.InMemWindowProcessorTest SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE! testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: 0.05 sec <<< ERROR! 
java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90) at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141) at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189) at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165) at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75) Results : Tests in error: testThings(com.cantgetthistowork.InMemWindowProcessorTest): org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)