Re: Kafka with Zookeeper behind AWS ELB

2017-07-20 Thread Pradeep Gollakota
Luigi,

I strongly urge you to consider a 5 node ZK deployment. I've always done
that in the past for resiliency during maintenance. In a 3 node cluster,
you can only tolerate one "failure", so if you bring one node down for
maintenance and another node crashes during said maintenance, your ZK
cluster is down. All the deployments I've had were 5 nodes of ZK and 5
nodes of Kafka.
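For reference, a 5-node ensemble tolerates two simultaneous failures. A minimal
illustrative zoo.cfg (hostnames and paths are placeholders, not taken from any
setup in this thread); each node also needs a myid file under dataDir matching
its server.N line:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/var/lib/zookeeper
    clientPort=2181
    server.1=zk1.example.com:2888:3888
    server.2=zk2.example.com:2888:3888
    server.3=zk3.example.com:2888:3888
    server.4=zk4.example.com:2888:3888
    server.5=zk5.example.com:2888:3888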

- Pradeep

On Thu, Jul 20, 2017 at 9:12 AM, Luigi Tagliamonte <
luigi.tagliamont...@gmail.com> wrote:

> Yes Andrey,
> you can use an ENI without EIP on AWS if you only want a private address.
>
> After some consideration, I think that growing the zookeeper cluster beyond
> 3 nodes is really unlikely, so I will attach 3 ENIs to 3 servers in an
> autoscaling group and configure Kafka to use these 3 IPs.
> This way I can get rid of the additional ELB/HAProxy layer; if I ever need
> to grow the zk ensemble I will re-engineer the solution.
>
> I'm wondering if reusing an old IP on a brand new zk node will create
> issues in the ensemble.
> Is anybody here aware of possible drawbacks?
>
> On Wed, Jul 19, 2017 at 11:58 PM, Andrey Dyachkov <
> andrey.dyach...@gmail.com
> > wrote:
>
> > The problem with an EIP is that it is a public IP.
> > Another option is to have a secondary interface attached to the instance
> > on start (or a bit later) with a private static IP, but we are still
> > investigating the possibility.
> > On Wed 19. Jul 2017 at 23:38, Luigi Tagliamonte <
> > luigi.tagliamont...@gmail.com> wrote:
> >
> > > Hello Andrey,
> > > I see that the ELB is not going to help directly with the bug, but it
> > > introduces a nice layer that makes zookeeper DNS management easier.
> > > With an ELB I don't have to deal with keeping DNS in sync for all the
> > > servers in the zk ensemble.
> > > For the moment I can use HAProxy with an EIP, and when the bug is solved
> > > I can move to an ELB.
> > > What do you think about it?
> > > Regards
> > > L.
> > >
> > > On Wed, Jul 19, 2017 at 2:16 PM, Andrey Dyachkov <
> > > andrey.dyach...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > > I have just posted almost the same question in dev list.
> > > > The Zookeeper client resolves addresses only once, on start, so
> > > > introducing an ELB won't really help here (ELBs can be replaced, which
> > > > involves an IP change), but I am eager to know if there is a solution
> > > > for that.
> > > >
> > > > On Wed, 19 Jul 2017 at 23:08 Luigi Tagliamonte <
> > > > luigi.tagliamont...@gmail.com> wrote:
> > > >
> > > > > Hello, Users!
> > > > > I'm designing a Kafka deployment on AWS and it's my first time
> > > > > working with Kafka and Zookeeper, so I've collected a lot of info so
> > > > > far but also some questions that I would love to submit to a much
> > > > > more expert audience like you.
> > > > >
> > > > > I have been experimenting with exhibitor and zookeeper in auto
> > scaling
> > > > > group and the exhibitor orchestration seems to work so far.
> > > > >
> > > > > I was trying to find a way to configure zookeeper servers in the
> > > > > Kafka conf and not have to reconfigure them in case a zookeeper node
> > > > > needs to be replaced or dies, so I thought of course of using DNS,
> > > > > but then I read that the zkclient library used by Kafka has this bug:
> > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-2184.
> > > > >
> > > > > So I'm now thinking about using an ELB in front of the zookeeper
> > > cluster.
> > > > > Theoretically, given how the zookeeper client should work, there
> > > > > should be no problem, but I'm wondering if any of you have used that
> > > > > and what the outcome was.
> > > > >
> > > > --
> > > >
> > > > With great enthusiasm,
> > > > Andrey
> > > >
> > >
> > --
> >
> > With great enthusiasm,
> > Andrey
> >
>


Re: Scaling up kafka consumers

2017-02-24 Thread Pradeep Gollakota
A single partition can be consumed by at most one consumer within a consumer
group. Consumers in a group compete to take ownership of partitions, and each
partition is assigned to exactly one of them. So, in order to gain parallelism
you need to add more partitions.

There is a library that allows multiple consumers to consume from a single
partition https://github.com/gerritjvv/kafka-fast. But I've never used it.
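For what it's worth, here is a minimal sketch with the newer (0.9+) Java
consumer showing the competing-consumer setup Jakub describes; the broker,
topic, and group names are placeholders. Run the same program on both nodes
with the same group.id and each partition (and therefore each message) is
delivered to exactly one of them:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class WorkerNode {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");      // placeholder
            props.put("group.id", "my-service");                 // identical on every node
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder
            while (true) {
                // every record polled here comes from partitions owned exclusively
                // by this instance; the other node owns the remaining partitions
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }

With a single-partition topic one of the two nodes simply sits idle, which is
why adding partitions is the way to scale.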

On Fri, Feb 24, 2017 at 7:30 AM, Jakub Stransky 
wrote:

> Hello everyone,
>
> I was reading/checking kafka documentation regarding point-2-point and
> publish subscribe communications patterns in kafka and I am wondering how
> to scale up consumer side in point to point scenario when consuming from
> single kafka topic.
>
> Let say I have a single topic with single partition and I have one node
> where the kafka consumer is running. If I want to scale up my service I add
> another node - which has the same configuration as the first one (topic,
> partition and consumer group id). Those two nodes start competing for
> messages from kafka topic.
>
> What I am not sure about in this scenario, and is actually the subject of my
> question, is "*whether each node gets unique messages, or whether there is
> still a possibility that some messages will be consumed by both nodes*".
> I can see a scenario where both nodes are started at the same time: they get
> the same topic offset from zookeeper and start consuming messages from that
> offset. Or am I thinking in the wrong direction?
>
> Thanks
> Jakub
>


Re: How does one deploy to consumers without causing re-balancing for real time use case?

2017-02-10 Thread Pradeep Gollakota
I believe if you're calling the .close() method on shutdown, then the
LeaveGroupRequest will be made. If you're doing a kill -9, I'm not sure if
that request will be made.
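A common pattern for making sure close() actually runs (and the
LeaveGroupRequest gets sent) is a shutdown hook that wakes up the poll loop.
This is only a sketch; it assumes `consumer` is an already-configured
KafkaConsumer, and note that kill -9 bypasses shutdown hooks entirely:

    final Thread mainThread = Thread.currentThread();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        consumer.wakeup();          // makes poll() throw WakeupException
        try {
            mainThread.join();      // wait for the loop below to close cleanly
        } catch (InterruptedException ignored) { }
    }));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // ... process records, commit offsets ...
        }
    } catch (org.apache.kafka.common.errors.WakeupException e) {
        // expected during shutdown
    } finally {
        consumer.close();           // this is what triggers the LeaveGroupRequest
    }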

On Fri, Feb 10, 2017 at 8:47 AM, Praveen <praveev...@gmail.com> wrote:

> @Pradeep - I just read your thread, the 1hr pause was when all the
> consumers were shut down simultaneously.  I'm testing out rolling restarts
> to get the actual numbers. The initial numbers are promising.
>
> `STOP (1) (1min later kicks off) -> REBALANCE -> START (1) -> REBALANCE
> (takes 1min to get a partition)`
>
> In your thread, Ewen says -
>
> "The LeaveGroupRequest is only sent on a graceful shutdown. If a
> consumer knows it is going to
> shutdown, it is good to proactively make sure the group knows it needs to
> rebalance work because some of the partitions that were handled by the
> consumer need to be handled by some other group members."
>
> So does this mean that the consumer should inform the group ahead of
> time before it goes down? Currently, I just shut down the process.
>
>
> On Fri, Feb 10, 2017 at 8:35 AM, Pradeep Gollakota <pradeep...@gmail.com>
> wrote:
>
> > I asked a similar question a while ago. There doesn't appear to be a way
> > to not trigger the rebalance. But I'm not sure why it would be taking >1hr
> > in your case. For us it was pretty fast.
> >
> > https://www.mail-archive.com/users@kafka.apache.org/msg23925.html
> >
> >
> >
> > On Fri, Feb 10, 2017 at 4:28 AM, Krzysztof Lesniewski, Nexiot AG <
> > krzysztof.lesniew...@nexiot.ch> wrote:
> >
> > > Would be great to get some input on it.
> > >
> > > - Krzysztof Lesniewski
> > >
> > >
> > > On 06.02.2017 08:27, Praveen wrote:
> > >
> > >> I have a 16 broker kafka cluster. There is a topic with 32 partitions
> > >> containing real time data and on the other side, I have 32 boxes w/ 1
> > >> consumer reading from these partitions.
> > >>
> > >> Today our deployment strategy is stop, deploy and start the processes
> on
> > >> all the 32 consumers. This triggers re-balancing and takes a long
> period
> > >> of
> > >> time (> 1hr). Such a long pause isn't good for real time processing.
> > >>
> > >> I was thinking of rolling deploy but I think that will still cause
> > >> re-balancing b/c we will still have consumers go down and come up.
> > >>
> > >> How do you deploy to consumers without triggering re-balancing (or
> > >> triggering one that doesn't affect your SLA) when doing real time
> > >> processing?
> > >>
> > >> Thanks,
> > >> Praveen
> > >>
> > >>
> > >
> >
>


Re: How does one deploy to consumers without causing re-balancing for real time use case?

2017-02-10 Thread Pradeep Gollakota
I asked a similar question a while ago. There doesn't appear to be a way to
not trigger the rebalance. But I'm not sure why it would be taking > 1hr
in your case. For us it was pretty fast.

https://www.mail-archive.com/users@kafka.apache.org/msg23925.html



On Fri, Feb 10, 2017 at 4:28 AM, Krzysztof Lesniewski, Nexiot AG <
krzysztof.lesniew...@nexiot.ch> wrote:

> Would be great to get some input on it.
>
> - Krzysztof Lesniewski
>
>
> On 06.02.2017 08:27, Praveen wrote:
>
>> I have a 16 broker kafka cluster. There is a topic with 32 partitions
>> containing real time data and on the other side, I have 32 boxes w/ 1
>> consumer reading from these partitions.
>>
>> Today our deployment strategy is stop, deploy and start the processes on
>> all the 32 consumers. This triggers re-balancing and takes a long period
>> of
>> time (> 1hr). Such a long pause isn't good for real time processing.
>>
>> I was thinking of rolling deploy but I think that will still cause
>> re-balancing b/c we will still have consumers go down and come up.
>>
>> How do you deploy to consumers without triggering re-balancing (or
>> triggering one that doesn't affect your SLA) when doing real time
>> processing?
>>
>> Thanks,
>> Praveen
>>
>>
>


Re: Consumer Rebalancing Question

2017-01-06 Thread Pradeep Gollakota
What I mean by "flapping" in this context is unnecessary rebalancing
happening. The example I would give is what a Hadoop Datanode would do in
case of a shutdown. By default, it will wait 10 minutes before replicating
the blocks owned by the Datanode so routine maintenance wouldn't cause
unnecessary shuffling of blocks.

In this context, if I'm performing a rolling restart, as soon as worker 1
shuts down, its work is picked up by other workers. But worker 1 comes
back 3 seconds (or whatever) later and requests the work back. Then worker
2 goes down and its work is assigned to other workers for 3 seconds before
yet another rebalance. So, in theory, the order of operations will look
something like this:

STOP (1) -> REBALANCE -> START (1) -> REBALANCE -> STOP (2) -> REBALANCE ->
START (2) -> REBALANCE -> 

From what I understand, there's currently no way to prevent this type of
shuffling of partitions from worker to worker while the consumers are under
maintenance. I'm also not sure if this is an issue I don't need to worry about.

- Pradeep

On Thu, Jan 5, 2017 at 8:29 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> Not sure I understand your question about flapping. The LeaveGroupRequest
> is only sent on a graceful shutdown. If a consumer knows it is going to
> shutdown, it is good to proactively make sure the group knows it needs to
> rebalance work because some of the partitions that were handled by the
> consumer need to be handled by some other group members.
>
> There's no "flapping" in the sense that the leave group requests should
> just inform the other members that they need to take over some of the work.
> I would normally think of "flapping" as meaning that things start/stop
> unnecessarily. In this case, *someone* needs to deal with the rebalance and
> pick up the work being dropped by the worker. There's no flapping because
> it's a one-time event -- one worker is shutting down, decides to drop the
> work, and a rebalance sorts it out and reassigns it to another member of
> the group. This happens once and then the "issue" is resolved without any
> additional interruptions.
>
> -Ewen
>
> On Thu, Jan 5, 2017 at 3:01 PM, Pradeep Gollakota <pradeep...@gmail.com>
> wrote:
>
> > I see... doesn't that cause flapping though?
> >
> > On Wed, Jan 4, 2017 at 8:22 PM, Ewen Cheslack-Postava <e...@confluent.io
> >
> > wrote:
> >
> > > The coordinator will immediately move the group into a rebalance if it
> > > needs it. The reason LeaveGroupRequest was added was to avoid having to
> > > wait for the session timeout before completing a rebalance. So aside
> from
> > > the latency of cleanup/committing offsets/rejoining after a heartbeat,
> > > rolling bounces should be fast for consumer groups.
> > >
> > > -Ewen
> > >
> > > On Wed, Jan 4, 2017 at 5:19 PM, Pradeep Gollakota <
> pradeep...@gmail.com>
> > > wrote:
> > >
> > > > Hi Kafka folks!
> > > >
> > > > When a consumer is closed, it will issue a LeaveGroupRequest. Does
> > anyone
> > > > know how long the coordinator waits before reassigning the partitions
> > > that
> > > > were assigned to the leaving consumer to a new consumer? I ask
> because
> > > I'm
> > > > trying to understand the behavior of consumers if you're doing a
> > rolling
> > > > restart.
> > > >
> > > > Thanks!
> > > > Pradeep
> > > >
> > >
> >
>


Consumer Rebalancing Question

2017-01-04 Thread Pradeep Gollakota
Hi Kafka folks!

When a consumer is closed, it will issue a LeaveGroupRequest. Does anyone
know how long the coordinator waits before reassigning the partitions that
were assigned to the leaving consumer to a new consumer? I ask because I'm
trying to understand the behavior of consumers if you're doing a rolling
restart.

Thanks!
Pradeep


Re: kafka + autoscaling groups fuckery

2016-06-28 Thread Pradeep Gollakota
Just out of curiosity, if you guys are in AWS for everything, why not use
Kinesis?

On Tue, Jun 28, 2016 at 3:49 PM, Charity Majors  wrote:

> Hi there,
>
> I just finished implementing kafka + autoscaling groups in a way that made
> sense to me.  I have a _lot_ of experience with ASGs and various storage
> types but I'm a kafka noob (about 4-5 months of using in development and
> staging and pre-launch production).
>
> It seems to be working fine from the Kafka POV but causing troubling side
> effects elsewhere that I don't understand.  I don't know enough about Kafka
> to know if my implementation is just fundamentally flawed for some reason,
> or if so how and why.
>
> My process is basically this:
>
> - *Terminate a node*, or increment the size of the ASG by one.  (I'm not
> doing any graceful shutdowns because I don't want to rely on graceful
> shutdowns, and I'm not attempting to act upon more than one node at a
> time.  Planning on doing a ZK lock or something later to enforce one
> process at a time, if I can work the major kinks out.)
>
> - *Firstboot script,* which runs on all hosts from rc.init.  (We run ASGs
> for *everything*.)  It infers things like the chef role, environment,
> cluster name, etc, registers DNS, bootstraps and runs chef-client, etc.
> For storage nodes, it formats and mounts a PIOPS volume under the right
> mount point, or just remounts the volume if it already contains data.  Etc.
>
> - *Run a balancing script from firstboot* on kafka nodes.  It checks to
> see how many brokers there are and what their ids are, and checks for any
> underbalanced partitions with less than 3 ISRs.  Then we generate a new
> assignment file for rebalancing partitions, and execute it.  We watch on
> the host for all the partitions to finish rebalancing, then complete.
>
> *- So far so good*.  I have repeatedly killed kafka nodes and had them
> come up, rebalance the cluster, and everything on the kafka side looks
> healthy.  All the partitions have the correct number of ISRs, etc.
>
> But after doing this, we have repeatedly gotten into a state where
> consumers that are pulling off the kafka partitions enter a weird state
> where their last known offset is *ahead* of the last known offset for that
> partition, and we can't recover from it.
>
> *An example.*  Last night I terminated ... I think it was broker 1002 or
> 1005, and it came back up as broker 1009.  It rebalanced on boot,
> everything looked good from the kafka side.  This morning we noticed that
> the storage node that maps to partition 5 has been broken for like 22
> hours, it thinks the next offset is too far ahead / out of bounds so
> stopped consuming.  This happened shortly after broker 1009 came online and
> the consumer caught up.
>
> From the storage node log:
>
> time="2016-06-28T21:51:48.286035635Z" level=info msg="Serving at
> 0.0.0.0:8089..."
> time="2016-06-28T21:51:48.293946529Z" level=error msg="Error creating
> consumer" error="kafka server: The requested offset is outside the range of
> offsets maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.294532365Z" level=error msg="Failed to start
> services: kafka server: The requested offset is outside the range of
> offsets maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.29461156Z" level=info msg="Shutting down..."
>
> From the mysql mapping of partitions to storage nodes/statuses:
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$
> hound-kennel
>
> Listing by default. Use -action  setstate, addslot, removeslot, removenode> for other actions
>
> PartStatus  Last UpdatedHostname
> 0   live2016-06-28 22:29:10 + UTC   retriever-772045ec
> 1   live2016-06-28 22:29:29 + UTC   retriever-75e0e4f2
> 2   live2016-06-28 22:29:25 + UTC   retriever-78804480
> 3   live2016-06-28 22:30:01 + UTC   retriever-c0da5f85
> 4   live2016-06-28 22:29:42 + UTC   retriever-122c6d8e
> 5   2016-06-28 21:53:48 + UTC
>
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$
> hound-kennel -partition 5 -action nextoffset
>
> Next offset for partition 5: 12040353
>
>
> Interestingly, the primary for partition 5 is 1004, and its follower is
> the new node 1009.  (Partition 2 has 1009 as its leader and 1004 as its
> follower, and seems just fine.)
>
> I've attached all the kafka logs for the broker 1009 node since it
> launched yesterday.
>
> I guess my main question is: *Is there something I am fundamentally
> missing about the kafka model that makes it it not play well with
> autoscaling?*  I see a couple of other people on the internet talking
> about using ASGs with kafka, but always in the context of maintaining a
> list of broker ids and reusing them.
>
> *I don't want to do that.  I want the path 

Re: Datacenter to datacenter over the open internet

2015-10-06 Thread Pradeep Gollakota
At Lithium, we have multiple datacenters and we distcp our data across our
Hadoop clusters. We have 2 DCs in NA and 1 in EU. We have a non-redundant
direct connect from our EU cluster to one of our NA DCs. If and when this
fails, we have automatic failover to a VPN that goes over the internet. The
amount of data that's moving across the clusters is not much, so we can get
away with this. We don't have Kafka replication setup yet, but we will be
setting it up using Mirror Maker and the same constraints apply.

Of course opening up your Kafka cluster to be reachable by the internet
would work too, but IMHO a VPN is more secure and reduces the surface area
of your infrastructure that could come under attack. It sucks that you
can't get your executives on board for a p2p direct connect as that is the
best solution.
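As Gwen points out below, the piece that makes externally reachable brokers
work is the advertised host name. A minimal illustrative server.properties
fragment (the hostname is a placeholder):

    # what the broker tells clients to connect to; must resolve and route
    # from the remote datacenter (e.g. the externally mapped name)
    advertised.host.name=kafka1.example.com
    advertised.port=9092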

On Tue, Oct 6, 2015 at 5:48 PM, Gwen Shapira  wrote:

> You can configure "advertised.host.name" for each broker, which is the
> name
> external consumers and producers will use to refer to the brokers.
>
> On Tue, Oct 6, 2015 at 3:31 PM, Tom Brown  wrote:
>
> > Hello,
> >
> > How do you consume a kafka topic from a remote location without a
> dedicated
> > connection? How do you protect the server?
> >
> > The setup: data streams into our datacenter. We process it, and publish
> it
> > to a kafka cluster. The consumer is located in a different datacenter
> with
> > no direct connection. The most efficient scenario would be to setup a
> > point-to-point link but that idea has no traction with our executives. We
> > can setup a VPN; While functional, our IT department assures us that it
> > won't be able to scale.
> >
> > What we're currently planning is to expose the kafka cluster IP addresses
> > to the internet, and only allow access via firewall. Each message will be
> > encrypted with a shared private key, so we're not worried about messages
> > being intercepted. What we are worried about is this: how brokers refer
> to
> > each other-- when a broker directs the consumer to the server that is in
> > charge of a particular region, does it use the host name (that could be
> > externally mapped to the public IP) or does it use the detected/private
> IP
> > address.
> >
> > What solution would you use to consume a remote cluster?
> >
> > --Tom
> >
>


Re: Dealing with large messages

2015-10-06 Thread Pradeep Gollakota
Thanks for the replies!

I was rather hoping not to have to implement a side channel solution. :/

If we have to do this, we may use an HBase table with a TTL matching our
topic's retention so the large objects are "gc'ed"... thoughts?
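Something like the claim-check pattern Gwen and Joel describe below could look
roughly like this; BlobStore is a hypothetical interface standing in for
HBase/S3/HDFS, the topic name is a placeholder, and the producer/consumer
setup is elided:

    // Hypothetical abstraction over the blob store; its TTL/retention is kept
    // in line with the topic's retention so references never outlive blobs.
    interface BlobStore {
        void put(String key, byte[] payload);   // e.g. backed by HBase with a TTL
        byte[] get(String key);
    }

    // Producer side: write the large payload out of band, send only a small
    // reference message to Kafka.
    String key = java.util.UUID.randomUUID().toString();
    blobStore.put(key, largePayload);           // the 40+ MB object goes here
    producer.send(new ProducerRecord<>("events", key, key));

    // Consumer side: read the reference, then fetch the payload out of band.
    for (ConsumerRecord<String, String> record : records) {
        byte[] payload = blobStore.get(record.value());
        // ... process payload ...
    }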

On Tue, Oct 6, 2015 at 8:45 AM, Gwen Shapira <g...@confluent.io> wrote:

> Storing large blobs in S3 or HDFS and placing URIs in Kafka is the most
> common solution I've seen in use.
>
> On Tue, Oct 6, 2015 at 8:32 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> > The best practice I think is to just put large objects in a blob store
> > and have messages embed references to those blobs. Interestingly we
> > ended up having to implement large-message-support at LinkedIn but for
> > various reasons were forced to put messages inline (i.e., against the
> > above recommendation). So we ended up having to break up large
> > messages into smaller chunks. This obviously adds considerable
> > complexity to the consumer since the checkpointing can become pretty
> > complicated. There are other nuances as well - we can probably do a
> > short talk on this at an upcoming meetup.
> >
> > Joel
> >
> >
> > On Mon, Oct 5, 2015 at 9:31 PM, Rahul Jain <rahul...@gmail.com> wrote:
> > > In addition to the config changes mentioned in that post, you may also
> > have
> > > to change producer config if you are using the new producer.
> > >
> > > Specifically, *max.request.size* and *request.timeout.ms* have to be
> > > increased to allow the producer to send large messages.
> > >
> > >
> > > On 6 Oct 2015 02:02, "James Cheng" <jch...@tivo.com> wrote:
> > >
> > >> Here’s an article that Gwen wrote earlier this year on handling large
> > >> messages in Kafka.
> > >>
> > >> http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> > >>
> > >> -James
> > >>
> > >> > On Oct 5, 2015, at 11:20 AM, Pradeep Gollakota <
> pradeep...@gmail.com>
> > >> wrote:
> > >> >
> > >> > Fellow Kafkaers,
> > >> >
> > >> > We have a pretty heavyweight legacy event logging system for batch
> > >> > processing. We're now sending the events into Kafka now for realtime
> > >> > analytics. But we have some pretty large messages (> 40 MB).
> > >> >
> > >> > I'm wondering if any of you have use cases where you have to send
> > large
> > >> > messages to Kafka and how you're dealing with them.
> > >> >
> > >> > Thanks,
> > >> > Pradeep
> > >>
> > >>
> > >> 
> > >>
> > >> This email and any attachments may contain confidential and privileged
> > >> material for the sole use of the intended recipient. Any review,
> > copying,
> > >> or distribution of this email (or any attachments) by others is
> > prohibited.
> > >> If you are not the intended recipient, please contact the sender
> > >> immediately and permanently delete this email and any attachments. No
> > >> employee or agent of TiVo Inc. is authorized to conclude any binding
> > >> agreement on behalf of TiVo Inc. by email. Binding agreements with
> TiVo
> > >> Inc. may only be made by a signed written agreement.
> > >>
> >
>


Dealing with large messages

2015-10-05 Thread Pradeep Gollakota
Fellow Kafkaers,

We have a pretty heavyweight legacy event logging system for batch
processing. We're now sending the events into Kafka now for realtime
analytics. But we have some pretty large messages (> 40 MB).

I'm wondering if any of you have use cases where you have to send large
messages to Kafka and how you're dealing with them.

Thanks,
Pradeep


Re: number of topics given many consumers and groups within the data

2015-09-30 Thread Pradeep Gollakota
To add a little more context to Shaun's question, we have around 400
customers. Each customer has a stream of events. Some customers generate a
lot of data while others don't. We need to ensure that each customer's data
is sorted globally by timestamp.

We have two use cases around consumption:

1. A user may consume an individual customer's data
2. A user may consume data for all customers

Given these two use cases, I think the better strategy is to have a
separate topic per customer as Todd suggested.
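With per-customer topics both consumption patterns stay simple. A sketch,
assuming a hypothetical topic-per-customer naming convention like
"customer-<id>" and the newer Java consumer (the old high-level consumer has
an equivalent whitelist filter); `consumer` construction and imports elided:

    // Use case 1: a user consuming a single customer's stream
    consumer.subscribe(Collections.singletonList("customer-42"));

    // Use case 2: a user consuming every customer's stream via a wildcard
    consumer.subscribe(Pattern.compile("customer-.*"), new ConsumerRebalanceListener() {
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
    });

Keeping each customer topic at a single partition also preserves the
per-customer timestamp ordering we need.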

On Wed, Sep 30, 2015 at 9:26 AM, Todd Palino  wrote:

> So I disagree with the idea to use custom partitioning, depending on your
> requirements. Having a consumer consume from a single partition is not
> (currently) that easy. If you don't care which consumer gets which
> partition (group), then it's not that bad. You have 20 partitions, you have
> 20 consumers, and you use custom partitioning as noted. The consumers use
> the high level consumer with a single group, each one will get one
> partition each, and it's pretty straightforward. If a consumer crashes, you
> will end up with two partitions on one of the remaining consumers. If this
> is OK, this is a decent solution.
>
> If, however, you require that each consumer always have the same group of
> data, and you need to know what that group is beforehand, it's more
> difficult. You need to use the simple consumer to do it, which means you
> need to implement a lot of logic for error and status code handling
> yourself, and do it right. In this case, I think your idea of using 400
> separate topics is sound. This way you can still use the high level
> consumer, which takes care of the error handling for you, and your data is
> separated out by topic.
>
> Provided it is not an issue to implement it in your producer, I would go
> with the separate topics. Alternately, if you're not sure you always want
> separate topics, you could go with something similar to your second idea,
> but have a consumer read the single topic and split the data out into 400
> separate topics in Kafka (no need for Cassandra or Redis or anything else).
> Then your real consumers can all consume their separate topics. Reading and
> writing the data one extra time is much better than rereading all of it 400
> times and throwing most of it away.
>
> -Todd
>
>
> On Wed, Sep 30, 2015 at 9:06 AM, Ben Stopford  wrote:
>
> > Hi Shaun
> >
> > You might consider using a custom partition assignment strategy to push
> > your different “groups" to different partitions. This would allow you
> walk
> > the middle ground between "all consumers consume everything” and “one
> topic
> > per consumer” as you vary the number of partitions in the topic, albeit
> at
> > the cost of a little extra complexity.
> >
> > Also, not sure if you’ve seen it but there is quite a good section in the
> > FAQ here <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowmanytopicscanIhave
> ?>
> > on topic and partition sizing.
> >
> > B
> >
> > > On 29 Sep 2015, at 18:48, Shaun Senecal 
> > wrote:
> > >
> > > Hi
> > >
> > >
> > > I have read Jay Kreps' post regarding the number of topics that can be
> > handled by a broker (
> > https://www.quora.com/How-many-topics-can-be-created-in-Apache-Kafka),
> > and it has left me with more questions that I dont see answered anywhere
> > else.
> > >
> > >
> > > We have a data stream which will be consumed by many consumers (~400).
> > We also have many "groups" within our data.  A group in the data
> > corresponds 1:1 with what the consumers would consume, so consumer A only
> > ever sees group A messages, consumer B only consumes group B messages,
> etc.
> > >
> > >
> > > The downstream consumers will be consuming via a websocket API, so the
> > API server will be the thing consuming from kafka.
> > >
> > >
> > > If I use a single topic with, say, 20 partitions, the consumers in the
> > API server would need to re-read the same messages over and over for each
> > consumer, which seems like a waste of network and a potential bottleneck.
> > >
> > >
> > > Alternatively, I could use a single topic with 20 partitions and have a
> > single consumer in the API put the messages into cassandra/redis (as
> > suggested by Jay), and serve out the downstream consumer streams that
> way.
> > However, that requires using a secondary sorted storage, which seems
> like a
> > waste (and added complexity) given that Kafka already has the data
> exactly
> > as I need it.  Especially if cassandra/redis are required to maintain a
> > long TTL on the stream.
> > >
> > >
> > > Finally, I could use 1 topic per group, each with a single partition.
> > This would result in 400 topics on the broker, but would allow the API
> > server to simply serve the stream for each consumer directly from kafka
> and
> > wont require additional machinery to serve out the requests.
> > >
> > >
> > > The 400 topic solution makes the 

Re: integrate Camus and Hive?

2015-03-09 Thread Pradeep Gollakota
If I understood your question correctly, you want to be able to read the
output of Camus in Hive and be able to know partition values. If my
understanding is right, you can do so by using the following.

Hive provides the ability to specify custom patterns for partitions. You
can use this in combination with MSCK REPAIR TABLE to automatically detect
and load the partitions into the metastore.

Take a look at this SO
http://stackoverflow.com/questions/24289571/hive-0-13-external-table-dynamic-partitioning-custom-pattern

Does that help?
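For example, one way to wire this up (the paths, columns, and partition values
are placeholders):

    -- External table over the Camus output; the partition columns don't have
    -- to match the directory names on disk.
    CREATE EXTERNAL TABLE camus_events (payload STRING)
    PARTITIONED BY (year STRING, month STRING, day STRING)
    LOCATION '/camus/topics/my_topic/daily';

    -- Point each partition at the YYYY/MM/DD directory Camus wrote.
    ALTER TABLE camus_events ADD IF NOT EXISTS
      PARTITION (year='2015', month='03', day='09')
      LOCATION '/camus/topics/my_topic/daily/2015/03/09';

If the directories instead follow the year=YYYY/month=MM/day=DD convention,
MSCK REPAIR TABLE picks the partitions up without per-partition ALTER
statements.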


On Mon, Mar 9, 2015 at 1:42 PM, Yang tedd...@gmail.com wrote:

 I believe many users like us would export the output from camus as a hive
 external table. but the dir structure of camus is like
 /YYYY/MM/DD/xx

 while hive generally expects /year=YYYY/month=MM/day=DD/xx if you
 define that table to be
 partitioned by (year, month, day). otherwise you'd have to add those
 partitions created by camus through a separate command. but in the latter
 case, would a camus job create 1 partitions ? how would we find out the
 YYYY/MM/DD values from outside ?  well you could always do something by
 hadoop dfs -ls and then grep the output, but it's kind of not clean


 thanks
 yang



Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-03 Thread Pradeep Gollakota
Lithium Technologies would love to host you guys for a release party in SF
if you guys want.

:)

On Tue, Feb 3, 2015 at 11:04 AM, Gwen Shapira gshap...@cloudera.com wrote:

 When's the party?
 :)

 On Mon, Feb 2, 2015 at 8:13 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Yay!
 
  -Jay
 
  On Mon, Feb 2, 2015 at 2:23 PM, Neha Narkhede n...@confluent.io wrote:
 
  Great! Thanks Jun for helping with the release and everyone involved for
  your contributions.
 
  On Mon, Feb 2, 2015 at 1:32 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   Huzzah!
  
   Thanks Jun for preparing the release candidates and getting this out
 to
   the
   community.
  
   - Joe Stein
  
   On Mon, Feb 2, 2015 at 2:27 PM, Jun Rao j...@confluent.io wrote:
  
The following are the results of the votes.
   
+1 binding = 3 votes
+1 non-binding = 1 votes
-1 = 0 votes
0 = 0 votes
   
The vote passes.
   
I will release artifacts to maven central, update the dist svn and
   download
site. Will send out an announce after that.
   
Thanks everyone that contributed to the work in 0.8.2.0!
   
Jun
   
On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao j...@confluent.io wrote:
   
This is the third candidate for release of Apache Kafka 0.8.2.0.
   
Release Notes for the 0.8.2.0 release
   
   
  
  
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
   
*** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
   
Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
(SHA256) checksum.
   
* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
   
* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/
   
* scala-doc
   
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
   
* java-doc
   
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
   
* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0
 tag
   
   
  
  
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
(commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
   
/***
   
Thanks,
   
Jun
   
   
 --
You received this message because you are subscribed to the Google
Groups
kafka-clients group.
To unsubscribe from this group and stop receiving emails from it,
 send
an
email to kafka-clients+unsubscr...@googlegroups.com.
To post to this group, send email to kafka-clie...@googlegroups.com
 .
Visit this group at http://groups.google.com/group/kafka-clients.
To view this discussion on the web visit
   
  
  
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com

  
  
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com?utm_medium=emailutm_source=footer
   
.
   
For more options, visit https://groups.google.com/d/optout.
   
  
 
 
 
  --
  Thanks,
  Neha
 
 
  --
  You received this message because you are subscribed to the Google Groups
  kafka-clients group.
  To unsubscribe from this group and stop receiving emails from it, send an
  email to kafka-clients+unsubscr...@googlegroups.com.
  To post to this group, send email to kafka-clie...@googlegroups.com.
  Visit this group at http://groups.google.com/group/kafka-clients.
  To view this discussion on the web visit
 
 https://groups.google.com/d/msgid/kafka-clients/CAOeJiJjkYXyK_3qxJYpchG%2B_-c1Jt6K_skT_1geP%3DEJXV5w9uQ%40mail.gmail.com
 .
 
  For more options, visit https://groups.google.com/d/optout.



Re: New Producer - ONLY sync mode?

2015-02-02 Thread Pradeep Gollakota
This is a great question Otis. Like Gwen said, you can accomplish Sync mode
by setting the batch size to 1. But this does highlight a shortcoming of
the new producer API.

I really like the design of the new API and it has really great properties
and I'm enjoying working with it. However, one API that I think we're
lacking is a batch API. Currently, I have to iterate over a batch and
call .send() on each record, which returns n callbacks instead of 1
callback for the whole batch. This significantly complicates recovery logic
where we need to commit a batch as opposed to 1 record at a time.

Do you guys have any plans to add better semantics around batches?
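For Otis's X/Y case specifically, here is a minimal sketch of the blocking
pattern Gwen and Jay describe below (broker, topic, key, and value are
placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SyncSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder
            props.put("acks", "all");                         // wait for the broker's ack
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            try {
                // block until the broker acknowledges; only then reply "ok" to X
                producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
            } catch (Exception e) {
                // tell X to buffer locally and resend later
            } finally {
                producer.close();
            }
        }
    }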

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira gshap...@cloudera.com wrote:

 If I understood the code and Jay correctly - if you wait for the
 future it will be a similar delay to that of the old sync producer.

 Put another way, if you test it out and see longer delays than the
 sync producer had, we need to find out why and fix it.

 Gwen

 On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
 otis.gospodne...@gmail.com wrote:
  Hi,
 
  Nope, unfortunately it can't do that.  X is a remote app, doesn't listen
 to
  anything external, calls Y via HTTPS.  So X has to decide what to do with
  its data based on Y's synchronous response.  It has to block until Y
  responds.  And it wouldn't be pretty, I think, because nobody wants to
 run
  apps that talk to remote servers and hang on to connections more than
 they
  have to.  But perhaps that is the only way?  Or maybe the answer to I'm
  guessing the delay would be more or less the same as if the Producer was
  using SYNC mode? is YES, in which case the connection from X to Y would
 be
  open for just as long as with a SYNC producer running in Y?
 
  Thanks,
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log Management
  Solr  Elasticsearch Support * http://sematext.com/
 
 
  On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Can Y have a callback that will handle the notification to X?
  In this case, perhaps Y can be async and X can buffer the data until
  the callback triggers and says all good (or resend if the callback
  indicates an error)
 
  On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
  otis.gospodne...@gmail.com wrote:
   Hi,
  
   Thanks for the info.  Here's the use case.  We have something up
 stream
   sending data, say a log shipper called X.  It sends it to some remote
   component Y.  Y is the Kafka Producer and it puts data into Kafka.
 But Y
   needs to send a reply to X and tell it whether it successfully put all
  its
   data into Kafka.  If it did not, Y wants to tell X to buffer data
 locally
   and resend it later.
  
   If producer is ONLY async, Y can't easily do that.  Or maybe Y would
 just
   need to wait for the Future to come back and only then send the
 response
   back to X?  If so, I'm guessing the delay would be more or less the
 same
  as
   if the Producer was using SYNC mode?
  
   Thanks,
   Otis
   --
   Monitoring * Alerting * Anomaly Detection * Centralized Log Management
   Solr  Elasticsearch Support * http://sematext.com/
  
  
   On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
   Yeah as Gwen says there is no sync/async mode anymore. There is a new
   configuration which does a lot of what async did in terms of allowing
   batching:
  
   batch.size - This is the target amount of data per partition the
 server
   will attempt to batch together.
   linger.ms - This is the time the producer will wait for more data
 to be
   sent to better batch up writes. The default is 0 (send immediately).
 So
  if
   you set this to 50 ms the client will send immediately if it has
 already
   filled up its batch, otherwise it will wait to accumulate the number
 of
   bytes given by batch.size.
  
   To send asynchronously you do
  producer.send(record)
   whereas to block on a response you do
  producer.send(record).get();
   which will wait for acknowledgement from the server.
  
    One advantage of this model is that the client will do its best to
  batch
   under the covers even if linger.ms=0. It will do this by batching
 any
  data
   that arrives while another send is in progress into a single
   request--giving a kind of group commit effect.
  
   The hope is that this will be both simpler to understand (a single
 api
  that
   always works the same) and more powerful (you always get a response
 with
   error and offset information whether or not you choose to use it).
  
   -Jay
  
  
   On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira gshap...@cloudera.com
 
   wrote:
  
If you want to emulate the old sync producer behavior, you need to
 set
the batch size to 1  (in producer config) and wait on the future
 you
get from Send (i.e. future.get)
   
I can't think of good reasons to do so, though.
   
Gwen
   
   
On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic

Re: New Producer - ONLY sync mode?

2015-02-02 Thread Pradeep Gollakota
I looked at the newly added batch API to Kinesis for inspiration. The
response on the batch put is a list of message-ids and their status (offset
if success else a failure code).

Ideally, I think the server should fail the entire batch or succeed the
entire batch (i.e. no duplicates), but this is pretty hard to implement.
Given that, what Kinesis did is probably a good compromise (perhaps while we
wait for exactly once semantics :))

In addition, perhaps adding a flush() method to the producer to allow for
control over when flushes happen might be another good starting point. With
the addition of a flush, it's easier to implement a SyncProducer by doing
something like flush() -> n x send() -> flush(). This doesn't guarantee
that a particular batch isn't broken into two, but with sane batch sizes
and sane record sizes, we can assume the guarantee.
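To make that concrete, a sketch of how a batch send could look with such a
flush() (the producer API at the time of this thread does not have flush(); a
later release added it, and MyRecord here is a hypothetical application type):

    List<Future<RecordMetadata>> pending = new ArrayList<>();
    for (MyRecord r : batch) {
        pending.add(producer.send(new ProducerRecord<>("my-topic", r.key(), r.value())));
    }
    producer.flush();                   // push everything queued so far
    for (Future<RecordMetadata> f : pending) {
        f.get();                        // surfaces any per-record failure; commit
                                        // the batch only if every future succeeds
    }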

On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira gshap...@cloudera.com wrote:

 I've been thinking about that too, since both Flume and Sqoop rely on
 send(List) API of the old API.

 I'd like to see this API come back, but I'm debating how we'd handle
 errors. IIRC, the old API would fail an entire batch on a single
 error, which can lead to duplicates. Having N callbacks lets me retry
 / save / whatever just the messages that had issues.

 If messages had identifiers from the producer side, we could have the
 API call the callback with a list of message-ids and their status. But
 they don't :)

 Any thoughts on how you'd like it to work?

 Gwen


 On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota pradeep...@gmail.com
 wrote:
  This is a great question Otis. Like Gwen said, you can accomplish Sync
 mode
  by setting the batch size to 1. But this does highlight a shortcoming of
  the new producer API.
 
  I really like the design of the new API and it has really great
 properties
  and I'm enjoying working with it. However, one API that I think we're
  lacking is a batch API. Currently, I have to iterate over a batch and
  call .send() on each record, which returns n callbacks instead of 1
  callback for the whole batch. This significantly complicates recovery
 logic
  where we need to commit a batch as opposed to 1 record at a time.
 
  Do you guys have any plans to add better semantics around batches?
 
  On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  If I understood the code and Jay correctly - if you wait for the
  future it will be a similar delay to that of the old sync producer.
 
  Put another way, if you test it out and see longer delays than the
  sync producer had, we need to find out why and fix it.
 
  Gwen
 
  On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
  otis.gospodne...@gmail.com wrote:
   Hi,
  
   Nope, unfortunately it can't do that.  X is a remote app, doesn't
 listen
  to
   anything external, calls Y via HTTPS.  So X has to decide what to do
 with
   its data based on Y's synchronous response.  It has to block until Y
   responds.  And it wouldn't be pretty, I think, because nobody wants to
  run
    apps that talk to remote servers and hang on to connections more than
  they
   have to.  But perhaps that is the only way?  Or maybe the answer to
 I'm
   guessing the delay would be more or less the same as if the Producer
 was
   using SYNC mode? is YES, in which case the connection from X to Y
 would
  be
   open for just as long as with a SYNC producer running in Y?
  
   Thanks,
   Otis
   --
   Monitoring * Alerting * Anomaly Detection * Centralized Log Management
   Solr  Elasticsearch Support * http://sematext.com/
  
  
   On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
   Can Y have a callback that will handle the notification to X?
   In this case, perhaps Y can be async and X can buffer the data until
   the callback triggers and says all good (or resend if the callback
   indicates an error)
  
   On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
   otis.gospodne...@gmail.com wrote:
Hi,
   
Thanks for the info.  Here's the use case.  We have something up
  stream
sending data, say a log shipper called X.  It sends it to some
 remote
component Y.  Y is the Kafka Producer and it puts data into Kafka.
  But Y
needs to send a reply to X and tell it whether it successfully put
 all
   its
data into Kafka.  If it did not, Y wants to tell X to buffer data
  locally
and resend it later.
   
If producer is ONLY async, Y can't easily do that.  Or maybe Y
 would
  just
need to wait for the Future to come back and only then send the
  response
back to X?  If so, I'm guessing the delay would be more or less the
  same
   as
if the Producer was using SYNC mode?
   
Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log
 Management
Solr  Elasticsearch Support * http://sematext.com/
   
   
On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps jay.kr...@gmail.com
  wrote:
   
Yeah as Gwen says

Re: Kafka ETL Camus Question

2015-02-02 Thread Pradeep Gollakota
Hi Bhavesh,

At Lithium, we don't run Camus in our pipelines yet, though we plan to. But
I just wanted to comment regarding speculative execution. We have it
disabled at the cluster level and typically don't need it for most of our
jobs. Especially with something like Camus, I don't see any need to run
parallel copies of the same task.
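For reference, the property Bhavesh mentions below can also be pinned off for
just the Camus job rather than cluster-wide, e.g. in the job's configuration
(old-style mapred property name, as in his question):

    <property>
      <name>mapred.map.tasks.speculative.execution</name>
      <value>false</value>
    </property>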

On Mon, Feb 2, 2015 at 10:36 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Jun,

 Thanks for info.  I did not get answer  to my question there so I thought I
 try my luck here :)

 Thanks,

 Bhavesh

 On Mon, Feb 2, 2015 at 9:46 PM, Jun Rao j...@confluent.io wrote:

  You can probably ask the Camus mailing list.
 
  Thanks,
 
  Jun
 
  On Thu, Jan 29, 2015 at 1:59 PM, Bhavesh Mistry 
  mistry.p.bhav...@gmail.com
  wrote:
 
   Hi Kafka Team or Linked-In  Team,
  
    I would like to know if you guys run the Camus ETL job with speculative
    execution set to true or false.  Does it make sense to set this to false?
    Having it true creates additional load on the brokers for each map task
    (it creates a map task to pull the same partition twice).  Is there any
    advantage to having it on vs off?
  
   mapred.map.tasks.speculative.execution
  
   Thanks,
  
   Bhavesh
  
 



Re: Max. storage for Kafka and impact

2014-12-19 Thread Pradeep Gollakota
@Joe, Achanta is using Indian English numerals which is why it's a little
confusing. http://en.wikipedia.org/wiki/Indian_English#Numbering_system
1,00,000 [1 lakh] (Indian English) == 100,000 [1 hundred thousand] (The
rest of the world :P)

On Fri Dec 19 2014 at 9:40:29 AM Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 Joe,

 - Correction, it's 1,00,000 partitions
 - We can have at max only 1 consumer/partition. Not 50 per 1 partition.
 Yes, we have a hashing mechanism to support future partition increase as
 well. We override the Default Partitioner.
 - We use both Simple and HighLevel consumers depending on the consumption
 use-case.
 - I clearly mentioned that 200 TB/week and not a day.
 - We have separate producers and consumers, each operating as different
 processes in different machines.

 I was explaining why we may end up with so many partitions. I think the
 question about 200 TB/day got deviated.

 Any suggestions reg. the performance impact of the 200TB/week?

 On Fri, Dec 19, 2014 at 10:53 PM, Joe Stein joe.st...@stealth.ly wrote:
 
  Wait, how do you get 2,000 topics each with 50 partitions == 1,000,000
  partitions? I think you can take what I said below and change my 250 to
 25
  as I went with your result (1,000,000) and not your arguments (2,000 x
 50).
 
  And you should think on the processing as a separate step from fetch and
  commit your offset in batch post processing. Then you only need more
  partitions to fetch batches to process in parallel.
 
  Regards, Joestein
 
  On Fri, Dec 19, 2014 at 12:01 PM, Joe Stein joe.st...@stealth.ly
 wrote:
  
   see some comments inline
  
   On Fri, Dec 19, 2014 at 11:30 AM, Achanta Vamsi Subhash 
   achanta.va...@flipkart.com wrote:
  
   We require:
   - many topics
   - ordering of messages for every topic
  
  
   Ordering is only on a per partition basis so you might have to pick a
   partition key that makes sense for what you are doing.
  
  
   - Consumers hit different Http EndPoints which may be slow (in a push
   model). In case of a Pull model, consumers may pull at the rate at
 which
   they can process.
   - We need parallelism to hit with as many consumers. Hence, we
 currently
   have around 50 consumers/topic = 50 partitions.
  
  
   I think you might be mixing up the fetch with the processing. You can
  have
   1 partition and still have 50 message being processed in parallel (so a
   batch of messages).
  
   What language are you working in? How are you doing this processing
   exactly?
  
  
  
   Currently we have:
   2000 topics x 50 = 1,00,000 partitions.
  
  
   If this is really the case then you are going to need at least 250
  brokers
   (~ 4,000 partitions per broker).
  
   If you do that then you are in the 200TB per day world which doesn't
  sound
   to be the case.
  
   I really think you need to strategize more on your processing model
 some
   more.
  
  
  
   The incoming rate of ingestion at max is 100 MB/sec. We are planning
  for a
   big cluster with many brokers.
  
  
   It is possible to handle this on just 3 brokers depending on message
  size,
   ability to batch, durability are also factors you really need to be
   thinking about.
  
  
  
   We have exactly the same use cases as mentioned in this video (usage
 at
   LinkedIn):
   https://www.youtube.com/watch?v=19DvtEC0EbQ​
  
   ​To handle the zookeeper scenario, as mentioned in the above video, we
  are
   planning to use SSDs​ and would upgrade to the new consumer (0.9+)
 once
   its
   available as per the below video.
   https://www.youtube.com/watch?v=7TZiN521FQA
  
   On Fri, Dec 19, 2014 at 9:06 PM, Jayesh Thakrar
   j_thak...@yahoo.com.invalid
wrote:
  
Technically/conceptually it is possible to have 200,000 topics, but
 do
   you
really need it like that?What do you intend to do with those
 messages
  -
i.e. how do you forsee them being processed downstream? And are
 those
topics really there to segregate different kinds of processing or
   different
ids?E.g. if you were LinkedIn, Facebook or Google, would you have
 have
   one
topic per user or one topic per kind of event (e.g. login, pageview,
adview, etc.)Remember there is significant book-keeping done within
Zookeeper - and these many topics will make that book-keeping
   significant.
As for storage, I don't think it should be an issue with sufficient
spindles, servers and higher than default memory configuration.
Jayesh
  From: Achanta Vamsi Subhash achanta.va...@flipkart.com
 To: users@kafka.apache.org users@kafka.apache.org
 Sent: Friday, December 19, 2014 9:00 AM
 Subject: Re: Max. storage for Kafka and impact
   
Yes. We need those many max partitions as we have a central
 messaging
service and thousands of topics.
   
On Friday, December 19, 2014, nitin sharma 
  kumarsharma.ni...@gmail.com
   
wrote:
   
 hi,

 Few things you have to plan for:
 a. Ensure that from 

Re: [DISCUSS] Kafka Security Specific Features

2014-06-06 Thread Pradeep Gollakota
I'm actually not convinced that encryption needs to be handled server side
in Kafka. I think the best solution for encryption is to handle it
producer/consumer side just like compression. This will offload key
management to the users and we'll still be able to leverage the sendfile
optimization for better performance.
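As a sketch of what producer/consumer-side encryption could look like, here is
an encrypting serializer against the newer Java client's Serializer interface
(this is not anything that exists in Kafka itself, and key distribution is
deliberately left to the application):

    import java.security.SecureRandom;
    import java.util.Map;
    import javax.crypto.Cipher;
    import javax.crypto.SecretKey;
    import javax.crypto.spec.GCMParameterSpec;
    import org.apache.kafka.common.serialization.Serializer;

    // Encrypts each record's payload before it leaves the producer; a matching
    // Deserializer on the consumer side reverses this. Brokers only ever see
    // ciphertext, so the sendfile path is untouched.
    public class EncryptingSerializer implements Serializer<byte[]> {
        private final SecretKey key;                 // from your own key manager
        private final SecureRandom random = new SecureRandom();

        public EncryptingSerializer(SecretKey key) { this.key = key; }

        @Override public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, byte[] plaintext) {
            try {
                byte[] iv = new byte[12];
                random.nextBytes(iv);                // fresh nonce per record
                Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
                cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
                byte[] ciphertext = cipher.doFinal(plaintext);
                byte[] out = new byte[iv.length + ciphertext.length];
                System.arraycopy(iv, 0, out, 0, iv.length);
                System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
                return out;                          // IV prepended to ciphertext
            } catch (Exception e) {
                throw new RuntimeException("encryption failed", e);
            }
        }

        @Override public void close() { }
    }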


On Fri, Jun 6, 2014 at 10:48 AM, Rob Withers robert.w.with...@gmail.com
wrote:

 On consideration, if we have 3 different access groups (1 for production
 WRITE and 2 consumers) they all need to decode the same encryption and so
 all need the same public/private key; certs won't work, unless you write
 a CertAuthority to build multiple certs with the same keys.  Better seems
 to not use certs and wrap the encryption specification with an ACL
 capabilities for each group of access.


 On Jun 6, 2014, at 11:43 AM, Rob Withers wrote:

  This is quite interesting to me and it is an excellent opportunity to
 promote a slightly different security scheme.  Object-capabilities are
 perfect for online security and would use ACL-style authentication to gain
 capabilities filtered to those allowed resources for allowed actions
 (READ/WRITE/DELETE/LIST/SCAN).  Erights.org has the quintessential (?)
 object capabilities model and capnproto is implementing this for C++.  I
 have a java implementation at http://github.com/pauwau/pauwau but the
 master is broken.  0.2 works, basically.  Basically a TLS connection with
 no certificate server, it is peer to peer.  It has some advanced features,
 but the linking of capabilities with authorization so that you can only
 invoke correct services is extended to the secure user.

 Regarding non-repudiation, on disk, why not prepend a CRC?

 Regarding on-disk encryption, multiple users/groups may need to access,
 with different capabilities.  Sounds like zookeeper needs to store a cert
 for each class of access so that a group member can access the decrypted
 data from disk.  Use cert-based async decryption.  The only issue is storing
 the private key in zookeeper.  Perhaps some hash magic could be used.

 Thanks for kafka,
 Rob

 On Jun 5, 2014, at 3:01 PM, Jay Kreps wrote:

  Hey Joe,

 I don't really understand the sections you added to the wiki. Can you
 clarify them?

 Is non-repudiation what SASL would call integrity checks? If so, don't SSL
 and many of the SASL schemes already support this as well as
 on-the-wire encryption?

 Or are you proposing an on-disk encryption scheme? Is this actually
 needed?
 Isn't a on-the-wire encryption when combined with mutual authentication
 and
 permissions sufficient for most uses?

 On-disk encryption seems unnecessary because if an attacker can get root
 on
 the kafka boxes it can potentially modify Kafka to do anything he or she
 wants with data. So this seems to break any security model.

 I understand the problem of a large organization not really having a
 trusted network and wanting to secure data transfer and limit and audit
 data access. The uses for these other things I don't totally understand.

 Also it would be worth understanding the state of other messaging and
 storage systems (Hadoop, dbs, etc). What features do they support. I
 think
 there is a sense in which you don't have to run faster than the bear, but
 only faster than your friends. :-)

 -Jay


 On Wed, Jun 4, 2014 at 5:57 PM, Joe Stein joe.st...@stealth.ly wrote:

  I like the idea of working on the spec and prioritizing. I will update
 the
 wiki.

 - Joestein


 On Wed, Jun 4, 2014 at 1:11 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Joe,

 Thanks for kicking this discussion off! I totally agree that for

 something

 that acts as a central message broker security is critical feature. I

 think

 a number of people have been interested in this topic and several
 people
 have put effort into special purpose security efforts.

 Since most the LinkedIn folks are working on the consumer right now I

 think

 this would be a great project for any other interested people to take
 on.
 There are some challenges in doing these things distributed but it can

 also

 be a lot of fun.

 I think a good first step would be to get a written plan we can all
 agree
 on for how things should work. Then we can break things down into
 chunks
 that can be done independently while still aiming at a good end state.

 I had tried to write up some notes that summarized at least the
 thoughts

 I

 had had on security:
 https://cwiki.apache.org/confluence/display/KAFKA/Security

 What do you think of that?

 One assumption I had (which may be incorrect) is that although we want

 all

 the things in your list, the two most pressing would be authentication

 and

 authorization, and that was all that write up covered. You have more
 experience in this domain, so I wonder how you would prioritize?

 Those notes are really sketchy, so I think the first goal I would have
 would be to get to a real spec we can all agree on and discuss. A lot
 of
 the security stuff has a 

Re: Remote Zookeeper

2014-03-11 Thread Pradeep Gollakota
Is there a firewall that's blocking connections on port 9092? Also, the
broker list should be comma-separated.
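For example (same hosts as in the command below, just comma-separated):

    $KAFKA_HOME/bin/kafka-console-producer.sh \
        --broker-list localhost:9092,192.168.1.124:9092 --topic test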


On Tue, Mar 11, 2014 at 9:02 AM, A A andthereitg...@hotmail.com wrote:

 Sorry one of the brokers for was down. Brought it back up. Tried the
 following

  $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092
 192.168.1.124:9092 --topic test
 hello brokers

 [2014-03-11 10:16:55,547] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,576] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,578] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: test
 (kafka.producer.async.DefaultEventHandler)
 [2014-03-11 10:16:55,693] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,706] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,706] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: test
 (kafka.producer.async.DefaultEventHandler)
 [2014-03-11 10:16:55,815] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,823] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,823] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: test
 (kafka.producer.async.DefaultEventHandler)
 [2014-03-11 10:16:55,934] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,949] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:55,949] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: test
 (kafka.producer.async.DefaultEventHandler)
 [2014-03-11 10:16:56,057] WARN Error while fetching metadata
 [{TopicMetadata for topic test -
 No partition metadata for topic test due to
 kafka.common.LeaderNotAvailableException}] for topic [test]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2014-03-11 10:16:56,059] ERROR Failed to send requests for topics test
 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
 [2014-03-11 10:16:56,061] ERROR Error in handling batch of 1 events
 (kafka.producer.async.ProducerSendThread)
 kafka.common.FailedToSendMessageException: Failed to send messages after 3
 tries.
 at
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
 at
 kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
 at
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
 at
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
 at scala.collection.immutable.Stream.foreach(Stream.scala:254)
 at
 kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
 at
 kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)



 From: andthereitg...@hotmail.com
 To: users@kafka.apache.org
 Subject: RE: Remote Zookeeper
 Date: Tue, 11 Mar 2014 15:59:44 +




 Okay thanks. Just to verify my setup I tried the following on broker1 (by
 publishing to localhost)

  

Re: New Consumer API discussion

2014-02-13 Thread Pradeep Gollakota
Hi Neha,

   6. It seems like #4 can be avoided by using Map<TopicPartition, Long> or
 Map<TopicPartition, TopicPartitionOffset> as the argument type.

 How? lastCommittedOffsets() is independent of positions(). I'm not sure I
 understood your suggestion.

I think of subscription as you're subscribing to a Set of TopicPartitions.
Because the argument to positions() is TopicPartitionOffset..., it's
conceivable that the method can be called with two offsets for the same
TopicPartition. One way to handle this is to accept either the first or
the last offset for a TopicPartition. However, if the argument type is
changed to Map<TopicPartition, Long>, it precludes the possibility of
getting duplicate offsets for the same TopicPartition.
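
For illustration, here is a minimal sketch of that point. The record classes
below are just stand-ins for the TopicPartition/TopicPartitionOffset types in
the proposal, and the positions() overloads are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-ins for the proposed consumer API value classes.
    record TopicPartition(String topic, int partition) {}
    record TopicPartitionOffset(TopicPartition tp, long offset) {}

    class PositionsExample {
        // Varargs permit two different offsets for the same partition; the
        // implementation must silently pick one (first or last).
        static void positions(TopicPartitionOffset... offsets) { /* ... */ }

        // A Map rules that out: the last put() wins before the call is ever
        // made, so the API never sees conflicting offsets.
        static void positions(Map<TopicPartition, Long> offsets) { /* ... */ }

        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("test", 0);
            positions(new TopicPartitionOffset(tp, 5L),
                      new TopicPartitionOffset(tp, 9L)); // ambiguous: 5 or 9?

            Map<TopicPartition, Long> byPartition = new HashMap<>();
            byPartition.put(tp, 5L);
            byPartition.put(tp, 9L);                     // unambiguous: 9
            positions(byPartition);
        }
    }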

   7. To address #3, maybe we can return a List<TopicPartitionOffset> of the
 ones that are invalid.

  I don't particularly see the advantage of returning a list of invalid
  partitions from position(). It seems a bit awkward to return a list to
  indicate what is obviously a bug. Prefer throwing an error since the user
  should just fix that logic.

I'm not sure if an Exception is needed or desirable here. I don't see this
as a catastrophic failure or a non-recoverable failure. Even if we just
write the bad offsets to a log file and call it a day, I'm ok with that.
But my main goal is to communicate to the API users somehow that they've
provided bad offsets which are simply being ignored.

Hi Jay,

I would also like to shorten the name TopicOffsetPosition. Offset and
 Position are duplicative of each other. So perhaps we could call it a
 PartitionOffset or a TopicPosition or something like that. In general class
 names that are just a concatenation of the fields (e.g.
 TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
 really describe, it just enumerates. But that is more of a nitpick.


   1. Did you mean to say TopicPartitionOffset instead of
   TopicOffsetPosition?
   2. +1 on PartitionOffset

The lastCommittedPosition is particularly bothersome because:
 1. The name is weird and long
 2. It returns a list of results. But how can you use the list? The only way
 to use the list is to make a map of tp=offset and then look up results in
 this map (or do a for loop over the list for the partition you want).

This is sort of what I was talking about in my previous email. My
suggestion was to change the return type to Map<TopicPartition, Long>.

What if we made it:
long position(TopicPartition tp)
void seek(TopicOffsetPosition p)
long committed(TopicPartition tp)
void commit(TopicOffsetPosition...);


   1. Absolutely love the idea of position(TopicPartition tp).
   2. I think we also need to provide a method for accessing all positions,
   positions(), which maybe returns a Map<TopicPartition, Long>?
   3. What is the difference between position(TopicPartition tp) and
   committed(TopicPartition tp)?
   4. +1 on commit(PartitionOffset...)
   5. +1 on seek(PartitionOffset p)
   6. We should also provide a seek(PartitionOffset... offsets)

Finally, in all the methods where we're using varargs, we should use an
appropriate Collection data structure. For example, for the
subscribe(TopicPartition... partitions) method, I think a more accurate API
would be subscribe(Set<TopicPartition> partitions). This allows for the code
to be self-documenting.
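
Pulling the suggestions in this message together, the surface being discussed
might look roughly like the sketch below. The record declarations are
stand-ins and none of these signatures are final:

    import java.util.Map;
    import java.util.Set;

    // Stand-ins for the classes discussed in this thread.
    record TopicPartition(String topic, int partition) {}
    record PartitionOffset(TopicPartition partition, long offset) {}

    // Rough sketch of the consumer methods being debated; illustrative only.
    interface ConsumerSketch {
        void subscribe(Set<TopicPartition> partitions); // Set instead of varargs
        long position(TopicPartition tp);               // current fetch position
        Map<TopicPartition, Long> positions();          // all fetch positions
        long committed(TopicPartition tp);              // last committed offset
        void commit(PartitionOffset... offsets);
        void seek(PartitionOffset... offsets);
    }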


Re: New Consumer API discussion

2014-02-11 Thread Pradeep Gollakota
Hi Jay,

I apologize for derailing the conversation about the consumer API. We
should start a new discussion about hierarchical topics, if we want to keep
talking about it. My final thought on the matter is that hierarchical
topics are still an important feature to have in Kafka, because they give us
flexibility to do namespace-level access controls.

Getting back to the topic of the Consumer API:

   1. Any thoughts on consistency for method arguments and return types?
   2. lastCommittedOffsets() method returns a List<TopicPartitionOffset>
   whereas the confluence page suggested a Map<TopicPartition, Long>. I would
   think that a Map is the more appropriate return type.



On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Pradeep,

 That wiki is fairly old and it predated more flexible subscription
 mechanisms. In the high-level consumer you currently have wildcard
 subscription and in the new proposed interface you can actually subscribe
 based on any logic you want to create a union of streams. Personally I
 think this gives you everything you would want with a hierarchy and more
 actual flexibility (since you can define groupings however you want). What
 do you think?

 -Jay


 On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota pradeep...@gmail.com
 wrote:

  WRT hierarchical topics, I'm referring to KAFKA-1175
  (https://issues.apache.org/jira/browse/KAFKA-1175).
  I would just like to think through the implications for the Consumer API if
  and when we do implement hierarchical topics. For example, in the proposal
  https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
  written by Jay, he says that initially wildcard subscriptions are not going
  to be supported. But does that mean that they will be supported in v2? If
  that's the case, that would change the semantics of the Consumer API.
 
  As to having classes for Topic, PartitionId, etc., it looks like I was
  referring to the TopicPartition and TopicPartitionOffset classes (I didn't
  realize these were already there). I was only looking at the confluence
  page which shows List[(String, Int, Long)] instead of
  List[TopicPartitionOffset] (as is shown in the javadoc). However, I did
  notice that we're not being consistent in the Java version. E.g. we have
  commit(TopicPartitionOffset... offsets) and
  lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
  other hand we have subscribe(String topic, int... partitions). I agree that
  creating a class for TopicId would probably not make too much sense today.
  But with hierarchical topics, I may change my mind. This is exactly what
  was done in the HBase API in 0.96 when namespaces were added. The 0.96
  HBase API introduced a class called 'TableName' to represent the namespace
  and table name.
 
 
  On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Thanks for the feedback.
  
   Mattijs -
  
   - Constructors link to
   http://kafka.apache.org/documentation.html#consumerconfigs for valid
   configurations, which lists zookeeper.connect rather than
   metadata.broker.list, the value for BROKER_LIST_CONFIG in
 ConsumerConfig.
   Fixed it to just point to ConsumerConfig for now until we finalize the
  new
   configs
   - Docs for poll(long) mention consumer.commit(true), which I can't find
  in
   the Consumer docs. For a simple consumer setup, that call is something
  that
   would make a lot of sense.
   Missed changing the examples to use consumer.commit(true, offsets). The
   suggestions by Jay would change it to commit(offsets) and
   commitAsync(offsets), which will hopefully make it easier to understand
   those commit APIs.
   - Love the addition of MockConsumer, awesome for unittesting :)
   I'm not quite satisfied with what it does as of right now, but we will
   surely improve it as we start writing the consumer.
  
   Jay -
  
   1. ConsumerRebalanceCallback
   a. Makes sense. Renamed to onPartitionsRevoked
   b. Ya, it will be good to make it forward compatible with Java 8
   capabilities. We can change it to PartitionsAssignedCallback and
PartitionsRevokedCallback or RebalanceBeginCallback and
   RebalanceEndCallback?
   c. Ya, I thought about that but then didn't name it just
   RebalanceCallback since there could be a conflict with a controller
 side
   rebalance callback if/when we have one. However, you can argue that at
  that
   time we can name it ControllerRebalanceCallback instead of polluting a
  user
   facing API. So agree with you here.
   2. Ya, that is a good idea. Changed to subscribe(String topic,
   int...partitions).
   3. lastCommittedOffset() is not necessarily a local access since the
   consumer can potentially ask for the last committed offsets of
 partitions
   that the consumer does not consume and maintain the offsets for. That's
  the
   reason it is batched right now.
   4. Yes, look at
  
  
 
 http://people.apache.org

Re: Building a producer/consumer supporting exactly-once messaging

2014-02-10 Thread Pradeep Gollakota
Have you read this part of the documentation?
http://kafka.apache.org/documentation.html#semantics

Just wondering if that solves your use case.


On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington 
g.turking...@improvedigital.com wrote:

 Hi,

 I've been doing some prototyping on Kafka for a few months now and like
 what I see. It's a good fit for some of my use cases in the areas of data
 distribution but also for processing - liking a lot of what I see in Samza.
 I'm now working through some of the operational issues and have a question
 to the community.

 I have several data sources that I want to push into Kafka but some of the
 most important are arriving as a stream of files being dropped either into
 an SFTP location or S3. Conceptually the data is really a stream, but it's
 being chunked and made more batch-like by the deployment model of the
 operational servers. So pulling the data into Kafka and seeing it more as a
 stream again is a big plus.

 But, I really don't want duplicate messages. I know Kafka provides
 at-least-once semantics and that's fine; I'm happy to have the de-dupe logic
 external to Kafka. And if I look at my producer I can build up a protocol
 around adding record metadata and using Zookeeper to give me pretty high
 confidence that my clients will know if they are reading from a file that
 was fully published into Kafka or not.
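
 A minimal sketch of that kind of external de-dupe logic, keyed on a purely
 illustrative file-plus-line identifier:

     import java.util.HashSet;
     import java.util.Set;

     // Illustrative only: de-duplicate downstream of Kafka by tagging each
     // message with a stable key derived from its source (file name + line).
     class Deduper {
         private final Set<String> seen = new HashSet<>(); // in practice, a persistent store

         /** Returns true the first time a record is seen, false for duplicates. */
         boolean firstTime(String sourceFile, long lineNumber) {
             return seen.add(sourceFile + ":" + lineNumber);
         }
     }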

 I had assumed that this wouldn't be a unique use case but on doing a bunch
 of searches I really don't find much in terms of either tools that help or
 even just best practice patterns for handling this type of need to support
 exactly-once message processing.

 So now I'm thinking that either I just need better web search skills or
 that actually this isn't something many others are doing and if so then
 there's likely a reason for that.

 Any thoughts?

 Thanks
 Garry




Re: New Consumer API discussion

2014-02-10 Thread Pradeep Gollakota
Couple of very quick thoughts.

1. +1 about renaming commit(...) and commitAsync(...)
2. I'd also like to extend the above to the poll() method as well: poll()
and pollWithTimeout(long, TimeUnit)?
3. Have you guys given any thought around how this API would be used with
hierarchical topics?
4. Would it make sense to add classes such as TopicId, PartitionId, etc?
Seems like it would be easier to read code with these classes as opposed to
strings and longs.

- Pradeep


On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote:

 A few items:
 1. ConsumerRebalanceCallback
a. onPartitionsRevoked would be a better name.
b. We should discuss the possibility of splitting this into two
 interfaces. The motivation would be that in Java 8 single method interfaces
 can directly take methods which might be more intuitive.
c. If we stick with a single interface I would prefer the name
 RebalanceCallback as its more concise
 2. Should subscribe(String topic, int partition) be subscribe(String
 topic, int... partition)?
 3. Is lastCommittedOffset call just a local access? If so it would be more
 convenient not to batch it.
 4. How are we going to handle the earliest/latest starting position
 functionality we currently have. Does that remain a config?
 5. Do we need to expose the general ability to get known positions from the
 log? E.g. the functionality in the OffsetRequest...? That would make the
 ability to change position a little easier.
 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
 unit)? Is it Long because it allows null? If so should we just add a poll()
 that polls indefinitely?
 7. I recommend we remove the boolean parameter from commit as it is really
 hard to read code that has boolean parameters without named arguments. Can
 we make it something like commit(...) and commitAsync(...)?
 8. What about the common case where you just want to commit the current
 position for all partitions?
 9. How do you unsubscribe?
 10. You say in a few places that positions() only impacts the starting
 position, but surely that isn't the case, right? Surely it controls the
 fetch position for that partition and can be called at any time? Otherwise
 it is a pretty weird api, right?
 11. How do I get my current position? Not the committed position but the
 offset of the next message that will be given to me?

 One thing that I really found helpful for the API design was writing out
 actual code for different scenarios against the API. I think it might be
 good to do that for this too--i.e. enumerate the various use cases and code
 that use case up to see how it looks. I'm not sure if it would be useful to
 collect these kinds of scenarios from people. I know they have sporadically
 popped up on the mailing list.
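
 As one example of such scenario code, a simple consume-and-commit-per-batch
 loop against the proposed API might read like the sketch below. The nested
 interfaces are simplified stand-ins, not the actual classes from the javadoc:

     import java.util.List;

     // Illustrative scenario only; the new consumer API is still in flux, so
     // treat these names and signatures as placeholders.
     class ConsumeAndCommitScenario {
         interface Record { String topic(); int partition(); long offset(); byte[] value(); }
         interface Consumer {
             void subscribe(String... topics);
             List<Record> poll(long timeoutMs); // returns a batch of records
             void commit();                     // commit current positions
         }

         static void run(Consumer consumer) {
             consumer.subscribe("test");
             while (true) {
                 List<Record> batch = consumer.poll(1000); // long poll up to 1s
                 for (Record r : batch) {
                     process(r);                           // user-supplied logic
                 }
                 consumer.commit();                        // commit after each batch
             }
         }

         static void process(Record r) { /* application logic */ }
     }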

 -Jay


 On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  As mentioned in previous emails, we are also working on a
 re-implementation
  of the consumer. I would like to use this email thread to discuss the
  details of the public API. I would also like us to be picky about this
  public api now so it is as good as possible and we don't need to break it
  in the future.
 
  The best way to get a feel for the API is actually to take a look at the
  javadoc
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
  ,
  the hope is to get the api docs good enough so that it is
 self-explanatory.
  You can also take a look at the configs
  here
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
  
 
  Some background info on implementation:
 
   At a high level the primary difference in this consumer is that it removes
   the distinction between the high-level and low-level consumer. The new
   consumer API is non-blocking and, instead of returning a blocking iterator,
   the consumer provides a poll() API that returns a list of records. We think
   this is better compared to the blocking iterators since it effectively
   decouples the threading strategy used for processing messages from the
   consumer. It is worth noting that the consumer is entirely single threaded
   and runs in the user thread. The advantage is that it can be easily
   rewritten in less multi-threading-friendly languages. The consumer batches
   data and multiplexes I/O over TCP connections to each of the brokers it
   communicates with, for high throughput. The consumer also allows long poll
   to reduce the end-to-end message latency for low throughput data.
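
   To make the threading point concrete, an application could keep the
   consumer on a single thread and hand each polled batch to its own worker
   pool, along these lines (purely illustrative stand-in types, not the real
   API):

       import java.util.List;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.Executors;

       // Because poll() returns batches instead of a blocking iterator and the
       // consumer runs in the caller's thread, the application chooses its own
       // processing threads -- here a fixed worker pool.
       class WorkerPoolScenario {
           interface Record { byte[] value(); }
           interface Consumer { List<Record> poll(long timeoutMs); }

           static void run(Consumer consumer) {
               ExecutorService workers = Executors.newFixedThreadPool(4);
               while (true) {
                   List<Record> batch = consumer.poll(100);  // consumer stays on this thread
                   for (Record r : batch) {
                       workers.submit(() -> handle(r));      // processing on app-owned threads
                   }
               }
           }

           static void handle(Record r) { /* application logic */ }
       }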
 
  The consumer provides a group management facility that supports the
 concept
  of a group with multiple consumer instances (just like the current
  consumer). This is done through a custom heartbeat and group management
  protocol transparent to the user. At the same time, it allows users the
  option to subscribe to a fixed set of partitions and not use group
  management at all. The 

Re: Config for new clients (and server)

2014-02-10 Thread Pradeep Gollakota
+1 Jun.


On Mon, Feb 10, 2014 at 2:17 PM, Sriram Subramanian 
srsubraman...@linkedin.com wrote:

 +1 on Jun's suggestion.

 On 2/10/14 2:01 PM, Jun Rao jun...@gmail.com wrote:

  I actually prefer to see those at INFO level. The reason is that the config
  system in an application can be complex. Some configs can be overridden in
  different layers and it may not be easy to determine what the final binding
  value is. The logging in Kafka will serve as the source of truth.
 
  For reference, ZK client logs all overridden values during initialization.
  It's a one-time thing during startup, so it shouldn't add much noise. It's
  very useful for debugging subtle config issues.
  
  Exposing final configs programmatically is potentially useful. If we don't
  want to log overridden values out of the box, an app can achieve the same
  thing using the programming api. The only missing thing is that we won't
  know those unused property keys, which is probably less important than
  seeing the overridden values.
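
  A sketch of what that startup logging could look like (the config class here
  is only a stand-in, not the actual implementation):

      import java.util.HashSet;
      import java.util.Map;
      import java.util.Set;

      // Stand-in sketch: log the final binding of every overridden value, plus
      // any supplied keys nothing recognizes (likely misspellings).
      class ConfigLoggingSketch {
          static void logConfig(Map<String, Object> defaults, Map<String, Object> supplied) {
              for (Map.Entry<String, Object> e : supplied.entrySet()) {
                  if (defaults.containsKey(e.getKey())) {
                      System.out.println("INFO overriding " + e.getKey() + " = " + e.getValue());
                  }
              }
              Set<String> unused = new HashSet<>(supplied.keySet());
              unused.removeAll(defaults.keySet());
              for (String key : unused) {
                  System.out.println("WARN unused config key (possible misspelling): " + key);
              }
          }
      }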
 
 Thanks,
 
 Jun
 
 
 On Mon, Feb 10, 2014 at 10:15 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Hey Jun,
 
   I think that is reasonable but would you object to having it be debug
   logging? I think logging out a bunch of noise during normal operation in a
   client library is pretty ugly. Also, is there value in exposing the final
   configs programmatically?
 
  -Jay
 
 
 
  On Sun, Feb 9, 2014 at 9:23 PM, Jun Rao jun...@gmail.com wrote:
 
    +1 on the new config. Just one comment. Currently, when instantiating a
    config (e.g. ProducerConfig), we log those overridden property values and
    unused property keys (likely due to misspelling). This has been very
    useful for config verification. It would be good to add similar support
    in the new config.
  
   Thanks,
  
   Jun
  
  
   On Tue, Feb 4, 2014 at 9:34 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
 We touched on this a bit in previous discussions, but I wanted to draw out
 the approach to config specifically as an item of discussion.

 The new producer and consumer use a similar key-value config approach as
 the existing scala clients but have different implementation code to help
 define these configs. The plan is to use the same approach on the server,
 once the new clients are complete; so if we agree on this approach it will
 be the new default across the board.
   
 Let me split this into two parts. First I will try to motivate the use of
 key-value pairs as a configuration api. Then let me discuss the mechanics
 of specifying and parsing these. If we agree on the public api then the
 implementation details are interesting, as this will be shared across
 producer, consumer, and broker and potentially some tools; but if we
 disagree about the api then there is no point in discussing the
 implementation.
   
 Let me explain the rationale for this. In a sense a key-value map of
 configs is the worst possible API to the programmer using the clients. Let
 me contrast the pros and cons versus a POJO and motivate why I think it is
 still superior overall.
   
 Pro: An application can externalize the configuration of its kafka clients
 into its own configuration. Whatever config management system the client
 application is using will likely support key-value pairs, so the client
 should be able to directly pull whatever configurations are present and use
 them in its client. This means that any configuration the client supports
 can be added to any application at runtime. With the pojo approach the
 client application has to expose each pojo getter as some config parameter.
 The result of many applications doing this is that the config is different
 for each, and it is very hard to have a standard client config shared
 across applications. Moving config into config files allows the usual
 tooling (version control, review, audit, config deployments separate from
 code pushes, etc.).
   
 Pro: Backwards and forwards compatibility. Provided we stick to our java
 api, many internals can evolve and expose new configs. The application can
 support both the new and old client by just specifying a config that will
 be unused in the older version (and of course the reverse--we can remove
 obsolete configs).
   
 Pro: We can use a similar mechanism for both the client and the server.
 Since most people run the server as a stand-alone process, it needs a
 config file.
   
 Pro: Systems like Samza that need to ship configs across the network can
 easily do so, as configs have a natural serialized form. This can be done
 with pojos using java serialization but it is ugly and has bizarre failure
 cases.
   
 Con: The IDE gives nice auto-completion for pojos.

 Con: There are some advantages to javadoc as a documentation mechanism for
 java people.
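
 For concreteness, the key-value approach on the application side looks
 roughly like the sketch below (the property key shown is illustrative, since
 the new configs were still being finalized):

     import java.io.FileInputStream;
     import java.io.IOException;
     import java.util.Properties;

     // Illustrative only: the application keeps Kafka client settings in its
     // own key-value config file and passes them straight through.
     class KeyValueConfigExample {
         static Properties loadClientConfig(String path) throws IOException {
             Properties props = new Properties();
             try (FileInputStream in = new FileInputStream(path)) {
                 // e.g. a line such as: bootstrap.servers=host1:9092,host2:9092
                 props.load(in);
             }
             // Keys an older client version does not recognize simply pass
             // through, which is what gives forwards compatibility.
             return props;
         }
     }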
  

Re: New Consumer API discussion

2014-02-10 Thread Pradeep Gollakota
.

 Pradeep -

 2. Changed to poll(long, TimeUnit) and a negative value for the timeout
 would block in the poll forever until there is new data
 3. We don't have hierarchical topics support. Would you mind explaining
 what you meant?
 4. I'm not so sure that we need a class to express a topic which is a
 string and a separate class for just partition id. We do have a class for
 TopicPartition which uniquely identifies a partition of a topic
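
 Under that contract, usage would look something like the sketch below (the
 interface is a stand-in; the final signature was still being discussed):

     import java.util.List;
     import java.util.concurrent.TimeUnit;

     // Sketch of the poll(long, TimeUnit) contract described above: a negative
     // timeout blocks until data arrives, a non-negative one bounds the wait.
     interface PollContractSketch<R> {
         List<R> poll(long timeout, TimeUnit unit);
     }

     class PollUsage {
         static <R> void examples(PollContractSketch<R> consumer) {
             List<R> bounded  = consumer.poll(100, TimeUnit.MILLISECONDS); // waits at most 100 ms
             List<R> blocking = consumer.poll(-1, TimeUnit.MILLISECONDS);  // blocks until new data
         }
     }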

 Thanks,
 Neha


 On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota pradeep...@gmail.com
 wrote:

  Couple of very quick thoughts.
 
  1. +1 about renaming commit(...) and commitAsync(...)
  2. I'd also like to extend the above for the poll()  method as well.
 poll()
  and pollWithTimeout(long, TimeUnit)?
  3. Have you guys given any thought around how this API would be used with
  hierarchical topics?
  4. Would it make sense to add classes such as TopicId, PartitionId, etc?
  Seems like it would be easier to read code with these classes as opposed
 to
  string and longs.
 
  - Pradeep
 
 
  On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   A few items:
   1. ConsumerRebalanceCallback
  a. onPartitionsRevoked would be a better name.
  b. We should discuss the possibility of splitting this into two
   interfaces. The motivation would be that in Java 8 single method
  interfaces
   can directly take methods which might be more intuitive.
  c. If we stick with a single interface I would prefer the name
   RebalanceCallback as its more concise
   2. Should subscribe(String topic, int partition) should be
  subscribe(String
   topic, int...partition)?
   3. Is lastCommittedOffset call just a local access? If so it would be
  more
   convenient not to batch it.
   4. How are we going to handle the earliest/latest starting position
   functionality we currently have. Does that remain a config?
   5. Do we need to expose the general ability to get known positions from
  the
   log? E.g. the functionality in the OffsetRequest...? That would make
 the
   ability to change position a little easier.
   6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
   unit)? Is it Long because it allows null? If so should we just add a
  poll()
   that polls indefinitely?
   7. I recommend we remove the boolean parameter from commit as it is
  really
   hard to read code that has boolean parameters without named arguments.
  Can
   we make it something like commit(...) and commitAsync(...)?
   8. What about the common case where you just want to commit the current
   position for all partitions?
   9. How do you unsubscribe?
   10. You say in a few places that positions() only impacts the starting
   position, but surely that isn't the case, right? Surely it controls the
   fetch position for that partition and can be called at any time?
  Otherwise
   it is a pretty weird api, right?
   11. How do I get my current position? Not the committed position but
 the
   offset of the next message that will be given to me?
  
   One thing that I really found helpful for the API design was writing
 out
   actual code for different scenarios against the API. I think it might
 be
   good to do that for this too--i.e. enumerate the various use cases and
  code
   that use case up to see how it looks. I'm not sure if it would be
 useful
  to
   collect these kinds of scenarios from people. I know they have
  sporadically
   popped up on the mailing list.
  
   -Jay
  
  
   On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede 
 neha.narkh...@gmail.com
   wrote:
  
As mentioned in previous emails, we are also working on a
   re-implementation
of the consumer. I would like to use this email thread to discuss the
details of the public API. I would also like us to be picky about
 this
public api now so it is as good as possible and we don't need to
 break
  it
in the future.
   
The best way to get a feel for the API is actually to take a look at
  the
javadoc
   
  
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
,
the hope is to get the api docs good enough so that it is
   self-explanatory.
You can also take a look at the configs
here
   
  
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html

   
Some background info on implementation:
   
At a high level the primary difference in this consumer is that it
   removes
the distinction between the high-level and low-level consumer.
 The
   new
consumer API is non blocking and instead of returning a blocking
   iterator,
the consumer provides a poll() API that returns a list of records. We
   think
this is better compared to the blocking iterators since it
 effectively
decouples the threading strategy used for processing messages from
 the
consumer. It is worth noting that the consumer is entirely single