Re: KStreams.reduceByKey passing nulls to my Deserializer?

2016-06-10 Thread Guozhang Wang
Hello Avi,

Yes, this is possible: although we check for null keys when doing reduce
/ aggregations:

https://github.com/apache/kafka/blob/0.10.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java#L67

we do not check whether the underlying state store actually returned a
value for the key, and when no existing value is found, null is passed to
the deserializer.


I think in general the library should guard against this case instead of
letting deserializers worry about it. Do you want to file a JIRA reporting
this bug so we can follow up?
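
In the meantime, a minimal sketch of a null-tolerant wrapper around an
existing Deserializer (class and names are illustrative, not from Avi's
code) could look like this:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch: pass a null byte[] from the state store through as a null value
// instead of handing it to the wrapped deserializer.
public class NullTolerantDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;

    public NullTolerantDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // The state store returns null bytes when there is no existing value.
        return data == null ? null : inner.deserialize(topic, data);
    }

    @Override
    public void close() {
        inner.close();
    }
}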


Guozhang

On Fri, Jun 10, 2016 at 11:27 AM, Avi Flax  wrote:

>
> > On Jun 10, 2016, at 14:24, Avi Flax  wrote:
> >
> > Hi, I’m using Kafka Streams (0.10.0) with JRuby, most of my
> scripts/nodes are working well at this point, except for one which is using
> reduceByKey.
>
> Whoops, I should probably share my code as well!
>
> Here’s the topology:
>
>
> builder.stream(key_serde, val_serde, 'visit-update-events')
>.reduceByKey(-> (a, b) { a.merge b }, key_serde, val_serde,
> 'intermediate-visits')
>.to(key_serde, val_serde, 'visits')
>
>
> This is using Ruby syntax, but hopefully it’s fairly readable. I’ve added
> it to this gist as well:
> https://gist.github.com/aviflax/3428cdbaa18aca9bf0a958c6d5eac2bd
>
> Thanks!
> Avi




-- 
-- Guozhang


Manual update offset for new consumer

2016-06-10 Thread Henry Cai
When we were using the old consumer, we could use the ZooKeeper client tool
to manually set the offset for a consumer group.

For the new consumer, where the offsets are stored in the broker, is there a
tool to do the same thing?
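
(One workaround, pending a proper tool: a small program that uses the same
group.id and commits the desired offset with the new consumer API. A sketch
with placeholder broker, group, topic, and offset values; the group's
regular consumers should be stopped while running it.)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SetGroupOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // group to adjust
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("my-topic", 0);  // placeholder
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            consumer.assign(Collections.singletonList(tp));
            // Overwrite the group's committed offset for this partition.
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(12345L)));
        } finally {
            consumer.close();
        }
    }
}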


Re: Question about heterogeneous brokers in a cluster

2016-06-10 Thread Kevin A
Thanks Alex and Todd, I really appreciate the insights.

Based on what you've shared: spending more time up-front to homogenize the
nodes reduces some cognitive load for day-to-day support. Keeping things
simple generally wins me over.

Thanks again.

-Kevin

On Thu, Jun 9, 2016 at 10:37 PM, Todd Palino  wrote:

> So as Alex noted, there’s no immediate problem to doing this. Kafka itself
> doesn’t know much about the underlying hardware, so it’s not going to care.
> At the same time, this means that it has no way natively to know that those
> systems have more storage capacity. So they’re not going to automatically
> get more partitions.
>
> You have some options here.
>
>1. You could just ignore it, and treat everything like it’s the smaller
>brokers. This is easy, but you’ll waste your extra storage.
>2. You could manually assign more partitions to the larger brokers. This
>requires a little bit of work, but it will more effectively use the
>hardware.
>
> The gotcha with #2 is that you have to make sure you’re not sending too
> much network traffic to the larger brokers, and you need to make sure that
> you’re not exhausting the CPU as well. And, of course, you’re going to have
> to keep an eye on new topics or anything like that to make sure that your
> weighted cluster balance is still where you want it to be, and manually fix
> it if not.
>
> -Todd
>
>
> On Fri, Jun 10, 2016 at 5:26 AM, Alex Loddengaard 
> wrote:
>
> > Hi Kevin,
> >
> > If you keep the same configs on the new brokers with more storage
> capacity,
> > I don't foresee any issues. Although I haven't tried it myself.
> >
> > What may introduce headaches is if you have different configuration
> options
> > per broker. Or if you try to assign more partitions to the newer brokers
> to
> > use more of their disk space.
> >
> > Let's see if others notice anything I'm missing (again, I've never tried
> > this before). Hope this helps.
> >
> > Alex
> >
> > On Thu, Jun 9, 2016 at 10:27 AM, Kevin A  wrote:
> >
> > > Hi there,
> > >
> > > I have a couple of Kafka brokers and thinking about adding a few more.
> > The
> > > new broker machines would have a lot more storage available to them
> than
> > > the existing brokers. Am I setting myself up for operational headaches
> by
> > > deploying a heterogeneous (in terms of storage capacity) cluster?
> > >
> > > (Asked on IRC but thought I'd try here too.)
> > >
> > > Thanks!
> > > -Kevin
> > >
> >
>
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>
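
(For option #2 in Todd's reply, manually placing more partitions on the
larger brokers, one way is a manual reassignment with the stock tool. A
sketch only; the topic name, partition numbers, and broker IDs below are
placeholders:)

reassign.json:
{"version": 1,
 "partitions": [
   {"topic": "my-topic", "partition": 0, "replicas": [4, 5]},
   {"topic": "my-topic", "partition": 1, "replicas": [5, 4]}
 ]}

bin/kafka-reassign-partitions.sh --zookeeper zk-host:2181 \
  --reassignment-json-file reassign.json --execute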


Re: KStreams.reduceByKey passing nulls to my Deserializer?

2016-06-10 Thread Avi Flax

> On Jun 10, 2016, at 14:24, Avi Flax  wrote:
> 
> Hi, I’m using Kafka Streams (0.10.0) with JRuby, most of my scripts/nodes are 
> working well at this point, except for one which is using reduceByKey.

Whoops, I should probably share my code as well!

Here’s the topology:


builder.stream(key_serde, val_serde, 'visit-update-events')
   .reduceByKey(-> (a, b) { a.merge b }, key_serde, val_serde, 
'intermediate-visits')
   .to(key_serde, val_serde, 'visits')


This is using Ruby syntax, but hopefully it’s fairly readable. I’ve added it to 
this gist as well:
https://gist.github.com/aviflax/3428cdbaa18aca9bf0a958c6d5eac2bd

Thanks!
Avi

KStreams.reduceByKey passing nulls to my Deserializer?

2016-06-10 Thread Avi Flax
Hi, I’m using Kafka Streams (0.10.0) with JRuby, most of my scripts/nodes are 
working well at this point, except for one which is using reduceByKey.

This is the first time I’m trying to use the local state store so it’s possible 
there’s something misconfigured, I’m not sure. My config is pretty vanilla and 
minimal.

My debugging so far shows that reduceByKey is passing nil/null values to my 
Deserializer. I wasn’t expecting this and my Deserializer is currently raising 
exceptions in this case.

I guess I’d like to know — is this normal, expected behavior? If so, why, and 
what does it mean, and how am I meant to handle it?

If not, any idea why it might be happening?

My stack trace is pretty crazy due to JRuby (there’s probably a way to filter 
it better but I’m new to JRuby, sorry), but here are the most salient lines:

at RUBY.deserialize(uri:classloader:/lib/avro_utils/avro_kafka_serde.rb:31)
at 
org.jruby.internal.runtime.methods.MixedModeIRMethod.call(org/jruby/internal/runtime/methods/MixedModeIRMethod.java:126)
at 
KafkaAvroHashSerdes$$Deserializer_1394592798.deserialize(KafkaAvroHashSerdes$$Deserializer_1394592798.gen:13)
at 
org.apache.kafka.streams.state.StateSerdes.valueFrom(org/apache/kafka/streams/state/StateSerdes.java:156)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.get(org/apache/kafka/streams/state/internals/RocksDBStore.java:241)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:100)
at 
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(org/apache/kafka/streams/kstream/internals/KStreamReduce.java:70)

You can see the entire thing here:
https://gist.github.com/aviflax/3428cdbaa18aca9bf0a958c6d5eac2bd

Thank you!
Avi

ELB for Kafka

2016-06-10 Thread Ram Chander
Hi,


Is it possible to have Kafka brokers behind ELB and producers and consumers
talk only to ELB ?
If not, should we directly expose all  brokers to all producers/consumers ?
Please advise.


Regards,
Ram


Re: dynamically changing peer role from voting to observer and vice versa

2016-06-10 Thread Nomar Morado
Anyone?

Printing e-mails wastes valuable natural resources. Please don't print this 
message unless it is absolutely necessary. Thank you for thinking green!

Sent from my iPhone

> On Jun 8, 2016, at 2:31 PM, Nomar Morado  wrote:
> 
> Is there any way in the current API to achieve this?
> 
> I am trying to set up some screen to be able to promote an observer to voting 
> member in case of a loss of quorum for instance.
> 
> Appreciate your help. 
> 
> 
> thanks,
> Nomar


RE: error: ... protocols are incompatible with those of existing members ??

2016-06-10 Thread Martin Gainty


> Date: Fri, 10 Jun 2016 16:54:47 +0530
> Subject: Re: error: ... protocols are incompatible with those of existing 
> members ??
> From: bkap...@memelet.com
> To: users@kafka.apache.org
> 
> I delete the group using kafka-consumer-groups.sh --delete and still I get
> the error.

MG>Barry...assuming this happens when you construct a request to KafkaServer..in your log you should see a message:
MG>"Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)
MG>what are the id, brokerId and req contents of the message?
MG>thanks

KafkaHighLevel consumer in java returns topics which are removed before?

2016-06-10 Thread shahab
Hi,

I have a strange problem. I have a Kafka 0.9 cluster of 3 nodes. I removed
all topics from ZooKeeper, but when I query topics using the Java high-level
consumer it still returns all three removed Kafka topics. How is this
possible when I removed them manually, by going into the ZooKeeper shell and
deleting them?
My Java code:

Properties props = new Properties();

props.put("bootstrap.servers", "server:9092");

props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");


Map<String, List<PartitionInfo>> topics =
new KafkaConsumer<String, String>(props).listTopics();

System.out.println(topics);



best,

Shahab


Re: error: ... protocols are incompatible with those of existing members ??

2016-06-10 Thread Dana Powers
Barry - I believe the error refers to the consumer group "protocol" that is
used to decide which partitions get assigned to which consumers. The way it
works is that each consumer says it wants to join group X and that it can
support protocols (1, 2, 3...). The broker looks at all consumers in group
X and picks a protocol that all of them can support. If there is no common
protocol you will see this error. Example protocols are 'roundrobin' and 'range'.

You should check the configuration of the group protocol for each consumer
and also check that you don't have extra consumers in the group, perhaps
because the group id is reused / common.
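
(To make that concrete, a minimal config sketch for the new Java consumer;
the group id and assignor class are just illustrative, and every member of
the group needs at least one assignor in common:)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
// All consumers joining "my-group" must share at least one strategy here,
// otherwise the broker rejects the join with the "incompatible protocols" error.
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);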

Hope this helps,

-Dana
On Jun 10, 2016 4:24 AM, "Barry Kaplan"  wrote:

I delete the group using kafka-consumer-groups.sh --delete and still I get
the error.


Re: ELB for Kafka

2016-06-10 Thread Tom Crayford
Kafka itself handles distribution among brokers and determines which broker
consumers and producers connect to. There's no need for an ELB, and you do
have to directly expose all brokers to producers and consumers.
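
(To illustrate with a client config fragment; the hostnames are
placeholders. Clients only use bootstrap.servers to fetch cluster metadata,
then connect to the leader of each partition directly, so a load balancer
in front adds nothing:)

Properties props = new Properties();
// Any reachable subset of brokers is enough to bootstrap; after the first
// metadata fetch the client talks to the partition leaders directly.
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);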

On Friday, 10 June 2016, Ram Chander  wrote:

> Hi,
>
>
> I am trying to setup Kafka cluster in AWS.
> Is it possible to have Kafka brokers behind ELB and producers and consumers
> talk only to ELB ?
> If not, should we directly expose all  brokers to all producers/consumers ?
> Please advise.
>
>
> Regards,
> Ram
>


ELB for Kafka

2016-06-10 Thread Ram Chander
Hi,


I am trying to setup Kafka cluster in AWS.
Is it possible to have Kafka brokers behind ELB and producers and consumers
talk only to ELB ?
If not, should we directly expose all  brokers to all producers/consumers ?
Please advise.


Regards,
Ram


Regex topics in kafka connect?

2016-06-10 Thread Barry Kaplan
The Kafka Connect docs say

The Kafka Connect framework manages any changes to the Kafka input, such as
> when the set of input topics changes because of a regex subscription.


But I can find no other information on how to define a wildcard/regex topic
subscription. I tried giving the config topic property a regex but that
resulted in an error. I am working with 0.9; could it be that this is only
available in 0.10? In any case, I still can't find the docs for regex topics
in 0.10 either.

-barry


Re: error: ... protocols are incompatible with those of existing members ??

2016-06-10 Thread Barry Kaplan
I delete the group using kafka-consumer-groups.sh --delete and still I get
the error.


Re: error: ... protocols are incompatible with those of existing members ??

2016-06-10 Thread Barry Kaplan
I didn't really expect this to help, but still, I tried deleting /all/
topics and recreating them. Even so, my connect app still won't run due
to this error.

Does this error even have anything to do with persisted state, or is the
broker complaining about live client connections?


Re: JVM Optimizations

2016-06-10 Thread Barry Kaplan
Tom,

Thanks, that's very good to know. What kind of instances EC2 instances are
you using for your brokers?

-barry

On Fri, Jun 10, 2016 at 4:17 PM, Tom Crayford  wrote:

> Barry,
>
> No, because Kafka also relies heavily on the OS page cache, which uses
> memory. You'd roughly want to allocate enough page cache to hold all the
> messages for your consumers for say, 30s.
>
> Kafka also (in our experience on EC2) tends to run out of network far
> before it runs out of memory or disk bandwidth, so colocating brokers makes
> that much more likely.
>
> Thanks
>
> Tom Crayford, Heroku Kafka
>
> On Fri, Jun 10, 2016 at 7:02 AM, Barry Kaplan  wrote:
>
> > If too much heap causes problems, would it make sense to run multiple
> > brokers on a box with lots of memory? For example, an EC2 D2 instance type
> > has way more RAM than Kafka could ever use, but it has fast directly
> > attached disks.
> >
> > Would running a broker per disk make sense in this case?
> >
> > -barry
> >
>


Re: JVM Optimizations

2016-06-10 Thread Tom Crayford
Barry,

No, because Kafka also relies heavily on the OS page cache, which uses
memory. You'd roughly want to allocate enough page cache to hold all the
messages for your consumers for say, 30s.
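
As a rough worked example (numbers made up, not measured): at 100 MB/s of
aggregate inbound writes, 30 seconds of messages is about 100 MB/s x 30 s =
3 GB of page cache, so on a 32 GB box a 4-6 GB heap still leaves most of
the memory to the OS cache.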

Kafka also (in our experience on EC2) tends to run out of network far
before it runs out of memory or disk bandwidth, so colocating brokers makes
that much more likely.

Thanks

Tom Crayford, Heroku Kafka

On Fri, Jun 10, 2016 at 7:02 AM, Barry Kaplan  wrote:

> If too much heap causes problems, would it make sense to run multiple
> brokers on a box with lots of memory? For example, an EC2 D2 instance type
> has way more RAM than Kafka could ever use, but it has fast directly
> attached disks.
>
> Would running a broker per disk make sense in this case?
>
> -barry
>


error: ... protocols are incompatible with those of existing members ??

2016-06-10 Thread Barry Kaplan
I am getting this error:

Attempt to join group connect-elasticsearch-indexer failed due to: The
> group member's supported protocols are incompatible with those of existing
> members.


This is a single kafka-connect process consuming two topics. The brokers
have never changed, and the version of the Kafka libraries has never
changed. What could cause this? I'm guessing this is uncommon, as searches
really only yield the code that emits the exception.


Fwd: Monitor the lag for the consumers that are assigned to partitions topic

2016-06-10 Thread Spico Florin
Hello!
   I'm working with the Kafka 0.9.1 new consumer API.
The consumer is manually assigned to a partition. For this consumer I would
like to see its progress (meaning the lag).
Since I added the group id consumer-tutorial as a property, I assumed that I
could use the command

bin/kafka-consumer-groups.sh --new-consumer --describe --group
consumer-tutorial --bootstrap-server localhost:9092

(as explained here:
http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client)

Unfortunately, my group is not shown by the above command. Therefore I
cannot monitor the progress of my consumer (its lag).



How can I monitor the lag in the above-described scenario (manually
assigned partition)?

I look forward to your answers.
Thanks,
 Florin

P.S. I'm using the below code for testing:




Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "my-topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToBeginning(topicPartition);
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
    consumer.commitSync();
  }
} finally {
  consumer.close();
}
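
(A possible workaround, sketched with only the 0.9 consumer API and the
same consumer/partition as in the code above: compare the consumer's
current position to the log-end offset of the assigned partition.)

long current = consumer.position(topicPartition);
consumer.seekToEnd(topicPartition);              // jump to the log-end offset
long logEnd = consumer.position(topicPartition); // position() resolves the end offset
consumer.seek(topicPartition, current);          // restore the original position
System.out.println("lag for " + topicPartition + ": " + (logEnd - current));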