Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Yeah, I knew that already, this part of the error:

> > >>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)

points to this line:
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73

which means that your error happens on the value, not the key.
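
One way to address that (a sketch only, assuming the AggKeySerDe Damian suggests
below exists, and reusing the prtRecordSerde and collectorSerde from your
topology) is to hand the serdes to groupByKey explicitly, so the repartition
topic behind the aggregate does not fall back to the default String serdes:

Serde<AggKey> aggKeySerde = new AggKeySerDe();  // assumed: a Serde<AggKey> with a no-arg constructor

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
    rtRekey.groupByKey(aggKeySerde, prtRecordSerde)  // explicit key/value serdes for the repartition topic
        .aggregate(
            BqRtDetailLogLine_aggregate::new,
            new PRTAggregate(),
            TimeWindows.of(60 * 60 * 1000L),
            collectorSerde, "prt_minute_agg_stream");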

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

0.10.1.0

On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> wrote:
>
> Hi Jon,
>
> At a glance the code looks OK, i.e., I believe the aggregate() should have
> picked up the default Serde set in your StreamsConfig. However, you could
> try adding the Serdes to the groupBy(..)
>
> i.e.,
> rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com>
wrote:
>
> > It's just a bunch of public 'int' and 'String' values. There's an empty
> > constructor and a copy constructor.
> >
> > For functions I override 'equals' and the requirements for 'serde'
> (close,
> > configure, serializer and deserializer).
> >
> > @Override
> > public Serializer<AggKey> serializer() {
> > JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
> > return jsonSerializer;
> > }
> >
> > @Override
> > public Deserializer<AggKey> deserializer() {
> > JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
> > return jsonDeserializer;
> > }
> >
> >
> >
> >
> > Which relates to:
> >
> > public class JsonSerializer<T> implements Serializer<T> {
> >
> > private Gson gson = new Gson();
> >
> > @Override
> > public void configure(Map<String, ?> map, boolean b) {
> >
> > }
> >
> > @Override
> > public byte[] serialize(String topic, T t) {
> > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> >
> >
> > public class JsonDeserializer<T> implements Deserializer<T> {
> >
> > private Gson gson = new Gson();
> > private Class<T> deserializedClass;
> >
> > public JsonDeserializer(Class<T> deserializedClass) {
> > this.deserializedClass = deserializedClass;
> > }
> >
> > public JsonDeserializer() {
> > }
> >
> > @Override
> > @SuppressWarnings("unchecked")
> > public void configure(Map<String, ?> map, boolean b) {
> > if(deserializedClass == null) {
> > deserializedClass = (Class<T>) map.get("serializedClass");
> > }
> > }
> >
> > @Override
> > public T deserialize(String s, byte[] bytes) {
> > if(bytes == null){
> > return null;
> > }
> >
> > return gson.fromJson(new String(bytes),deserializedClass);
> >
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com>

> > wrote:
> >
> > > Do you mind sharing the code of AggKey class?
> > >
> > > –
> > > Best regards,
> > > Radek Gruchalski
> > > ra...@gruchalski.com
> > >
> > >
> > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > > wrote:
> > >
> > > The 2nd.
> > >
> > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > > wrote:
> > >
> > >> Is the error happening at this stage?
> > >>
> > >> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key,
> > value)
> > >> -> new KeyValue<>(new AggKey(value), value));
> > >>
> > >> or here:
> > >>
> > >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> > >> rtRekey.groupByKey().aggregate(
> > >> BqRtDetailLogLine_aggregate::new,
> > >> new PRTAggregate(),
> > >> TimeWindows.of(60 * 60 * 1000L),
> > >> collectorSerde, "prt_minute_agg_stream");

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon,

Are you using 0.10.1 or 0.10.0.1?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote:

Hi Jon,

At a glance the code looks OK, i.e., I believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try adding the Serdes to the groupBy(..)

i.e.,
rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)

Thanks,
Damian
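
If AggKeySerDe does not exist as a standalone class yet, a self-contained
version might look like the sketch below (hypothetical code, assuming AggKey
serializes cleanly with the JsonSerializer/JsonDeserializer shown further down;
the no-arg constructor also makes it usable for
StreamsConfig.KEY_SERDE_CLASS_CONFIG):

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class AggKeySerDe implements Serde<AggKey> {

    // reuse the JSON (de)serializers from the code below, typed to AggKey
    private final JsonSerializer<AggKey> serializer = new JsonSerializer<>();
    private final JsonDeserializer<AggKey> deserializer = new JsonDeserializer<>(AggKey.class);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<AggKey> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<AggKey> deserializer() {
        return deserializer;
    }
}

It can then be passed as new AggKeySerDe() to groupByKey(..) or named in the
streams config.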

On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> It's just a bunch of public 'int' and 'String' values. There's an empty
> constructor and a copy constructor.
>
> For functions I override 'equals'  and the requirements for 'serde' (close,
> configure, serializer and deserializer).
>
>  @Override
> public Serializer<AggKey> serializer() {
> JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
> return jsonSerializer;
> }
>
> @Override
> public Deserializer<AggKey> deserializer() {
> JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
> return jsonDeserializer;
> }
>
>
>
>
> Which relates to:
>
> public class JsonSerializer<T> implements Serializer<T> {
>
> private Gson gson = new Gson();
>
> @Override
> public void configure(Map<String, ?> map, boolean b) {
>
> }
>
> @Override
> public byte[] serialize(String topic, T t) {
> return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> }
>
> @Override
> public void close() {
>
> }
> }
>
>
>
> public class JsonDeserializer<T> implements Deserializer<T> {
>
> private Gson gson = new Gson();
> private Class<T> deserializedClass;
>
> public JsonDeserializer(Class<T> deserializedClass) {
> this.deserializedClass = deserializedClass;
> }
>
> public JsonDeserializer() {
> }
>
> @Override
> @SuppressWarnings("unchecked")
> public void configure(Map<String, ?> map, boolean b) {
> if(deserializedClass == null) {
> deserializedClass = (Class<T>) map.get("serializedClass");
> }
> }
>
> @Override
> public T deserialize(String s, byte[] bytes) {
> if(bytes == null){
> return null;
> }
>
> return gson.fromJson(new String(bytes),deserializedClass);
>
> }
>
> @Override
> public void close() {
>
> }
> }
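
As an aside on the no-arg JsonDeserializer constructor above: when the target
class is not passed in, configure(..) has to receive it under the
"serializedClass" key before the first deserialize(..) call, roughly like this
(a usage sketch of the code above):

Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put("serializedClass", AggKey.class);   // configure(..) above casts this to Class
JsonDeserializer<AggKey> keyDeserializer = new JsonDeserializer<>();
keyDeserializer.configure(serdeConfig, true);        // true = configuring a key deserializer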
>
> On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com>
> wrote:
>
> > Do you mind sharing the code of AggKey class?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> > wrote:
> >
> > The 2nd.
> >
> > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com>
> > wrote:
> >
> >> Is the error happening at this stage?
> >>
> >> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key,
> value)
> >>  -> new KeyValue<>(new AggKey(value), value));
> >>
> >> or here:
> >>
> >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> >> rtRekey.groupByKey().aggregate(
> >> BqRtDetailLogLine_aggregate::new,
> >> new PRTAggregate(),
> >> TimeWindows.of(60 * 60 * 1000L),
> >> collectorSerde, "prt_minute_agg_stream");
> >>
> >> –
> >>
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> If I comment out the aggregation step and just .print the .map step I
> >> don't hit the error. It's coming from aggregating the non-String key.
> >>
> >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
> >> wrote:
> >>
> >>> Jon,
> >>>
> >>> Looking at your code:
> >>>
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> and later:
> >>>
> >>> KStream<String, RtDetailLogLine> rtDetailLines =
> >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >>>
> >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Do you mind sharing the code of AggKey class?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

The 2nd.

On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> Is the error happening at this stage?
>
> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value) ->
> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> –
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> If I comment out the aggregation step and just .print the .map step I
> don't hit the error. It's coming from aggregating the non-String key.
>
> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
> wrote:
>
>> Jon,
>>
>> Looking at your code:
>>
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> and later:
>>
>> KStream<String, RtDetailLogLine> rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> Is RtDetailLogLine inheriting from String? It is not, as the error
>> suggests.
>> You may have to write your own Serializer / Deserializer for
>> RtDetailLogLine.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> Using 0.10.1.0
>>
>> This is my topology:
>>
>> Properties config = new Properties();
>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new
>> JsonSerializer<>();
>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer = new
>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>
>> StringSerializer stringSerializer = new StringSerializer();
>> StringDeserializer stringDeserializer = new StringDeserializer();
>> Serde<String> stringSerde =
>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>
>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new
>> JsonDeserializer<>(RtDetailLogLine.class);
>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new
>> JsonSerializer<>();
>> Serde<RtDetailLogLine> prtRecordSerde =
>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>
>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>
>> KStream<String, RtDetailLogLine> rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> // change the keying
>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>> -> new KeyValue<>(new AggKey(value), value));
>>
>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> ktRtDetail.toStream().print();
>>
>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>
>> kafkaStreams.start();
>>
>>
>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian@gmail.com> wrote:
>>
>> > Hi Jon,
>> >
>> > A couple of things: Which version are you using?
>> > Can you share the code you are using to the build the topology?
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com>
>> wrote:
>> >
>> > > I'm using .map to convert my (k/v) string/Object to Object/Object but
>> > when I
>> > > chain this to a

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Is the error happening at this stage?

KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value) ->
new KeyValue<>(new AggKey(value), value));

or here:

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new PRTAggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "prt_minute_agg_stream");

–

Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

If I comment out the aggregation step and just .print the .map step I don't
hit the error. It's coming from aggregating the non-String key.

On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> Jon,
>
> Looking at your code:
>
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> and later:
>
> KStream<String, RtDetailLogLine> rtDetailLines = 
> kStreamBuilder.stream(stringSerde,
> prtRecordSerde, TOPIC);
>
> Is RtDetailLogLine inheriting from String? It is not, as the error
> suggests.
> You may have to write your own Serializer / Deserializer for
> RtDetailLogLine.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> Using 0.10.1.0
>
> This is my topology:
>
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new
> JsonSerializer<>();
> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer = new
> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>
> StringSerializer stringSerializer = new StringSerializer();
> StringDeserializer stringDeserializer = new StringDeserializer();
> Serde<String> stringSerde =
> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>
> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new
> JsonDeserializer<>(RtDetailLogLine.class);
> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new
> JsonSerializer<>();
> Serde<RtDetailLogLine> prtRecordSerde =
> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>
> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
> KStream<String, RtDetailLogLine> rtDetailLines =
> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>
> // change the keying
> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
> -> new KeyValue<>(new AggKey(value), value));
>
> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> ktRtDetail.toStream().print();
>
> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>
> kafkaStreams.start();
>
>
> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian@gmail.com> wrote:
>
> > Hi Jon,
> >
> > A couple of things: Which version are you using?
> > Can you share the code you are using to the build the topology?
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
> >
> > > I'm using .map to convert my (k/v) string/Object to Object/Object but
> > when I
> > > chain this to an aggregation step I'm getting this exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > > java.lang.String
> > > at
> > >
> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
> > StringSerializer.java:24)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> > RecordCollector.java:73)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.SinkNode.
> > process(SinkNode.java:72)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextI

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon,

Looking at your code:

config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

and later:

KStream<String, RtDetailLogLine> rtDetailLines =
kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

Is RtDetailLogLine inheriting from String? It is not, as the error suggests.
You may have to write your own Serializer / Deserializer for
RtDetailLogLine.
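
The topology below does already build a JSON serde for RtDetailLogLine; the
StringSerializer in the stack trace most likely comes from the default value
serde in the config, which the internal repartition topic behind
groupByKey().aggregate(..) falls back to. One route, sketched here with a
hypothetical RtDetailLogLineSerde wrapper (a Serde<RtDetailLogLine> with a
no-arg constructor, not something from the original thread), is to change that
default; the other is to pass the serdes explicitly to groupByKey(..), as
suggested elsewhere in the thread:

// hypothetical wrapper class, assumed to exist:
// public class RtDetailLogLineSerde implements Serde<RtDetailLogLine> { ... }
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
    RtDetailLogLineSerde.class.getName());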

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

Using 0.10.1.0

This is my topology:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new
JsonSerializer<>();
JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer = new
JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
Serde<BqRtDetailLogLine_aggregate> collectorSerde =
Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde<String> stringSerde =
Serdes.serdeFrom(stringSerializer,stringDeserializer);

JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new
JsonDeserializer<>(RtDetailLogLine.class);
JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new
JsonSerializer<>();
Serde<RtDetailLogLine> prtRecordSerde =
Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream<String, RtDetailLogLine> rtDetailLines =
kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

// change the keying
KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
-> new KeyValue<>(new AggKey(value), value));

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new PRTAggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "prt_minute_agg_stream");

ktRtDetail.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);

kafkaStreams.start();


On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian@gmail.com> wrote:

> Hi Jon,
>
> A couple of things: Which version are you using?
> Can you share the code you are using to the build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com>
wrote:
>
> > I'm using .map to convert my (k/v) string/Object to Object/Object but
> when I
> > chain this to an aggregation step I'm getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > java.lang.String
> > at
> >
> > org.apache.kafka.common.serialization.StringSerializer.serialize(
> StringSerializer.java:24)
> > at
> >
> > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> RecordCollector.java:73)
> > at
> >
> > org.apache.kafka.streams.processor.internals.SinkNode.
> process(SinkNode.java:72)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> KStreamFilterProcessor.process(KStreamFilter.java:44)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamMap$
> KStreamMapProcessor.process(KStreamMap.java:43)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:66)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:181)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:436)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer for the
> > 'Serializer()' override.
> >
> > In the config for the topology I'm
> > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
> >
>


Re: Looking for guidance on setting ZK session timeouts in AWS

2016-12-05 Thread Radek Gruchalski
Thomas,

I’m always running ZK separate from Kafka. Mind you, no multi-region, just
multi-AZ.
I have never had issues with default settings. It’s possible that once your
cluster gets bigger, you may have to increase the timeouts. Never had a
problem with cluster size of ~20 brokers.
Happy to hear from others though.
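
For reference, the broker-side knobs in question live in server.properties;
6000 ms is the stock session timeout in the 0.10.x line, shown here only to
illustrate where the settings go, not as a recommendation:

# ZooKeeper timeouts as seen by the Kafka broker
zookeeper.session.timeout.ms=6000
# if unset, this falls back to the session timeout
zookeeper.connection.timeout.ms=6000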

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 5, 2016 at 5:27:05 PM, Thomas Becker (tobec...@tivo.com) wrote:

Thanks for the reply, Radek. So you're running with 6s then? I'm
surprised, I thought people were generally increasing this value when
running in EC2. Can I ask if you folks are running ZK on the same
instances as your Kafka brokers? We do, and yes we know it's somewhat
frowned upon.

-Tommy
On Mon, 2016-12-05 at 11:00 -0500, Radek Gruchalski wrote:
> Hi Thomas,
>
> Defaults are good for sure. Never had a problem with default timeouts
> in AWS.
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 5, 2016 at 4:58:41 PM, Thomas Becker (tobec...@tivo.com)
> wrote:
> > I know several folks are running Kafka in AWS, can someone give me
> > an
> > idea of what sort of values you're using for ZK session timeouts?
> >
> > --
> >
> >
> > Tommy Becker
> >
> > Senior Software Engineer
> >
> > O +1 919.460.4747
> >
> > tivo.com
> >
> >
> > 
> >
> > This email and any attachments may contain confidential and
> > privileged material for the sole use of the intended recipient. Any
> > review, copying, or distribution of this email (or any attachments)
> > by others is prohibited. If you are not the intended recipient,
> > please contact the sender immediately and permanently delete this
> > email and any attachments. No employee or agent of TiVo Inc. is
> > authorized to conclude any binding agreement on behalf of TiVo Inc.
> > by email. Binding agreements with TiVo Inc. may only be made by a
> > signed written agreement.
-- 


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com




This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review, copying,
or distribution of this email (or any attachments) by others is prohibited.
If you are not the intended recipient, please contact the sender
immediately and permanently delete this email and any attachments. No
employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.


Re: Looking for guidance on setting ZK session timeouts in AWS

2016-12-05 Thread Radek Gruchalski
Hi Thomas,

Defaults are good for sure. Never had a problem with default timeouts in
AWS.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 5, 2016 at 4:58:41 PM, Thomas Becker (tobec...@tivo.com) wrote:

I know several folks are running Kafka in AWS, can someone give me an
idea of what sort of values you're using for ZK session timeouts?

-- 


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com




This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review, copying,
or distribution of this email (or any attachments) by others is prohibited.
If you are not the intended recipient, please contact the sender
immediately and permanently delete this email and any attachments. No
employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.


Re: Kafka 0.10.1.0 consumer group rebalance broken?

2016-11-29 Thread Radek Gruchalski
You’re most likely correct that it’s not that particular change.
That commit was introduced only 6 days ago, well after releasing 0.10.1.
An mvp would be helpful. Unless someone else on this list knows the issue
immediately.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On November 29, 2016 at 9:29:22 AM, Bart Vercammen (b...@cloutrix.com)
wrote:

Well, well, Mr. Gruchalski, always nice to talk to you ;-)

Not sure whether it is indeed related to:
https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8
eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633

But I'll have a look and will try to create a test scenario for this that's
able to reproduce the issue at hand.
I'll also include some logs in my following posts.

Thanks for the reply ... food for thought indeed ...
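
A minimal skeleton for such a scenario might look roughly like the following
(a sketch only; the topic, group and client ids are made up, plain
KafkaConsumer API):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RebalanceTestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "rebalance-test");                  // same group.id in every consumer JVM
        props.put("client.id", args.length > 0 ? args[0] : "c1"); // different client.id per JVM
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("rebalance-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println(props.getProperty("client.id")
                        + " polled " + records.count() + " records");
            }
        }
    }
}

Starting two or three of these and then killing one should force a rebalance
and show whether the remaining instances keep their group membership.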


On Mon, Nov 28, 2016 at 10:17 PM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> There has been plenty of changes in the GroupCoordinator and co between
> these two releases:
> https://github.com/apache/kafka/commit/40b1dd3f495a59abef8a0cba545052
> 6994c92c04#diff-96e4cf31cd54def6b2fb3f7a118c1db3
>
> It might be related to this:
> https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8
> eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633
>
> If your group is empty, your group is marked dead, when the group is dead,
> no matter what you do, it’ll reply with:
> https://github.com/apache/kafka/blob/trunk/core/src/
> main/scala/kafka/coordinator/GroupCoordinator.scala#L353
>
> Food for thought.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On November 28, 2016 at 9:04:16 PM, Bart Vercammen (b...@cloutrix.com)
> wrote:
>
> Hi,
>
> It seems that consumer group rebalance is broken in Kafka 0.10.1.0?
> When running a small test-project :
> - consumers running in own JVM (with different 'client.id')
> - producer running in own JVM
> - kafka broker : the embedded kafka : KafkaServerStartable
>
> It looks like the consumers lose their heartbeat after a rebalance got
> triggered.
> In the logs on the consumer I can actually see that the heartbeat failed
> due to "invalid member_id"
>
> When running the exact same code on a 0.10.0.1 setup, all works perfectly.
> Anyone else seen this problem?
>
> Greets,
> Bart
>
>


--
Mvg,
Bart Vercammen

[image: Picture]
clouTrix BVBA
+32 486 69 17 68
i...@cloutrix.com


Re: Kafka 0.10.1.0 consumer group rebalance broken?

2016-11-28 Thread Radek Gruchalski
There has been plenty of changes in the GroupCoordinator and co between
these two releases:
https://github.com/apache/kafka/commit/40b1dd3f495a59abef8a0cba5450526994c92c04#diff-96e4cf31cd54def6b2fb3f7a118c1db3

It might be related to this:
https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633

If your group is empty, your group is marked dead, when the group is dead,
no matter what you do, it’ll reply with:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L353

Food for thought.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On November 28, 2016 at 9:04:16 PM, Bart Vercammen (b...@cloutrix.com)
wrote:

Hi,

It seems that consumer group rebalance is broken in Kafka 0.10.1.0?
When running a small test-project :
- consumers running in own JVM (with different 'client.id')
- producer running in own JVM
- kafka broker : the embedded kafka : KafkaServerStartable

It looks like the consumers lose their heartbeat after a rebalance got
triggered.
In the logs on the consumer I can actually see that the heartbeat failed
due to "invalid member_id"

When running the exact same code on a 0.10.0.1 setup, all works perfectly.
Anyone else seen this problem?

Greets,
Bart


Re: Logo

2013-07-22 Thread Radek Gruchalski
296 looks familiar: https://www.nodejitsu.com/  

Kind regards,

Radek Gruchalski
radek.gruchal...@technicolor.com (mailto:radek.gruchal...@technicolor.com) | 
radek.gruchal...@portico.io (mailto:radek.gruchal...@portico.io) | 

ra...@gruchalski.com
 (mailto:ra...@gruchalski.com)
00447889948663





Confidentiality:
This communication is intended for the above-named person and may be 
confidential and/or legally privileged.
If it has come to you in error you must take no action based on it, nor must 
you copy or show it to anyone; please delete/destroy and inform the sender 
immediately.


On Monday, 22 July 2013 at 18:51, Jay Kreps wrote:

 Hey guys,
  
 We need a logo!
  
 I got a few designs from a 99 designs contest that I would like to put
 forward:
 https://issues.apache.org/jira/browse/KAFKA-982
  
 If anyone else would like to submit a design that would be great.
  
 Let's do a vote to choose one.
  
 -Jay  



Re: Kafka Node.js Integration Questions/Advice

2012-12-22 Thread Radek Gruchalski
 That is exactly why we've decided to stick with Java. It also supports all
 consumer settings out of the box.

Kind regards,
Radek Gruchalski

On 22 Dec 2012, at 19:17, David Arthur mum...@gmail.com wrote:

 FWIW, message production is much simpler than consumption. It does
 not require the same complex coordination as the consumers. Producers
 only use ZooKeeper to locate available brokers
 
 Sent from my phone
 
 On Dec 22, 2012, at 1:00 PM, Apoorva Gaurav apoorvagau...@gmail.com wrote:
 
 Thanks Radek,
 We are also thinking of Java / Scala for Consumers; for Producers, would
 franz-kafka be a good choice?
 
 --
 Thanks & Regards,
 Apoorva
 
 On Sat, Dec 22, 2012 at 9:38 PM, Radek Gruchalski 
 radek.gruchal...@portico.io wrote:
 
 We started using node-kafka before we learned franz-kafka was available.
 In node, franz-kafka would be my preferred choice now. But tbh, our
 consumers are all java. node-kafka does not support consumer settings like
 autooffset.reset and so on (or it is not obvious how to use those).
 
 Afair franz-kafka offers those. Also, java zkconsumer gives you the jmx
 monitoring tools, which may be helpful if you want to add some scaling
 logic when consumer is lagging.
 
 Our first choice is node too but we're consuming exclusively with java.
 
 Hope this helps a little.
 
 On 22 Dec 2012, at 05:21, Apoorva Gaurav apoorvagau...@gmail.com wrote:
 
 Which is the best ZK-based implementation of Kafka in node.js? Our use
 case
 is that a pool of node js http servers will be listening to clients which
 will send json over http. Using node js we'll do minimal decoration and
 compression (preferably snappy) and write to brokers. We might also need
 json to avro conversion but thats not a deal breaker. Consumers will be
 writing these events to S3 (to begin with we don't plan to maintain HDFS
 cluster). To begin with we'll have to support a peak load of 50K events /
 second, average being much less, around 2K events / second. Suggestions
 please. Is anyone using franz-kafka in production? I'm only two days
 into
 kafka so don't know a lot, but franz-kafka looks clean and easy to work
 with.
 
 If none of the existing node.js implementation is capable of this then we
 are willing to move to Scala or Java but node.js is the first choice.
 
 Thanks & Regards,
 Apoorva
 
 On Sat, Dec 22, 2012 at 2:25 AM, Radek Gruchalski 
 radek.gruchal...@portico.io wrote:
 
 We are using https://github.com/radekg/node-kafka, occasionally pushing
 about 2500 messages, 3.5K each / second. No issues so far. Different
 story
 with consumers. They are stable but under heavy load we experienced CPU
 problems. I am the maintainer of that fork. The fork comes with ZK
 integration. Another kafka module is this one:
 https://github.com/dannycoates/franz-kafka.
 
 Kind regards,
 Radek Gruchalski
 radek.gruchal...@technicolor.com (mailto:
 radek.gruchal...@technicolor.com)
 | radek.gruchal...@portico.io (mailto:radek.gruchal...@portico.io) |
 ra...@gruchalski.com (mailto:ra...@gruchalski.com)
 00447889948663
 
 Confidentiality:
 This communication is intended for the above-named person and may be
 confidential and/or legally privileged.
 If it has come to you in error you must take no action based on it, nor
 must you copy or show it to anyone; please delete/destroy and inform the
 sender immediately.
 
 
 On Thursday, 20 December 2012 at 18:31, Jun Rao wrote:
 
 Chris,
 
 Not sure how stable those node.js clients are. In 0.8, we plan to
 provide a
 native C version of the producer. A thin node.js layer can potentially
 be
 built on top of that.
 
 Thanks,
 
 Jun
 
 On Thu, Dec 20, 2012 at 8:46 AM, Christopher Alexander 
 calexan...@gravycard.com (mailto:calexan...@gravycard.com) wrote:
 
 During my due diligence to assess use of Kafka for both our activity
 and
 log message streams, I would like to ask the project committers and
 community users about using Kafka with Node.js. Yes, I am aware that a
 Kafka client exists for Node.js (
 https://github.com/marcuswestin/node-kafka), which has spurred
 further
 interest by our front-end team. Here are my questions, excuse me if
 they
 seem noobish.
 
 1. How reliable is the Node.js client (
 https://github.com/marcuswestin/node-kafka) in production
 applications?
 If there are issues, what are they (the GitHub repo currently lists
 none)?
 2. To support real-time activity streams within Node.js, what is the
 recommended consumer polling interval?
 3. General advise observations on integrating a front-end based
 Node.js
 application with Kafka mediated messaging.
 
 Thank you!
 
 Chris