I'm trying to window over http logs and create an HttpSession i.e. a list of http requests (and some other properties). However when in my aggregate Merger part (I think) I'm getting a classcastexception I think in when my sessions are being merged and cannot for the life of me work out why. The exception is at the bottom and I think the relevant code is here. Can anyone give a suggestion as to why Http is trying to be cast to HttpSession? Thanks
final Serializer<Http> httpSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", Http.class); httpSerializer.configure(serdeProps, false); final Deserializer<Http> httpDeserializer = new JsonPOJODeserializer<>(); serdeProps.put("JsonPOJOClass", Http.class); httpDeserializer.configure(serdeProps, false); final Serde<Http> httpSerde = Serdes.serdeFrom(httpSerializer, httpDeserializer); final Serializer<HttpSession> httpSessionSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", Http.class); httpSessionSerializer.configure(serdeProps, false); final Deserializer<HttpSession> httpSessionDeserializer = new JsonPOJODeserializer<>(); serdeProps.put("JsonPOJOClass", Http.class); httpSessionDeserializer.configure(serdeProps, false); final Serde<HttpSession> httpSessionSerde = Serdes.serdeFrom(httpSessionSerializer, httpSessionDeserializer); StreamsBuilder builder = new StreamsBuilder(); KStream<String, HttpSession> httpStream = null; try { httpStream = builder.stream( config.getString(ConfigConstants.HTTP_TOPIC_KEY), Consumed.with(Serdes.String(), httpSerde)) .selectKey((s, http) -> http.getClient() + http.getSourceIp() + http.getUseragent()) .groupByKey(Serialized.with(Serdes.String(), httpSerde)) // window by session .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(10))) .aggregate( new Initializer<HttpSession>() { @Override public HttpSession apply() { return new HttpSession(); } }, new Aggregator<String, Http, HttpSession>() { @Override public HttpSession apply(String s, Http http, HttpSession session) { return session.addRequest(http); } }, new Merger<String, HttpSession>() { @Override public HttpSession apply(String s, HttpSession session, HttpSession v1) log.debug("merging key {}, session {} with other {}", s, session, v1); return session.merge(v1);} }, Materialized.<String, HttpSession, SessionStore<Bytes, byte[]>>as(config.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-session-store").withKeySerde(Serdes.String()).withValueSerde(httpSessionSerde) ).toStream((stringWindowed, session) -> (stringWindowed.key())); } catch (Exception e) { e.printStackTrace(); } httpStream .filter((key, message) -> message != null) .filter((key, message) -> message.getClient() != null) .filter((key, message) -> httpClients.stream().anyMatch(message.getClient()::equals)) .foreach((key, message) -> { log.info("message {}", message); }); final KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); java.lang.ClassCastException: com.secdata.gi.graph.model.Http cannot be cast to com.secdata.gi.graph.model.HttpSession at com.secdata.gi.graph.Process$$Lambda$45/1474607212.apply(Unknown Source) at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:157) at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:154) at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176) at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) at org.apache.kafka.streams.state.internals.CachingSessionStore.close(CachingSessionStore.java:201) at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:275) at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:238) at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:450) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:532) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:500) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:493) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:212) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1213) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:755)