Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jonathan Weeks
+1 on this change — APIs are forever. As much as we’d love to see 0.8.2 released 
ASAP, it is important to get this right.

-JW

> On Nov 24, 2014, at 5:58 PM, Jun Rao  wrote:
> 
> Hi, Everyone,
> 
> I'd like to start a discussion on whether it makes sense to add the
> serializer api back to the new java producer. Currently, the new java
> producer takes a byte array for both the key and the value. While this api
> is simple, it pushes the serialization logic into the application. This
> makes it hard to reason about what type of data is being sent to Kafka and
> also makes it hard to share an implementation of the serializer. For
> example, to support Avro, the serialization logic could be quite involved
> since it might need to register the Avro schema in some remote registry and
> maintain a schema cache locally, etc. Without a serialization api, it's
> impossible to share such an implementation so that people can easily reuse it.
> We sort of overlooked this implication during the initial discussion of the
> producer api.
> 
> So, I'd like to propose an api change to the new producer by adding back
> the serializer api similar to what we had in the old producer. Specifically,
> the proposed api changes are the following.
> 
> First, we change KafkaProducer to take generic types K and V for the key
> and the value, respectively.
> 
> public class KafkaProducer<K,V> implements Producer<K,V> {
> 
>    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
> callback);
> 
>    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> }
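> 
> For illustration only, a Scala sketch of what client code might look like
> under this proposal (the broker address, topic and payload are made up, and
> the serializer configs are covered next):
> 
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
> 
> val props = new java.util.Properties()
> props.put("bootstrap.servers", "localhost:9092")
> // key/value serializers default to the byte array implementation (below)
> val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
> val record = new ProducerRecord[Array[Byte], Array[Byte]](
>   "my-topic", "my-key".getBytes("UTF-8"), "my-value".getBytes("UTF-8"))
> producer.send(record) // Future<RecordMetadata>
> producer.close()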
> 
> Second, we add two new configs, one for the key serializer and another for
> the value serializer. Both serializers will default to the byte array
> implementation.
> 
> public class ProducerConfig extends AbstractConfig {
> 
>.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> KEY_SERIALIZER_CLASS_DOC)
>.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> VALUE_SERIALIZER_CLASS_DOC);
> }
> 
> Both serializers will implement the following interface.
> 
> public interface Serializer<T> extends Configurable {
>    public byte[] serialize(String topic, T data, boolean isKey);
> 
>    public void close();
> }
> 
> This is more or less the same as what's in the old producer. The slight
> differences are (1) the serializer now only requires a parameter-less
> constructor; (2) the serializer has a configure() and a close() method for
> initialization and cleanup, respectively; (3) the serialize() method
> additionally takes the topic and an isKey indicator, both of which are
> useful for things like schema registration.
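> 
> As an illustration (a hypothetical Scala example, not part of the patch), a
> simple UTF-8 string serializer under the proposed interface might look like:
> 
> class StringSerializer extends Serializer[String] {
>   private var encoding = "UTF-8"
> 
>   // only a parameter-less constructor is required; configs arrive here
>   override def configure(configs: java.util.Map[String, _]): Unit = {
>     val e = configs.get("serializer.encoding") // config name is made up
>     if (e != null) encoding = e.toString
>   }
> 
>   override def serialize(topic: String, data: String, isKey: Boolean): Array[Byte] =
>     if (data == null) null else data.getBytes(encoding)
> 
>   override def close(): Unit = ()
> }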
> 
> The detailed changes are included in KAFKA-1797. For completeness, I also
> made the corresponding changes for the new java consumer api.
> 
> Note that the proposed api changes are incompatible with what's in the
> 0.8.2 branch. However, if those api changes are beneficial, it's probably
> better to include them now in the 0.8.2 release, rather than later.
> 
> I'd like to discuss mainly two things in this thread.
> 1. Do people feel that the proposed api changes are reasonable?
> 2. Are there any concerns about including the api changes in the 0.8.2 final
> release?
> 
> Thanks,
> 
> Jun



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
I suppose it also is going to depend on:

a) How much spare I/O bandwidth the brokers have to support a rebuild while 
serving ongoing requests. Our brokers have spare I/O capacity.
b) How many brokers are in the cluster and what the replication factor is — 
e.g. if you have a larger cluster, it is easier to tolerate the loss of a 
single broker. We started with 3 brokers, so the loss of a single broker is 
quite significant — we would prefer possibly degraded performance to having a 
“down” broker.

I do understand that y’all both work at LinkedIn; my point is that all of the 
guidance to date (as recently as this summer) is that LinkedIn runs production 
on RAID 10, so it is just a bit odd to hear a contrary recommendation, although 
I do understand that best practices are a moving, evolving target.

Best Regards,

-Jonathan


On Oct 22, 2014, at 4:05 PM, Todd Palino  wrote:

> Yeah, Jonathan, I'm the LinkedIn SRE who said that :) And Neha, up until
> recently, sat 8 feet from my desk. The data from the wiki page is off a
> little bit as well (we're running 14 disks now, and 64 GB systems).
> 
> So to hit the first questions, RAID 10 gives higher read performance, and
> also allows you to suffer a disk failure without having to drop the entire
> cluster. As Neha noted, you're going to take a hit on the rebuild, and
> because of ongoing traffic in the cluster it will be for a long time (we
> can easily take half a day to rebuild a disk). But you still get some
> benefit out of the RAID over just killing the data and letting it rebuild
> from the replica, because during that time the cluster is not under
> replicated, so you can suffer another failure. The more servers and disks
> you have, the more often disks are going to fail, not to mention other
> components, both hardware and software. I like running on the safer side.
> 
> That said, I'm not sure RAID 10 is the answer either. We're going to be
> doing some experimenting with other disk layouts shortly. We've inherited a
> lot of our architecture, and many things have changed in that time. We're
> probably going to test out RAID 5 and 6 to start with and see how much we
> lose from the parity calculations.
> 
> -Todd
> 
> 
> On Wed, Oct 22, 2014 at 3:59 PM, Jonathan Weeks 
> wrote:
> 
>> Neha,
>> 
>> Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is
>> definitely very painful, but less so with RAID 10.
>> 
>> We have been using the guidance here:
>> 
>> http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site
>> Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40
>> or so)
>> 
>> Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations
>> 
>> LinkedIn
>> Hardware
>> We are using dual quad-core Intel Xeon machines with 24GB of memory. In
>> general this should not matter too much, we only see pretty low CPU usage
>> at peak even with GZIP compression enabled and a number of clients that
>> don't batch requests. The memory is probably more than is needed for
>> caching the active segments of the log.
>> The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID
>> 10 array. In general this is the performance bottleneck, and more disks is
>> better. Depending on how you configure flush behavior you may or may
>> not benefit from more expensive disks (if you flush often then higher RPM
>> SAS drives may be better).
>> OS Settings
>> We use Linux. Ext4 is the filesystem and we run using software RAID 10. We
>> haven't benchmarked filesystems so other filesystems may be superior.
>> We have added two tuning changes: (1) we upped the number of file
>> descriptors since we have lots of topics and lots of connections, and (2)
>> we upped the max socket buffer size to enable high-performance data
>> transfer between data centers (described here).
>> 
>> 
>> Best Regards,
>> 
>> -Jonathan
>> 
>> 
>> 
>> On Oct 22, 2014, at 3:44 PM, Neha Narkhede 
>> wrote:
>> 
>>> In my experience, RAID 10 doesn't really provide value in the presence of
>>> replication. When a disk fails, the RAID resync process is so I/O
>> intensive
>>> that it renders the broker useless until it completes. When this happens,
>>> you actually have to take the broker out of rotation and move the leaders
>>> off of it to prevent it from serving requests in a degraded state. You
>>> might as well shutdown the broker, delete the broker's data and let it
>>> catch up from the leader.
>>> 
>>> On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira wrote:

Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
Neha, 

Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is definitely 
very painful, but less so with RAID 10.

We have been using the guidance here:

http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site Reliability 
Engineers state they run RAID 10 on all Kafka clusters @34:40 or so)

Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations

LinkedIn
Hardware
We are using dual quad-core Intel Xeon machines with 24GB of memory. In general 
this should not matter too much, we only see pretty low CPU usage at peak even 
with GZIP compression enabled and a number of clients that don't batch 
requests. The memory is probably more than is needed for caching the active 
segments of the log.
The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID 10 
array. In general this is the performance bottleneck, and more disks is 
better. Depending on how you configure flush behavior you may or may not 
benefit from more expensive disks (if you flush often then higher RPM SAS 
drives may be better).
OS Settings
We use Linux. Ext4 is the filesystem and we run using software RAID 10. We 
haven't benchmarked filesystems so other filesystems may be superior.
We have added two tuning changes: (1) we upped the number of file descriptors 
since we have lots of topics and lots of connections, and (2) we upped the max 
socket buffer size to enable high-performance data transfer between data 
centers (described here).


Best Regards,

-Jonathan



On Oct 22, 2014, at 3:44 PM, Neha Narkhede  wrote:

> In my experience, RAID 10 doesn't really provide value in the presence of
> replication. When a disk fails, the RAID resync process is so I/O intensive
> that it renders the broker useless until it completes. When this happens,
> you actually have to take the broker out of rotation and move the leaders
> off of it to prevent it from serving requests in a degraded state. You
> might as well shutdown the broker, delete the broker's data and let it
> catch up from the leader.
> 
> On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira 
> wrote:
> 
>> Makes sense. Thanks :)
>> 
>> On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
>>  wrote:
>>> There are various costs when a broker fails, including leader election for
>>> each partition, possible issues for in-flight messages, client rebalancing,
>>> etc.
>>> 
>>> So even though replication provides partition redundancy, RAID 10 on each
>>> broker is usually a good tradeoff: it protects against the most common cause
>>> of broker failure (i.e. disk failure) and gives smoother operation overall.
>>> 
>>> Best Regards,
>>> 
>>> -Jonathan
>>> 
>>> 
>>> On Oct 22, 2014, at 11:01 AM, Gwen Shapira 
>> wrote:
>>> 
>>>> RAID-10?
>>>> Interesting choice for a system where the data is already replicated
>>>> between nodes. Is it to avoid the cost of large replication over the
>>>> network? how large are these disks?
>>>> 
>>>> On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino 
>> wrote:
>>>>> In fact there are many more than 4000 open files. Many of our brokers run
>>>>> with 28,000+ open files (regular file handles, not network connections). In
>>>>> our case, we're beefing up the disk performance as much as we can by
>>>>> running in a RAID-10 configuration with 14 disks.
>>>>> 
>>>>> -Todd
>>>>> 
>>>>> On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She 
>> wrote:
>>>>> 
>>>>>> Todd,
>>>>>> 
>>>>>> Actually I'm wondering how Kafka handles so many partitions: with one
>>>>>> partition there is at least one file on disk, and with 4000 partitions
>>>>>> there will be at least 4000 files.
>>>>>> 
>>>>>> When all these partitions have write requests, how does Kafka keep the
>>>>>> write operations on the disk sequential (which is emphasized in the design
>>>>>> document of Kafka) and make sure the disk access is efficient?
>>>>>> 
>>>>>> Thank you for your reply.
>>>>>> 
>>>>>> xiaobinshe
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 2014-10-22 5:10 GMT+08:00 Todd Palino :
>>>>>> 
>>>>>>> As far as the number of partitions a single broker can handle, we've set
>>>>>>> our cap at 4000 partitions (including replicas). Above that we've seen
>>>>>>> some performance and stability issues.
>>>>>>> 
>>>>>>> -Todd
>>>>>>> 
>>>>>>> On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> hello, everyone
>>>>>>>> 
>>>>>>>> I'm new to Kafka; I'm wondering what's the maximum number of partitions
>>>>>>>> one single machine can handle in Kafka?
>>>>>>>> 
>>>>>>>> Is there a suggested number?
>>>>>>>> 
>>>>>>>> Thanks.
>>>>>>>> 
>>>>>>>> xiaobinshe
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>> 
>> 



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
There are various costs when a broker fails, including leader election for 
each partition, possible issues for in-flight messages, client rebalancing, 
etc.

So even though replication provides partition redundancy, RAID 10 on each 
broker is usually a good tradeoff: it protects against the most common cause of 
broker failure (i.e. disk failure) and gives smoother operation overall.

Best Regards,

-Jonathan


On Oct 22, 2014, at 11:01 AM, Gwen Shapira  wrote:

> RAID-10?
> Interesting choice for a system where the data is already replicated
> between nodes. Is it to avoid the cost of large replication over the
> network? how large are these disks?
> 
> On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino  wrote:
>> In fact there are many more than 4000 open files. Many of our brokers run
>> with 28,000+ open files (regular file handles, not network connections). In
>> our case, we're beefing up the disk performance as much as we can by
>> running in a RAID-10 configuration with 14 disks.
>> 
>> -Todd
>> 
>> On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She  wrote:
>> 
>>> Todd,
>>> 
>>> Actually I'm wondering how Kafka handles so many partitions: with one
>>> partition there is at least one file on disk, and with 4000 partitions
>>> there will be at least 4000 files.
>>> 
>>> When all these partitions have write requests, how does Kafka keep the write
>>> operations on the disk sequential (which is emphasized in the design
>>> document of Kafka) and make sure the disk access is efficient?
>>> 
>>> Thank you for your reply.
>>> 
>>> xiaobinshe
>>> 
>>> 
>>> 
>>> 2014-10-22 5:10 GMT+08:00 Todd Palino :
>>> 
 As far as the number of partitions a single broker can handle, we've set
 our cap at 4000 partitions (including replicas). Above that we've seen
 some performance and stability issues.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She 
 wrote:
 
> hello, everyone
> 
> I'm new to Kafka; I'm wondering what's the maximum number of partitions one
> single machine can handle in Kafka?
> 
> Is there a suggested number?
> 
> Thanks.
> 
> xiaobinshe
> 
 
>>> 



Re: Create topic programmatically

2014-10-13 Thread Jonathan Weeks
Sure — take a look at the kafka unit tests as well as admin.AdminUtils, e.g.:

import kafka.admin.AdminUtils
   AdminUtils.createTopic(zkClient, topicNameString, 10, 1)
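
A slightly fuller sketch (0.8.1-era API; the ZK connect string, timeouts and 
topic name are placeholders):

   import kafka.admin.AdminUtils
   import kafka.utils.ZKStringSerializer
   import org.I0Itec.zkclient.ZkClient

   // the ZkClient must use ZKStringSerializer, or the znodes it writes
   // won't be readable by the brokers
   val zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer)
   AdminUtils.createTopic(zkClient, "my-topic", 10, 1) // 10 partitions, RF 1
   zkClient.close()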

Best Regards,

-Jonathan

On Oct 13, 2014, at 9:58 AM, hsy...@gmail.com wrote:

> Hi guys,
> 
> Besides TopicCommand, which I believe is not intended for creating topics
> programmatically, is there any other way to automate topic creation in
> code? Thanks!
> 
> Best,
> Siyuan



Re: [DISCUSS] 0.8.1.2 Release

2014-09-30 Thread Jonathan Weeks
I was one of those asking for 0.8.1.2 a few weeks back, when 0.8.2 was at least 
6-8 weeks out.

If we truly believe that 0.8.2 will go “golden” and stable in 2-3 weeks, I, for 
one, don’t need a 0.8.1.2, but it depends on the confidence in shipping 0.8.2 
soonish.

YMMV,

-Jonathan


On Sep 30, 2014, at 12:37 PM, Neha Narkhede  wrote:

> Can we discuss the need for 0.8.1.2? I'm wondering if it's related to the
> timeline of 0.8.2 in any way? For instance, if we can get 0.8.2 out in the
> next 2-3 weeks, do we still need to get 0.8.1.2 out or can people just
> upgrade to 0.8.2?
> 
> On Tue, Sep 30, 2014 at 9:53 AM, Joe Stein  wrote:
> 
>> Hi, I wanted to kick off a specific discussion on a 0.8.1.2 release.
>> 
>> Here are the JIRAs I would like to propose to back port a patch (if not
>> already done so) and apply them to the 0.8.1 branch for a 0.8.1.2 release
>> 
>> https://issues.apache.org/jira/browse/KAFKA-1502 (source jar is empty)
>> https://issues.apache.org/jira/browse/KAFKA-1419 (cross build for scala
>> 2.11)
>> https://issues.apache.org/jira/browse/KAFKA-1382 (Update zkVersion on
>> partition state update failures)
>> https://issues.apache.org/jira/browse/KAFKA-1490 (remove gradlew initial
>> setup output from source distribution)
>> https://issues.apache.org/jira/browse/KAFKA-1645 (some more jars in our
>> src
>> release)
>> 
>> If the community and committers can comment on the patches proposed that
>> would be great. If I missed any, bring them up, or if you think any I have
>> proposed shouldn't be in the release, bring that up too, please.
>> 
>> Once we have consensus on this thread my thought was that I would apply and
>> commit the agreed to tickets to the 0.8.1 branch. If any tickets don't
>> apply of course a back port patch has to happen through our standard
>> process (not worried about that we have some engineering cycles to
>> contribute to making that happen). Once that is all done, I will build
>> 0.8.1.2 release artifacts and call a VOTE for RC1.
>> 
>> /***
>> Joe Stein
>> Founder, Principal Consultant
>> Big Data Open Source Security LLC
>> http://www.stealth.ly
>> Twitter: @allthingshadoop 
>> /
>> 



Re: Copying messages from a single partition topic to a multi-partition topic

2014-09-19 Thread Jonathan Weeks
I would look at writing a service that reads from your existing topic and 
writes to a new topic with (e.g. four) partitions.

You will also need to pay attention to the partitioning policy (or implement 
your own partitioner), as the default hashing in the current kafka version can 
lead to poor distribution. A rough sketch of such a copy service is below.
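
A sketch against the 0.8.1 APIs (topic names, group id, broker and ZK 
addresses are placeholders; error handling and shutdown omitted):

   import java.util.Properties
   import kafka.consumer.{Consumer, ConsumerConfig}
   import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

   val cProps = new Properties()
   cProps.put("zookeeper.connect", "localhost:2181")
   cProps.put("group.id", "topic-copier")
   cProps.put("auto.offset.reset", "smallest") // start from the oldest backlog
   val connector = Consumer.create(new ConsumerConfig(cProps))

   val pProps = new Properties()
   pProps.put("metadata.broker.list", "localhost:9092")
   pProps.put("serializer.class", "kafka.serializer.DefaultEncoder")
   val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(pProps))

   val stream = connector.createMessageStreams(Map("old-topic" -> 1))("old-topic").head
   for (msgAndMeta <- stream) {
     // the key (plus the partitioner) controls the spread across the new partitions
     producer.send(new KeyedMessage("new-topic", msgAndMeta.key(), msgAndMeta.message()))
   }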

Best Regards,

-Jonathan

 
On Sep 19, 2014, at 8:57 AM, Dennis Haller  wrote:

> Hi,
> 
> We have an interesting problem to solve due to a very large traffic volumes
> on particular topics. In our initial system configuration we had only one
> partition per topic, and in in a couple of topics we have built up huge
> backlogs of several million messages that our consumers are slowly
> processing.
> 
> However, now that we have this constant backlog, we wish to repartition
> those topics into several partitions, and allow parallel consumers to run
> to handle the high message volume.
> 
> If we simply repartition the topic, say from 1 to 4 partitions, the
> backlogged messages stay in partition 1, while partitions 2,3,4 only get
> newly arrived messages. To eat away the backlog, we need to redistribute
> the backlogged messages evenly among the 4 partitions.
> 
> The tools I've seen do not allow me to rewrite or "replay" the existing
> backlogged messages from one partition into the same or another topic with
> several partitions.  - using kafka.tools.MirrorMaker does not allow me to
> move the data within the same zookeeper network, and
> - using kafka.tools.ReplayLogProducer does not write to multiple
> partitions. It seems that it will write only from a single partition to a
> single partition.
> 
> Does anyone have any other way to solve this problem or a better way of
> using the kafka tools?
> 
> Thanks
> Dennis



Re: High level consumer with separate zk

2014-09-10 Thread Jonathan Weeks
When 0.8.2 arrives in the near future, consumer offsets will be stored by the 
brokers, and thus that workload will no longer hit ZK.
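
A sketch of the consumer-side knobs expected with 0.8.2 (names as currently 
proposed, so treat them as tentative):

   props.put("offsets.storage", "kafka")    // commit offsets to Kafka, not ZK
   props.put("dual.commit.enabled", "true") // also commit to ZK while migrating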

Best Regards,

-Jonathan


On Sep 10, 2014, at 8:20 AM, Mike Marzo  wrote:

> Is it possible for the high level consumer to use a different zk cluster
> than the cluster that manages broker leader election?
> The high level consumer adds a lot of value, but I don't like the idea that
> bad user code could pound the core zk and effectively hurt the kafka
> brokers.
> mike marzo
> 908 209-4484



Re: Use case

2014-09-05 Thread Jonathan Weeks
+1

Topic deletion with 0.8.1.1 is extremely problematic. Coupled with the fact 
that rebalance/broker membership changes pay a per-partition cost today (so 
excessive partitions extend downtime in the case of a failure), this means 
fewer topics (e.g. hundreds or thousands) is a best practice in the published 
version of kafka. 

There are also secondary impacts of topic count — e.g. useful operational tools 
such as http://quantifind.com/KafkaOffsetMonitor/ start to become problematic 
in terms of UX with a massive number of topics.

Once topic deletion is a supported feature, the use-case outlined might be more 
tenable.

Best Regards,

-Jonathan

On Sep 5, 2014, at 4:20 AM, Sharninder  wrote:

> I'm not really sure about your exact use-case but I don't think having a
> topic per user is very efficient. Deleting topics in kafka, at the moment,
> isn't really straightforward. You should rethink your data pipeline a bit.
> 
> Also, just because kafka has the ability to store messages for a certain
> time, don't think of it as a data store. Kafka is a streaming system; think
> of it as a fast queue that gives you the ability to move your pointer back.
> 
> --
> Sharninder
> 
> 
> 
> On Fri, Sep 5, 2014 at 4:27 PM, Aris Alexis 
> wrote:
> 
>> Thanks for the reply. If I use it only for activity streams like twitter:
>> 
>> I would want a topic for each #tag and a topic for each user and maybe one
>> for each city. Would that be too many topics, or does it not matter, since
>> most of them will be deleted after a specified interval?
>> 
>> 
>> 
>> Best Regards,
>> Aris Giachnis
>> 
>> 
>> On Fri, Sep 5, 2014 at 6:57 AM, Sharninder  wrote:
>> 
>>> Since you want all chats and mail history persisted all the time, I
>>> personally wouldn't recommend kafka for your requirement. Kafka is more
>>> suitable as a streaming system where events expire after a certain time.
>>> Look at something more general purpose like hbase for persisting data
>>> indefinitely.
>>> 
>>> So, for example all activity streams can go into kafka from where
>> consumers
>>> will pick up messages to parse and put them to hbase or other clients.
>>> 
>>> --
>>> Sharninder
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Fri, Sep 5, 2014 at 12:05 AM, Aris Alexis 
>>> wrote:
>>> 
 Hello,
 
 I am building a big web application that I want to be massively scalable
 (I am using cassandra and titan as a general db).
 
 I want to implement the following:
 
 real time web chat that is persisted so that user a in the future can
 recall his chat with user b,c,d much like facebook.
 mail-like messages in the web application (not sure about this as it is
 somewhat covered by the first one)
 user activity streams
 users subscribing to topics for example florida/musicevents
 
 Could i use kafka for this? can you recommend another technology maybe?
 
>>> 
>> 



Re: Updated Kafka Roadmap?

2014-09-03 Thread Jonathan Weeks
Hi,

I was wondering whether 0.8.2 is on track to be released this month, as I 
haven’t read much about betas or release candidates in any of the kafka groups 
(although I haven’t been following the IRC).

Although many of the new features targeted for 0.8.2, including topic deletion, 
the new producer client API and offset mgmt outside ZK would be great, we are 
primarily looking for scala 2.11.x support, and have an in-house build that we 
are currently managing, but would appreciate an update on the timing of the 
official road map.

In case it is looking like stabilizing the new features might push the full 
0.8.2 release out a while, an 0.8.1.2 build with just incremental scala 2.11.x 
support would be highly appreciated by many of us!

https://issues.apache.org/jira/browse/KAFKA-1419

If it is going to be a while, we will likely do additional sustainment work 
with our internal Kafka 2.11 build and begin working with other dependencies as 
well (e.g. akka-kafka) - just trying to get all the information before making 
that investment on something that might have a short lifetime.

Best Regards,

-Jonathan


On Aug 3, 2014, at 9:21 PM, Jun Rao  wrote:

> I just updated the wiki with some rough timelines.
> 
> Thanks,
> 
> Jun
> 
> 
> On Fri, Aug 1, 2014 at 11:52 AM, Jonathan Weeks 
> wrote:
> 
>> Howdy,
>> 
>> I was wondering if it would be possible to update the release plan:
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>> 
>> aligned with the feature roadmap:
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Index
>> 
>> We have several active projects using and planning to use Kafka, and any
>> current guidance on the upcoming releases (ZK dependence, producer and
>> consumer API/client timing) would be very helpful. For example, is 0.8.2
>> possible in August, or is September likely?
>> 
>> Also, any chance something like:
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
>> 
>> …might make it into 0.9?
>> 
>> Thanks!



Re: Trunk backwards compatibility (producer / consumer client questions)

2014-08-29 Thread Jonathan Weeks
Hi Jun,

Jay indicated that the new producer client on trunk is backwards compatible 
with 0.8.1.1 (see thread below) — can you elaborate?

Given the consumer re-write for 0.9, I can definitely see how that would break 
backwards compatibility, but Jay indicates that the producer on the trunk will 
work with older existing brokers...

Thanks,

-Jonathan

On Aug 29, 2014, at 10:32 AM, Jun Rao  wrote:

> The old clients with be compatible with the new broker. However, in order
> to use the new clients, you will need to upgrade to the new broker first.
> 
> Thanks,
> 
> Jun
> 
> 
> On Fri, Aug 29, 2014 at 10:09 AM, Jonathan Weeks 
> wrote:
> 
>> Thanks, Jay. Follow-up questions:
>> 
>> Some of our services will produce and consume. Is there consumer code on
>> trunk that is backwards compatible with an existing 0.8.1.1 broker cluster?
>> If not 0.8.1.1, will the consumer code on trunk work with a 0.8.2 broker
>> cluster when 0.8.2 is released?
>> 
>> (Our code is scala, BTW)
>> 
>> Best Regards,
>> 
>> -Jonathan
>> 
>> 
>> On Aug 26, 2014, at 5:55 PM, Jay Kreps  wrote:
>> 
>>> Also, Jonathan, to answer your question, the new producer on trunk is
>>> running in prod for some use cases at LinkedIn and can be used with
>>> any 0.8.x. version.
>>> 
>>> -Jay
>>> 
>>> On Tue, Aug 26, 2014 at 12:38 PM, Jonathan Weeks
>>>  wrote:
>>>> I am interested in this very topic as well. Also, can the trunk version
>> of the producer be used with an existing 0.8.1.1 broker installation, or
>> does one need to wait for 0.8.2 (at least)?
>>>> 
>>>> Thanks,
>>>> 
>>>> -Jonathan
>>>> 
>>>> On Aug 26, 2014, at 12:35 PM, Ryan Persaud 
>> wrote:
>>>> 
>>>>> Hello,
>>>>> 
>>>>> I'm looking to insert log lines from log files into kafka, but I'm
>> concerned with handling asynchronous send() failures.  Specifically, if
>> some of the log lines fail to send, I want to be notified of the failure so
>> that I can attempt to resend them.
>>>>> 
>>>>> Based on previous threads on the mailing list (
>> http://comments.gmane.org/gmane.comp.apache.kafka.user/1322), I know that
>> the trunk version of kafka supports callbacks for dealing with failures.
>> However, the callback function is not passed any metadata that can be used
>> by the producer end to reference the original message.  Including the key
>> of the message in the RecordMetadata seems like it would be really useful
>> for recovery purposes.  Is anyone using the callback functionality to
>> trigger resends of failed messages?  If so, how are they tying the
>> callbacks to messages?  Is anyone using other methods for handling async
>> errors/resending today?  I can’t imagine that I am the only one trying to
>> do this.  I asked this question on the IRC channel today, and it sparked
>> some discussion, but I wanted to hear from a wider audience.
>>>>> 
>>>>> Thanks for the information,
>>>>> -Ryan
>>>>> 
>>>> 
>> 
>> 



Trunk backwards compatibility (producer / consumer client questions)

2014-08-29 Thread Jonathan Weeks
Thanks, Jay. Follow-up questions:

Some of our services will produce and consume. Is there consumer code on trunk 
that is backwards compatible with an existing 0.8.1.1 broker cluster? If not 
0.8.1.1, will the consumer code on trunk work with a 0.8.2 broker cluster when 
0.8.2 is released?

(Our code is scala, BTW)

Best Regards,

-Jonathan


On Aug 26, 2014, at 5:55 PM, Jay Kreps  wrote:

> Also, Jonathan, to answer your question, the new producer on trunk is
> running in prod for some use cases at LinkedIn and can be used with
> any 0.8.x. version.
> 
> -Jay
> 
> On Tue, Aug 26, 2014 at 12:38 PM, Jonathan Weeks
>  wrote:
>> I am interested in this very topic as well. Also, can the trunk version of 
>> the producer be used with an existing 0.8.1.1 broker installation, or does 
>> one need to wait for 0.8.2 (at least)?
>> 
>> Thanks,
>> 
>> -Jonathan
>> 
>> On Aug 26, 2014, at 12:35 PM, Ryan Persaud  wrote:
>> 
>>> Hello,
>>> 
>>> I'm looking to insert log lines from log files into kafka, but I'm 
>>> concerned with handling asynchronous send() failures.  Specifically, if 
>>> some of the log lines fail to send, I want to be notified of the failure so 
>>> that I can attempt to resend them.
>>> 
>>> Based on previous threads on the mailing list 
>>> (http://comments.gmane.org/gmane.comp.apache.kafka.user/1322), I know that 
>>> the trunk version of kafka supports callbacks for dealing with failures.  
>>> However, the callback function is not passed any metadata that can be used 
>>> by the producer end to reference the original message.  Including the key 
>>> of the message in the RecordMetadata seems like it would be really useful 
>>> for recovery purposes.  Is anyone using the callback functionality to 
>>> trigger resends of failed messages?  If so, how are they tying the 
>>> callbacks to messages?  Is anyone using other methods for handling async 
>>> errors/resending today?  I can’t imagine that I am the only one trying to 
>>> do this.  I asked this question on the IRC channel today, and it sparked 
>>> some discussion, but I wanted to hear from a wider audience.
>>> 
>>> Thanks for the information,
>>> -Ryan
>>> 
>> 



Re: Handling send failures with async producer

2014-08-26 Thread Jonathan Weeks
I am interested in this very topic as well. Also, can the trunk version of the 
producer be used with an existing 0.8.1.1 broker installation, or does one need 
to wait for 0.8.2 (at least)?
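
For reference, the callback hook on trunk looks roughly like the Scala sketch 
below (API names as of the pre-0.8.2 trunk, where keys and values are byte 
arrays); closing over the record is one way to tie a callback back to its 
original message:

   import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

   // the closure captures `record`, so the callback knows exactly which
   // message failed and can resend it (naive unbounded retry, for brevity)
   def sendWithResend(producer: KafkaProducer, record: ProducerRecord): Unit =
     producer.send(record, new Callback {
       override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
         if (e != null) sendWithResend(producer, record)
     })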

Thanks,

-Jonathan

On Aug 26, 2014, at 12:35 PM, Ryan Persaud  wrote:

> Hello,
> 
> I'm looking to insert log lines from log files into kafka, but I'm concerned 
> with handling asynchronous send() failures.  Specifically, if some of the log 
> lines fail to send, I want to be notified of the failure so that I can 
> attempt to resend them.
> 
> Based on previous threads on the mailing list 
> (http://comments.gmane.org/gmane.comp.apache.kafka.user/1322), I know that 
> the trunk version of kafka supports callbacks for dealing with failures.  
> However, the callback function is not passed any metadata that can be used by 
> the producer end to reference the original message.  Including the key of the 
> message in the RecordMetadata seems like it would be really useful for 
> recovery purposes.  Is anyone using the callback functionality to trigger 
> resends of failed messages?  If so, how are they tying the callbacks to 
> messages?  Is anyone using other methods for handling async errors/resending 
> today?  I can’t imagine that I am the only one trying to do this.  I asked 
> this question on the IRC channel today, and it sparked some discussion, but I 
> wanted to hear from a wider audience.
> 
> Thanks for the information,
> -Ryan
> 



Re: Kafka build for Scala 2.11

2014-08-22 Thread Jonathan Weeks
+1 on a 0.8.1.2 release with support for Scala 2.11.x.

-Jonathan


On Aug 22, 2014, at 11:19 AM, Joe Stein  wrote:

> The changes are committed to trunk.  We didn't create the patch for 0.8.1.1
> since there were code changes required and we dropped support for Scala 2.8
> (so we could just upload the artifacts without a vote).
> 
> https://issues.apache.org/jira/secure/attachment/12660369/KAFKA-1419_2014-08-07_10%3A52%3A18.patch
> is the version you want.
> 
> If this is pressing for folks who can't wait for 0.8.2 or don't want to
> upgrade right away, then doing a 0.8.1.2 release is an option...maybe some
> other things too, e.g. the empty source jars.  I would prepare and vote on it
> if others would too.
> 
> /***
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop
> /
> On Aug 22, 2014 1:03 PM, "Seshadri, Balaji" 
> wrote:
> 
>> Hi Team,
>> 
>> We are trying to compile 0.8.1.1 with Scala 2.11 and it's giving me
>> compilation errors.
>> 
>> Please let me know which patch I should apply from the JIRA below. I tried
>> the latest one and it failed to apply.
>> 
>> https://issues.apache.org/jira/browse/KAFKA-1419
>> 
>> Thanks,
>> 
>> Balaji
>> 



Re: Kafka build for Scala 2.11

2014-08-22 Thread Jonathan Weeks
I hand-applied this patch https://reviews.apache.org/r/23895/diff/ to the kafka 
0.8.1.1 branch and was able to build successfully:

gradlew -PscalaVersion=2.11.2 -PscalaCompileOptions.useAnt=false releaseTarGz -x signArchives

I am testing the jar now, and will let you know if I can run 
producers/consumers against a vanilla 0.8.1.1 broker cluster with it...

-Jonathan

On Aug 22, 2014, at 11:02 AM, Seshadri, Balaji  wrote:

> Hi Team,
> 
> We are trying to compile 0.8.1.1 with Scala 2.11 and it's giving me 
> compilation errors.
> 
> Please let me know which patch I should apply from the JIRA below. I tried 
> the latest one and it failed to apply.
> 
> https://issues.apache.org/jira/browse/KAFKA-1419
> 
> Thanks,
> 
> Balaji



Re: consumer read from specific partition

2014-08-18 Thread Jonathan Weeks
The high level API gives you access to the raw Kafka event, MessageAndMetadata, 
which has two pieces: the key and the actual message.
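
A minimal Scala sketch of reading the key off the stream (0.8 high-level 
consumer; the topic name, group id and bucketing scheme are placeholders):

   import java.util.Properties
   import kafka.consumer.{Consumer, ConsumerConfig}

   val props = new Properties()
   props.put("zookeeper.connect", "localhost:2181")
   props.put("group.id", "sliding-window")
   val connector = Consumer.create(new ConsumerConfig(props))
   val stream = connector.createMessageStreams(Map("events" -> 1))("events").head

   val numBuckets = 4 // assumed bucketing scheme
   val myBucket = 0
   def process(bytes: Array[Byte]): Unit = println(new String(bytes, "UTF-8"))
   for (mam <- stream) { // MessageAndMetadata[Array[Byte], Array[Byte]]
     val key = new String(mam.key(), "UTF-8") // raw bytes unless you pass decoders
     if (key.toLong % numBuckets == myBucket) process(mam.message())
   }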

Best Regards,

-Jonathan


On Aug 18, 2014, at 9:50 AM, Josh J  wrote:

>> One tactic that might be worth exploring is to rely on the message key to
> facilitate this.
> 
>> It would require engineering careful functions for the key which hashes
> to the partitions for your topic(s). It would also mean that your consumers
> for the topic would be evaluating the key and discarding messages that
> aren’t relevant.
> 
>> The only other option I can think of if you are using the high-level API
> would be finer-grained topics.
> 
> Yes, this works great! My producer is bucketing the messages based on the
> key (the key is a timestamp, and I simply mod the timestamp by the number
> of buckets). I can then apply the same function on the consumer when it
> reads the key. I'm essentially implementing a consumer-side sliding window.
> Any suggestions or tips on where I would implement reading the message key?
> 
> Thanks,
> Josh
> 
> 
> On Mon, Aug 18, 2014 at 6:43 PM, Jonathan Weeks 
> wrote:
> 
>> One tactic that might be worth exploring is to rely on the message key to
>> facilitate this.
>> 
>> It would require engineering careful key functions that hash to the desired
>> partitions for your topic(s). It would also mean that your consumers
>> for the topic would be evaluating the key and discarding messages that
>> aren’t relevant.
>> 
>> The only other option I can think of if you are using the high-level API
>> would be finer-grained topics.
>> 
>> Best Regards,
>> 
>> -Jonathan
>> 
>> On Aug 18, 2014, at 9:14 AM, Josh J  wrote:
>> 
>>> Is it possible to modify and use the high level consumer so that I can
>>> ignore processing certain partitions?
>>> 
>>> 
>>> On Mon, Aug 18, 2014 at 5:07 PM, Sharninder 
>> wrote:
>>> 
>>>> On Mon, Aug 18, 2014 at 7:27 PM, Josh J  wrote:
>>>> 
>>>>>> You can see an example of using the SimpleConsumer here:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>>>>> 
>>>>> Any suggestions on where in the code to modify the high level producer to
>>>>> support reading from specific partitions?
>>>>> 
>>>>> 
>>>> High level producer? I'm assuming you meant to write the high level
>>>> consumer, in which case it isn't possible. The link above, which has an
>>>> example for reading messages off a specific partition, is for the Simple
>>>> consumer, which, ironically, is more complex than the high level consumer.
>>>> 
>>>> In short, if you have a usecase where you want to read from a specific
>>>> partition, you will need to implement a simple consumer.
>>>> 
>>>> --
>>>> Sharninder
>>>> 
>>>> Josh
>>>>> .
>>>>> 
>>>>> On Thu, Aug 14, 2014 at 4:27 PM, Neha Narkhede <
>> neha.narkh...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> You can see an example of using the SimpleConsumer here:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Aug 14, 2014 at 3:23 AM, Sharninder 
>>>>> wrote:
>>>>>> 
>>>>>>> Implement the low level "Simple Consumer".
>>>>>>> 
>>>>>>> --
>>>>>>> Sharninder
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Aug 14, 2014 at 2:16 PM, Josh J  wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Suppose I have N partitions. I would like to have X different
>>>>> consumer
>>>>>>>> threads ( X < N) read from a specified set of partitions. How can I
>>>>>>> achieve
>>>>>>>> this?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Josh
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 



Re: consumer read from specific partition

2014-08-18 Thread Jonathan Weeks
One tactic that might be worth exploring is to rely on the message key to 
facilitate this.

It would require engineering careful key functions that hash to the desired 
partitions for your topic(s). It would also mean that your consumers for the 
topic would be evaluating the key and discarding messages that aren’t relevant.

The only other option I can think of if you are using the high-level API would 
be finer-grained topics.
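
For example, with the 0.8 producer a custom partitioner is one place to pin 
keys to partitions (a Scala sketch; the bucketing scheme is an assumption):

   import kafka.producer.Partitioner
   import kafka.utils.VerifiableProperties

   // registered via the "partitioner.class" producer config; the
   // VerifiableProperties constructor parameter is required by the producer
   class BucketPartitioner(props: VerifiableProperties) extends Partitioner {
     override def partition(key: Any, numPartitions: Int): Int =
       (key.toString.hashCode & 0x7fffffff) % numPartitions // non-negative mod
   }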

Best Regards,

-Jonathan

On Aug 18, 2014, at 9:14 AM, Josh J  wrote:

> Is it possible to modify and use the high level consumer so that I can
> ignore processing certain partitions?
> 
> 
> On Mon, Aug 18, 2014 at 5:07 PM, Sharninder  wrote:
> 
>> On Mon, Aug 18, 2014 at 7:27 PM, Josh J  wrote:
>> 
 You can see an example of using the SimpleConsumer here:
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>>> Any suggestions on where in the code to modify the high level producer to
>>> support reading from specific partitions?
>>> 
>>> 
>> High level producer? I'm assuming you meant to write the high level
>> consumer, in which case it isn't possible. The link above, which has an
>> example for reading messages off a specific partition, is for the Simple
>> consumer, which ironically, is more complex than the high level consumer.
>> 
>> In short, if you have a usecase where you want to read from a specific
>> partition, you will need to implement a simple consumer.
>> 
>> --
>> Sharninder
>> 
>> Josh
>>> .
>>> 
>>> On Thu, Aug 14, 2014 at 4:27 PM, Neha Narkhede 
>>> wrote:
>>> 
 You can see an example of using the SimpleConsumer here:
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 
 
 
 On Thu, Aug 14, 2014 at 3:23 AM, Sharninder 
>>> wrote:
 
> Implement the low level "Simple Consumer".
> 
> --
> Sharninder
> 
> 
> 
> On Thu, Aug 14, 2014 at 2:16 PM, Josh J  wrote:
> 
>> Hi,
>> 
>> Suppose I have N partitions. I would like to have X different
>>> consumer
>> threads ( X < N) read from a specified set of partitions. How can I
> achieve
>> this?
>> 
>> Thanks,
>> 
>> Josh
>> 
> 
 
>>> 
>> 



Re: Architecture: amount of partitions

2014-08-08 Thread Jonathan Weeks

The approach may well depend on your deploy horizon. Currently the offset 
tracking of each partition is done in ZooKeeper, which places an upper limit on 
the number of topics/partitions you can have and still operate with any kind of 
efficiency.

In 0.8.2, hopefully coming in the next month or two, consumer offset tracking is 
done internally via a Kafka topic rather than in ZK, so the partition-count 
scalability issue above isn’t as severe.

From the Broker side, some filesystems such as XFS have no problem with 
hundreds of thousands of files in a directory. My experience with EXT3/4 and 
lots of files is less happy.

Also, I’m not sure about your retention policy needs for messages in the broker 
(usually 7 days by default). Using Kafka as a long term DB probably isn’t a 
great fit.

Another approach to consider is to map users onto fewer topics and 
differentiate based on a message key that contains the user id, for example.
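
With the 0.8 producer that might look like the following Scala sketch (topic, 
configs and values are placeholders):

   import java.util.Properties
   import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

   val props = new Properties()
   props.put("metadata.broker.list", "localhost:9092")
   props.put("serializer.class", "kafka.serializer.StringEncoder")
   val producer = new Producer[String, String](new ProducerConfig(props))

   // one topic per data type; the user id rides along as the message key,
   // so consumers can demultiplex per user
   val userId = "user-42"
   producer.send(new KeyedMessage[String, String]("contacts", userId, "{...}"))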

Best Regards,

-JW

On Aug 8, 2014, at 12:35 PM, Roman Iakovlev  wrote:

> Dear all,
> 
> 
> 
> I'm new to Kafka, and I'm considering using it for a perhaps unusual
> purpose. I want it to be a backend for data synchronization between a
> multitude of devices, which are not always online (mobile and embedded
> devices). All the synchronized information belongs to some user, and can be
> identified by the user id. There are several data types, and a user can have
> many entries of each data type coming from many different devices.
> 
> 
> 
> This solution has to scale up to hundreds of thousands of users, and, as far
> as I understand, Kafka stores every partition in a single file. I've been
> thinking about creating a topic for every data type and a separate partition
> for every user. The amount of data stored by each user is no more than several
> megabytes over the whole lifetime, because the data stored would be keyed
> messages, and I'm expecting it to be compacted.
> 
> 
> 
> So what I'm wondering is, would Kafka be the right approach for such a task,
> and if yes, would this architecture (one topic per data type and one partition
> per user) scale to the specified extent?
> 
> 
> 
> Thanks, 
> 
> Roman.
> 



Re: Apache webserver access logs + Kafka producer

2014-08-05 Thread Jonathan Weeks
You can look at something like: 

https://github.com/harelba/tail2kafka

(although I don’t know what the effort would be to update it, as it doesn’t 
look like it has been updated in a couple years)

We are using flume to gather logs, and then sending them to a kafka cluster via 
a flume kafka sink, e.g.:

https://github.com/thilinamb/flume-ng-kafka-sink

-Jonathan


On Aug 5, 2014, at 1:40 PM, mvs.s...@gmail.com wrote:

> Hi,
> 
> I want to collect apache web server logs in real time and send them to a
> Kafka server. Is there any existing producer available to do this? If not,
> can you please suggest a way to implement it?
> 
> Regards,
> Sree.



Updated Kafka Roadmap?

2014-08-01 Thread Jonathan Weeks
Howdy, 

I was wondering if it would be possible to update the release plan:

https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan

aligned with the feature roadmap:

https://cwiki.apache.org/confluence/display/KAFKA/Index

We have several active projects using and planning to use Kafka, and any 
current guidance on the upcoming releases (ZK dependence, producer and consumer 
API/client timing) would be very helpful. For example, is 0.8.2 possible in 
August, or is September likely?

Also, any chance something like:

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer

…might make it into 0.9?

Thanks!