Hi,
I think it **might** be related to this:
    final Serializer<HttpSession> httpSessionSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", Http.class);
    httpSessionSerializer.configure(serdeProps, false);

    final Deserializer<HttpSession> httpSessionDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", Http.class);
    httpSessionDeserializer.configure(serdeProps, false);
Shouldn't the class be HttpSession.class?
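If JsonPOJODeserializer reads its target type from the "JsonPOJOClass" property at configure() time (which is how the Confluent example serde works, as far as I can tell), then your session serde would actually be deserializing the stored aggregate into Http, which would explain the cast failure when the session store flushes and forwards the value. A rough sketch of what I mean, reusing your serdeProps map and your JsonPOJOSerializer/JsonPOJODeserializer classes:

    // Sketch only: point the session serde at HttpSession.class before calling configure().
    final Serializer<HttpSession> httpSessionSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", HttpSession.class);
    httpSessionSerializer.configure(serdeProps, false);

    final Deserializer<HttpSession> httpSessionDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", HttpSession.class);
    httpSessionDeserializer.configure(serdeProps, false);

    final Serde<HttpSession> httpSessionSerde =
            Serdes.serdeFrom(httpSessionSerializer, httpSessionDeserializer);

Since serdeProps is shared between the Http and HttpSession serdes, calling configure() immediately after the corresponding put() (as above) should keep the two values from clobbering each other, assuming the class is captured at configure time.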
On Wed, 2 May 2018 at 16:12 Conrad Crampton <[email protected]> wrote:
> I'm trying to window over HTTP logs and create an HttpSession, i.e. a list
> of HTTP requests (and some other properties). However, in the Merger part of
> my aggregate (I think) I'm getting a ClassCastException when my sessions are
> being merged, and I cannot for the life of me work out why.
> The exception is at the bottom and I think the relevant code is here.
> Can anyone suggest why Http is being cast to HttpSession?
> Thanks
>
>
> final Serializer<Http> httpSerializer = new JsonPOJOSerializer<>();
> serdeProps.put("JsonPOJOClass", Http.class);
> httpSerializer.configure(serdeProps, false);
>
> final Deserializer<Http> httpDeserializer = new JsonPOJODeserializer<>();
> serdeProps.put("JsonPOJOClass", Http.class);
> httpDeserializer.configure(serdeProps, false);
>
> final Serde<Http> httpSerde = Serdes.serdeFrom(httpSerializer, httpDeserializer);
>
> final Serializer<HttpSession> httpSessionSerializer = new JsonPOJOSerializer<>();
> serdeProps.put("JsonPOJOClass", Http.class);
> httpSessionSerializer.configure(serdeProps, false);
>
> final Deserializer<HttpSession> httpSessionDeserializer = new JsonPOJODeserializer<>();
> serdeProps.put("JsonPOJOClass", Http.class);
> httpSessionDeserializer.configure(serdeProps, false);
>
> final Serde<HttpSession> httpSessionSerde = Serdes.serdeFrom(httpSessionSerializer, httpSessionDeserializer);
>
> StreamsBuilder builder = new StreamsBuilder();
>
> KStream<String, HttpSession> httpStream = null;
> try {
>     httpStream = builder.stream(
>             config.getString(ConfigConstants.HTTP_TOPIC_KEY),
>             Consumed.with(Serdes.String(), httpSerde))
>             .selectKey((s, http) -> http.getClient() + http.getSourceIp() + http.getUseragent())
>             .groupByKey(Serialized.with(Serdes.String(), httpSerde))
>             // window by session
>             .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(10)))
>             .aggregate(
>                     new Initializer<HttpSession>() {
>                         @Override
>                         public HttpSession apply() {
>                             return new HttpSession();
>                         }
>                     },
>                     new Aggregator<String, Http, HttpSession>() {
>                         @Override
>                         public HttpSession apply(String s, Http http, HttpSession session) {
>                             return session.addRequest(http);
>                         }
>                     },
>                     new Merger<String, HttpSession>() {
>                         @Override
>                         public HttpSession apply(String s, HttpSession session, HttpSession v1) {
>                             log.debug("merging key {}, session {} with other {}", s, session, v1);
>                             return session.merge(v1);
>                         }
>                     },
>                     Materialized.<String, HttpSession, SessionStore<Bytes, byte[]>>as(
>                             config.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-session-store")
>                             .withKeySerde(Serdes.String())
>                             .withValueSerde(httpSessionSerde)
>             ).toStream((stringWindowed, session) -> (stringWindowed.key()));
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> httpStream
> .filter((key, message) -> message != null)
> .filter((key, message) -> message.getClient() != null)
> .filter((key, message) -> httpClients.stream().anyMatch(message.getClient()::equals))
> .foreach((key, message) -> {
> log.info("message {}", message);
> });
>
> final KafkaStreams streams = new KafkaStreams(builder.build(), props);
> streams.start();
>
> java.lang.ClassCastException: com.secdata.gi.graph.model.Http cannot be cast to com.secdata.gi.graph.model.HttpSession
>         at com.secdata.gi.graph.Process$$Lambda$45/1474607212.apply(Unknown Source)
>         at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:157)
>         at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:154)
>         at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>         at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>         at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
>         at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176)
>         at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
>         at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
>         at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
>         at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
>         at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
>         at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
>         at org.apache.kafka.streams.state.internals.CachingSessionStore.close(CachingSessionStore.java:201)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:275)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:238)
>         at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:450)
>         at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:532)
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:500)
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:493)
>         at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:212)
>         at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1213)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:755)
>