I'm not sure why you observed that aggregation works ok if String typed key
is used. I think I agree with Radek that the problem comes from the value,
and here is my understanding:
1. The source stream read from the topic named "rtDetailLines" is in type
2. After the map
Here's the solution (props to Damian G)
JsonSerializer keySerializer = new JsonSerializer<>();
JsonDeserializer keyDeserializer = new
JsonDeserializer<>(AggKey.class);
Serde keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);
then for the aggregator call 'groupByKey(keySerde,
Hmm. That's odd as the aggregation works ok if I use a String value for the
key (and the corresponding String serde).
This error only started occurring when I tried to substitute my 'custom'
key for the original String.
On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski
Yeah, I knew that already, this part of the error:
> > >>> > > org.apache.kafka.streams.processor.internals.
> RecordCollector.send(
> > >>> > RecordCollector.java:73)
points to this line:
0.10.1.0
On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski
wrote:
> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> wrote:
>
> Hi Jon,
>
Jon,
Are you using 0.10.1 or 0.10.0.1?
–
Best regards,
Radek Gruchalski
ra...@gruchalski.com
On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote:
Hi Jon,
At a glance the code looks ok, i.e, i believe the aggregate() should have
picked up the default Serde set in your
Hi Jon,
At a glance the code looks ok, i.e, i believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try adding the Serdes to the groupBy(..)
i.e.,
rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
Thanks,
Damian
On Tue,
It's just a bunch of public 'int' and 'String' values. There's an empty
constructor and a copy constructor.
For functions I override 'equals' and the requirements for 'serde' (close,
configure, serializer and deserializer).
@Override
public Serializer serializer() {
JsonSerializer
Do you mind sharing the code of AggKey class?
–
Best regards,
Radek Gruchalski
ra...@gruchalski.com
On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:
The 2nd.
On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski
wrote:
> Is the error
The 2nd.
On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski
wrote:
> Is the error happening at this stage?
>
> KStream rtRekey = rtDetailLines.map((key, value) ->
> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable
Is the error happening at this stage?
KStream rtRekey = rtDetailLines.map((key, value) ->
new KeyValue<>(new AggKey(value), value));
or here:
KTable ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new
If I comment out the aggregation step and just .print the .map step I don't
hit the error. It's coming from aggregating the non-String key.
On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski
wrote:
> Jon,
>
> Looking at your code:
>
>
Jon,
Looking at your code:
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
and later:
KStream rtDetailLines =
kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
Is RtDetailLogLine inheriting from String? It is not, as
Using 0.10.1.0
This is my topology:
Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
Hi Jon,
A couple of things: Which version are you using?
Can you share the code you are using to the build the topology?
Thanks,
Damian
On Tue, 6 Dec 2016 at 14:44 Jon Yeargers wrote:
> Im using .map to convert my (k/v) string/Object to Object/Object but when I
>
Im using .map to convert my (k/v) string/Object to Object/Object but when I
chain this to an aggregation step Im getting this exception:
Exception in thread "StreamThread-1" java.lang.ClassCastException:
com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
java.lang.String
at
16 matches
Mail list logo