Re: Implementing custom key serializer
Yeah, I knew that already. This part of the error:

    org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)

points to this line:

https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73

which means that your error happens on the value, not the key.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:

0.10.1.0

On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:

> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
Re: Implementing custom key serializer
Jon,

Are you using 0.10.1 or 0.10.0.1?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote:

Hi Jon,

At a glance the code looks OK, i.e., I believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try adding the Serdes to the groupBy(..), i.e.:

    rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)

Thanks,
Damian

On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> It's just a bunch of public 'int' and 'String' values. There's an empty
> constructor and a copy constructor.
>
> For functions I override 'equals' and the requirements for 'serde' (close,
> configure, serializer and deserializer).
>
> @Override
> public Serializer serializer() {
>     JsonSerializer jsonSerializer = new JsonSerializer<>();
>     return jsonSerializer;
> }
>
> @Override
> public Deserializer deserializer() {
>     JsonDeserializer jsonDeserializer = new JsonDeserializer<>();
>     return jsonDeserializer;
> }
>
> Which relates to:
>
> public class JsonSerializer<T> implements Serializer<T> {
>
>     private Gson gson = new Gson();
>
>     @Override
>     public void configure(Map<String, ?> map, boolean b) {
>     }
>
>     @Override
>     public byte[] serialize(String topic, T t) {
>         return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
>     }
>
>     @Override
>     public void close() {
>     }
> }
>
> public class JsonDeserializer<T> implements Deserializer<T> {
>
>     private Gson gson = new Gson();
>     private Class<T> deserializedClass;
>
>     public JsonDeserializer(Class<T> deserializedClass) {
>         this.deserializedClass = deserializedClass;
>     }
>
>     public JsonDeserializer() {
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void configure(Map<String, ?> map, boolean b) {
>         if (deserializedClass == null) {
>             deserializedClass = (Class<T>) map.get("serializedClass");
>         }
>     }
>
>     @Override
>     public T deserialize(String s, byte[] bytes) {
>         if (bytes == null) {
>             return null;
>         }
>         return gson.fromJson(new String(bytes), deserializedClass);
>     }
>
>     @Override
>     public void close() {
>     }
> }
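One detail in the JsonSerializer / JsonDeserializer pair quoted in this message: serialize() encodes with UTF-8 explicitly, while deserialize() calls new String(bytes), which falls back to the JVM's default charset. On most setups the two agree, but they are not guaranteed to. A minimal, stdlib-only sketch of a symmetric round trip follows; the names Utf8RoundTrip, encode and decode are made up for illustration and are not part of Kafka or Gson.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka or Gson: encodes and decodes with the
// same explicit charset instead of relying on the platform default.
class Utf8RoundTrip {

    // Mirrors what serialize() does after gson.toJson(...): String -> UTF-8 bytes.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Counterpart for deserialize(): decode with UTF-8 explicitly, keeping the null check.
    static String decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00fc is a non-ASCII character, where a charset mismatch would show up.
        String json = "{\"city\":\"Z\u00fcrich\"}";
        System.out.println(decode(encode(json)).equals(json));
    }
}
```

In the quoted deserializer this would amount to passing StandardCharsets.UTF_8 as the second argument to new String(...); whether that has any bearing on the exception in this thread is not established here.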
Re: Implementing custom key serializer
Do you mind sharing the code of AggKey class?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:

The 2nd.

On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:

> Is the error happening at this stage?
>
>     KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>         -> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
>     KTable<Windowed, BqRtDetailLogLine_aggregate> ktRtDetail =
>         rtRekey.groupByKey().aggregate(
>             BqRtDetailLogLine_aggregate::new,
>             new PRTAggregate(),
>             TimeWindows.of(60 * 60 * 1000L),
>             collectorSerde, "prt_minute_agg_stream");
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
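A side note on the window size in the aggregate(...) call quoted in this message: TimeWindows.of(...) takes a size in milliseconds, and 60 * 60 * 1000L is one hour, while the store name "prt_minute_agg_stream" suggests per-minute aggregation. Whether an hourly window is intended is not stated anywhere in the thread, so this is only an observation. The stdlib-only sketch below (the class name WindowSizes is hypothetical) makes the arithmetic explicit with TimeUnit.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder for window sizes expressed in milliseconds.
class WindowSizes {

    static final long ONE_MINUTE_MS = TimeUnit.MINUTES.toMillis(1);
    static final long ONE_HOUR_MS = TimeUnit.HOURS.toMillis(1);

    public static void main(String[] args) {
        // The literal used in the thread's topology.
        long fromThread = 60 * 60 * 1000L;

        // Compare it against the named constants.
        System.out.println("equals one hour: " + (fromThread == ONE_HOUR_MS));
        System.out.println("minutes covered: " + (fromThread / ONE_MINUTE_MS));
    }
}
```

Using a named constant such as ONE_MINUTE_MS or ONE_HOUR_MS in place of the bare product makes the intended window visible at the call site.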
Re: Implementing custom key serializer
Is the error happening at this stage?

    KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
        -> new KeyValue<>(new AggKey(value), value));

or here:

    KTable<Windowed, BqRtDetailLogLine_aggregate> ktRtDetail =
        rtRekey.groupByKey().aggregate(
            BqRtDetailLogLine_aggregate::new,
            new PRTAggregate(),
            TimeWindows.of(60 * 60 * 1000L),
            collectorSerde, "prt_minute_agg_stream");

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:

If I comment out the aggregation step and just .print the .map step I
don't hit the error. It's coming from aggregating the non-String key.
Re: Implementing custom key serializer
Jon,

Looking at your code:

    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());

and later:

    KStream<String, RtDetailLogLine> rtDetailLines =
        kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

Is RtDetailLogLine inheriting from String? It is not, as the error suggests.
You may have to write your own Serializer / Deserializer for RtDetailLogLine.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:

Using 0.10.1.0

This is my topology:

    Properties config = new Properties();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());

    JsonSerializer sumRecordsSerializer = new JsonSerializer<>();
    JsonDeserializer sumRecordsDeserializer =
        new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
    Serde collectorSerde =
        Serdes.serdeFrom(sumRecordsSerializer, sumRecordsDeserializer);

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    Serde stringSerde =
        Serdes.serdeFrom(stringSerializer, stringDeserializer);

    JsonDeserializer prtRecordDeserializer =
        new JsonDeserializer<>(RtDetailLogLine.class);
    JsonSerializer prtRecordJsonSerializer = new JsonSerializer<>();
    Serde prtRecordSerde =
        Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);

    KStreamBuilder kStreamBuilder = new KStreamBuilder();

    KStream<String, RtDetailLogLine> rtDetailLines =
        kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

    // change the keying
    KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
        -> new KeyValue<>(new AggKey(value), value));

    KTable<Windowed, BqRtDetailLogLine_aggregate> ktRtDetail =
        rtRekey.groupByKey().aggregate(
            BqRtDetailLogLine_aggregate::new,
            new PRTAggregate(),
            TimeWindows.of(60 * 60 * 1000L),
            collectorSerde, "prt_minute_agg_stream");

    ktRtDetail.toStream().print();

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);

    kafkaStreams.start();

On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian@gmail.com> wrote:

> Hi Jon,
>
> A couple of things: Which version are you using?
> Can you share the code you are using to build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>
> > I'm using .map to convert my (k/v) string/Object to Object/Object but when I
> > chain this to an aggregation step I'm getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > java.lang.String
> >     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> >     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer for the
> > 'Serializer()' override.
> >
> > In the config for the topology I'm setting:
> >
> >     config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
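To make the diagnosis in this message concrete: the stack trace goes through StringSerializer.serialize, so a String serializer was handed an RtDetailLogLine. With VALUE_SERDE_CLASS_CONFIG set to the String serde, that can happen wherever no explicit value serde is supplied. The sketch below reproduces the mechanism without Kafka on the classpath. Everything in it is an assumption made for illustration: Serializer is a local stand-in for Kafka's interface, and StringSerializer, LogLine and sendWithDefault are hypothetical stand-ins, not the real Kafka classes, RtDetailLogLine or RecordCollector.

```java
import java.nio.charset.StandardCharsets;

class DefaultSerdeMismatch {

    // Local stand-in for org.apache.kafka.common.serialization.Serializer;
    // only the serialize method matters for this illustration.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Behaves like a String serializer: UTF-8 bytes of the string.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical value type standing in for RtDetailLogLine.
    static class LogLine {
        final String host;

        LogLine(String host) {
            this.host = host;
        }
    }

    // Mimics a framework that only knows the configured default serializer and
    // applies it to whatever value flows through, via an unchecked cast.
    @SuppressWarnings("unchecked")
    static byte[] sendWithDefault(Serializer<?> defaultValueSerializer, Object value) {
        Serializer<Object> raw = (Serializer<Object>) defaultValueSerializer;
        // Throws ClassCastException when value is not of the serializer's type.
        return raw.serialize("some-topic", value);
    }

    public static void main(String[] args) {
        try {
            sendWithDefault(new StringSerializer(), new LogLine("edge-1"));
        } catch (ClassCastException e) {
            System.out.println("value serializer mismatch: " + e.getMessage());
        }
    }
}
```

The compiler cannot catch this because the default serde is chosen by configuration at runtime. Damian's suggestion elsewhere in this thread, passing both serdes to groupByKey(...), supplies the value serde explicitly instead of falling back to the configured default.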
Re: Looking for guidance on setting ZK session timeouts in AWS
Thomas,

I’m always running ZK separate from Kafka. Mind you, no multi-region, just multi-AZ. I have never had issues with default settings. It’s possible that once your cluster gets bigger, you may have to increase the timeouts. Never had a problem with cluster size of ~20 brokers.

Happy to hear from others though.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On December 5, 2016 at 5:27:05 PM, Thomas Becker (tobec...@tivo.com) wrote:

Thanks for the reply, Radek. So you're running with 6s then? I'm surprised, I thought people were generally increasing this value when running in EC2. Can I ask if you folks are running ZK on the same instances as your Kafka brokers? We do, and yes we know it's somewhat frowned upon.

-Tommy

On Mon, 2016-12-05 at 11:00 -0500, Radek Gruchalski wrote:
> Hi Thomas,
>
> Defaults are good for sure. Never had a problem with default timeouts
> in AWS.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
> On December 5, 2016 at 4:58:41 PM, Thomas Becker (tobec...@tivo.com)
> wrote:
> > I know several folks are running Kafka in AWS, can someone give me
> > an idea of what sort of values you're using for ZK session timeouts?

--
Tommy Becker
Senior Software Engineer
O +1 919.460.4747
tivo.com

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.
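For reference, the "6s" mentioned in this message matches the broker setting zookeeper.session.timeout.ms, whose default was 6000 ms in Kafka releases of that period. The sketch below only shows how such an override could be carried in a java.util.Properties object. The class name ZkTimeoutConfig, the helper withSessionTimeout and the value 15000 are illustrative assumptions, not recommendations from this thread; on a real broker the key would normally be set in server.properties.

```java
import java.util.Properties;

// Hypothetical helper for building broker config overrides.
class ZkTimeoutConfig {

    // Broker-side key for the ZooKeeper session timeout, in milliseconds.
    static final String ZK_SESSION_TIMEOUT_MS = "zookeeper.session.timeout.ms";

    // Returns overrides carrying the given session timeout.
    static Properties withSessionTimeout(int timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeout must be positive: " + timeoutMs);
        }
        Properties props = new Properties();
        props.setProperty(ZK_SESSION_TIMEOUT_MS, Integer.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // 15000 is an arbitrary illustrative value, not advice from this thread.
        Properties overrides = withSessionTimeout(15000);
        System.out.println(ZK_SESSION_TIMEOUT_MS + "=" + overrides.getProperty(ZK_SESSION_TIMEOUT_MS));
    }
}
```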
Re: Looking for guidance on setting ZK session timeouts in AWS
Hi Thomas,

Defaults are good for sure. Never had a problem with default timeouts in AWS.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On December 5, 2016 at 4:58:41 PM, Thomas Becker (tobec...@tivo.com) wrote:

I know several folks are running Kafka in AWS, can someone give me an idea of what sort of values you're using for ZK session timeouts?

--
Tommy Becker
Senior Software Engineer
O +1 919.460.4747
tivo.com
Re: Kafka 0.10.1.0 consumer group rebalance broken?
You’re most likely correct that it’s not that particular change. That commit was introduced only 6 days ago, well after releasing 0.10.1.

An MVP would be helpful. Unless someone else on this list knows the issue immediately.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On November 29, 2016 at 9:29:22 AM, Bart Vercammen (b...@cloutrix.com) wrote:

Well, well, Mr. Gruchalski, always nice to talk to you ;-)

Not sure whether it is indeed related to:
https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633

But I'll have a look and will try to create a test scenario for this that's able to reproduce the issue at hand. I'll also include some logs in my following posts.

Thanks for the reply ... food for thought indeed ...

--
Kind regards,
Bart Vercammen
clouTrix BVBA
+32 486 69 17 68
i...@cloutrix.com
Re: Kafka 0.10.1.0 consumer group rebalance broken?
There have been plenty of changes in the GroupCoordinator and related code between these two releases:
https://github.com/apache/kafka/commit/40b1dd3f495a59abef8a0cba5450526994c92c04#diff-96e4cf31cd54def6b2fb3f7a118c1db3

It might be related to this:
https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633

If your group is empty, it is marked dead. Once the group is dead, no matter what you do, it’ll reply with:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L353

Food for thought.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On November 28, 2016 at 9:04:16 PM, Bart Vercammen (b...@cloutrix.com) wrote:

Hi,

It seems that consumer group rebalance is broken in Kafka 0.10.1.0?

When running a small test project:
- consumers running in own JVM (with different 'client.id')
- producer running in own JVM
- kafka broker: the embedded kafka: KafkaServerStartable

It looks like the consumers lose their heartbeat after a rebalance got triggered. In the logs on the consumer I can actually see that the heartbeat failed due to "invalid member_id".

When running the exact same code on a 0.10.0.1 setup, all works perfectly. Anyone else seen this problem?

Greets,
Bart
Re: Logo
296 looks familiar: https://www.nodejitsu.com/

Kind regards,
Radek Gruchalski
radek.gruchal...@technicolor.com | radek.gruchal...@portico.io | ra...@gruchalski.com
00447889948663

Confidentiality:
This communication is intended for the above-named person and may be confidential and/or legally privileged. If it has come to you in error you must take no action based on it, nor must you copy or show it to anyone; please delete/destroy and inform the sender immediately.

On Monday, 22 July 2013 at 18:51, Jay Kreps wrote:

Hey guys,

We need a logo! I got a few designs from a 99 designs contest that I would like to put forward:
https://issues.apache.org/jira/browse/KAFKA-982

If anyone else would like to submit a design that would be great.

Let's do a vote to choose one.

-Jay
Re: Kafka Node.js Integration Questions/Advice
That is exactly why we've decided to stick with Java. Also support for all consumer settings out of the box.

Kind regards,
Radek Gruchalski

On 22 Dec 2012, at 19:17, David Arthur mum...@gmail.com wrote:

FWIW, message production is quite a bit simpler than consumption. It does not require the same complex coordination as the consumers. Producers only use ZooKeeper to locate available brokers.

Sent from my phone

On Dec 22, 2012, at 1:00 PM, Apoorva Gaurav apoorvagau...@gmail.com wrote:

Thanks Radek,

We are also thinking of Java / Scala for consumers. For producers, is franz-kafka a good choice?

--
Thanks & Regards,
Apoorva

On Sat, Dec 22, 2012 at 9:38 PM, Radek Gruchalski radek.gruchal...@portico.io wrote:

We started using node-kafka before we learned franz-kafka was available. In node, franz-kafka would be my preferred choice now. But tbh, our consumers are all Java. node-kafka does not support consumer settings like autooffset.reset and so on (or it is not obvious how to use those). Afair franz-kafka offers those.

Also, the Java zkconsumer gives you the JMX monitoring tools, which may be helpful if you want to add some scaling logic when a consumer is lagging. Our first choice is node too but we're consuming exclusively with Java.

Hope this helps a little.

On 22 Dec 2012, at 05:21, Apoorva Gaurav apoorvagau...@gmail.com wrote:

Which is the best ZK-based implementation of Kafka in node.js? Our use case is that a pool of node.js HTTP servers will be listening to clients which will send JSON over HTTP. Using node.js we'll do minimal decoration and compression (preferably snappy) and write to brokers. We might also need JSON to Avro conversion but that's not a deal breaker. Consumers will be writing these events to S3 (to begin with we don't plan to maintain an HDFS cluster). To begin with we'll have to support a peak load of 50K events / second, average being much less, around 2K events / second.

Suggestions please. Is anyone using franz-kafka in production? I'm only two days into Kafka so don't know a lot, but franz-kafka looks clean and easy to work with. If none of the existing node.js implementations is capable of this then we are willing to move to Scala or Java, but node.js is the first choice.

Thanks & Regards,
Apoorva

On Sat, Dec 22, 2012 at 2:25 AM, Radek Gruchalski radek.gruchal...@portico.io wrote:

We are using https://github.com/radekg/node-kafka, occasionally pushing about 2500 messages, 3.5K each / second. No issues so far. Different story with consumers. They are stable but under heavy load we experienced CPU problems. I am the maintainer of that fork. The fork comes with ZK integration.

Another kafka module is this one: https://github.com/dannycoates/franz-kafka.

Kind regards,
Radek Gruchalski
radek.gruchal...@technicolor.com | radek.gruchal...@portico.io | ra...@gruchalski.com
00447889948663

On Thursday, 20 December 2012 at 18:31, Jun Rao wrote:

Chris,

Not sure how stable those node.js clients are. In 0.8, we plan to provide a native C version of the producer. A thin node.js layer can potentially be built on top of that.

Thanks,
Jun

On Thu, Dec 20, 2012 at 8:46 AM, Christopher Alexander calexan...@gravycard.com wrote:

During my due diligence to assess use of Kafka for both our activity and log message streams, I would like to ask the project committers and community users about using Kafka with Node.js. Yes, I am aware that a Kafka client exists for Node.js (https://github.com/marcuswestin/node-kafka), which has spurred further interest by our front-end team. Here are my questions, excuse me if they seem noobish.

1. How reliable is the Node.js client (https://github.com/marcuswestin/node-kafka) in production applications? If there are issues, what are they (the GitHub repo currently lists none)?
2. To support real-time activity streams within Node.js, what is the recommended consumer polling interval?
3. General advice and observations on integrating a front-end based Node.js application with Kafka mediated messaging.

Thank you!

Chris