Querying consumer groups programmatically (from Golang)

2017-08-12 Thread Jens Rantil
Hi,

I am one of the maintainers of prometheus-kafka-consumer-group-exporter[1],
which exports consumer group offsets and lag to Prometheus. The way we
currently scrape this information is by periodically executing
`kafka-consumer-groups.sh --describe` for each group and parsing the output.

Recently the output from `kafka-consumer-groups.sh --describe` was
changed[2]. While I am working on a patch[3] to accommodate the new
output format, I was wondering if there is an easier, possibly more
stable[4] and more future-proof, way for our project to extract the
information we are interested in. Does anyone know of a Go library that
could extract the metrics we need? Or would it make sense to refactor
`kafka-consumer-groups.sh` to support a more structured output? I'd love to
hear your input.
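
For reference, here is a rough sketch of what computing the lag directly with
the Java consumer could look like (assuming a 0.10.1+ client, since it needs
endOffsets(); broker address, group and topic names are placeholders). It is
not Go, but it illustrates the data we are after without shelling out:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ConsumerGroupLagSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "the-group-to-inspect");      // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("the-topic")) {  // placeholder
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            // Lag = log end offset minus the group's committed offset, per partition.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp);
                long committedOffset = committed == null ? 0L : committed.offset();
                System.out.printf("%s committed=%d end=%d lag=%d%n",
                    tp, committedOffset, endOffsets.get(tp),
                    endOffsets.get(tp) - committedOffset);
            }
        }
    }
}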

Also, if Kafka exported the same metrics through JMX our project would not
exist, but maybe that's another story...

Cheers,
Jens

[1] https://github.com/kawamuray/prometheus-kafka-consumer-group-exporter
[2]
https://github.com/kawamuray/prometheus-kafka-consumer-group-exporter/issues/24
[3]
https://github.com/kawamuray/prometheus-kafka-consumer-group-exporter/pull/29
[4] We've also encountered `kafka-consumer-groups.sh` hanging a few times
in production. There's a race condition somewhere in the script, most
likely when a topic is rebalancing. Currently we kill the process if it
doesn't finish within a timeout. See
https://github.com/kawamuray/prometheus-kafka-consumer-group-exporter/blob/e4cdc3b1245f636d89d7e227066f02578d732165/kafka/collector.go#L44.

-- 
Want to communicate with me securely? You can find my PGP key here.


Re: Support for Kafka

2016-10-10 Thread Jens Rantil
Hi Syed,

Apache Kafka runs on a JVM. I think the question you should ask is -- which
JVM does Apache Kafka require in production*? It doesn't really depend on a
specific Linux distribution.

* ...and I don't have that answer ;-)

Cheers,
Jens

On Wednesday, October 5, 2016, Syed Hussaini <
syed.hussa...@theexchangelab.com> wrote:

> Dear Kafka team.
>
> I am in the implementation stage of a Kafka cluster and looking to find
> out whether Apache Kafka is supported on Ubuntu 16.04 LTS – Xenial.
>
>
>
> Would be great if you please let us know.
>
>
>
>
>
>
> *Syed Hussaini*
> Infrastructure Engineer
>
> 1 Neathouse Place
> 5th Floor
> London, England, SW1V 1LH
>
>
> syed.hussa...@theexchangelab.com
>
> T 0203 701 3177
>
>
> --
>
> Follow us on Twitter: @exchangelab <https://twitter.com/exchangelab> | Visit
> us on LinkedIn: The Exchange Lab
> <https://www.linkedin.com/company/the-exchange-lab>
>
>
>
>
>


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: any update on this?

2016-09-20 Thread Jens Rantil
> not aware of any shortfall with zookeeper so perhaps you can suggest
advantages for Consul vs Zookeeper?

Maybe it's somewhat off-topic here, but Consul has several advantages over
Zookeeper:
 * It's IMHO easier to maintain, add leader nodes, remove leader nodes etc.
 * Has high-level service discovery mechanisms supporting both healthchecks
and a flexible DNS hook to load balance to services.
 * Talks HTTP, so it's much easier to integrate with any programming
language/platform.
 * Support for multiple datacenters.
 * Zookeeper supports keep-alive as healthcheck. Consul supports more
high-level healthchecks.

While Consul is more opinionated, it also has more features and is higher
level. IMHO, that makes it easier to work with and means you don't have to
write thick clients.
You can read more about Consul vs. Zookeeper here:
https://www.consul.io/intro/vs/zookeeper.html

That said, just like Kant wrote, I don't think moving from Zookeeper to
Consul should be a goal. The above Consul pros are reasons for systems to
use Consul instead of Zookeeper. We would be happy if we could ditch
Zookeeper and use Consul for all coordination to avoid the overhead of
having to maintain two highly consistent datastores.

Cheers,
Jens

On Mon, Sep 19, 2016 at 3:31 PM Martin Gainty <mgai...@hotmail.com> wrote:

> Jens/Kant
> not aware of any shortfall with zookeeper so perhaps you can suggest
> advantages for Consul vs Zookeeper?
> Maven (I am building, testing and running kafka internally with maven)
> implements wagon-providers for URLConnection vs HttpURLConnection
> wagons: https://maven.apache.org/guides/mini/guide-wagon-providers.html
> Thinking a network_provider should work for integrating an external network
> provider. How would you architect this integration?
>
> would a configurable network-provider such as maven-wagon-provider work
> for kafka?
> Martin
>
> > From: kanth...@gmail.com
> > To: users@kafka.apache.org
> > Subject: Re: any update on this?
> > Date: Mon, 19 Sep 2016 09:41:10 +
> >
> > Yes, of course the goal shouldn't be moving towards Consul. It should just
> be
> > flexible enough for users to pick any distributed coordinated system.
> >
> >
> >
> >
> >
> >
> > On Mon, Sep 19, 2016 2:23 AM, Jens Rantil jens.ran...@tink.se
> > wrote:
> > I think I read somewhere that the long-term goal is to make Kafka
> >
> > independent of Zookeeper altogether. Maybe not worth spending time on
> >
> > migrating to Consul in that case.
> >
> >
> >
> >
> > Cheers,
> >
> > Jens
> >
> >
> >
> >
> > On Sat, Sep 17, 2016 at 10:38 PM Jennifer Fountain <jfount...@meetme.com
> >
> >
> > wrote:
> >
> >
> >
> >
> > > +2 watching.
> >
> > >
> >
> > > On Sat, Sep 17, 2016 at 2:45 AM, kant kodali <kanth...@gmail.com>
> wrote:
> >
> > >
> >
> > > > https://issues.apache.org/jira/browse/KAFKA-1793
> >
> > > > It would be great to use Consul instead of Zookeeper for Kafka and I
> >
> > > think
> >
> > > > it
> >
> > > > would benefit Kafka a lot from the exponentially growing consul
> >
> > > community.
> >
> > >
> >
> > >
> >
> > >
> >
> > >
> >
> > > --
> >
> > >
> >
> > >
> >
> > > Jennifer Fountain
> >
> > > DevOPS
> >
> > >
> >
> > --
> >
> >
> >
> >
> > Jens Rantil
> >
> > Backend Developer @ Tink
> >
> >
> >
> >
> > Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> >
> > For urgent matters you can reach me at +46-708-84 18 32.
>

-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: any update on this?

2016-09-19 Thread Jens Rantil
I think I read somewhere that the long-term goal is to make Kafka
independent of Zookeeper altogether. Maybe not worth spending time on
migrating to Consul in that case.

Cheers,
Jens

On Sat, Sep 17, 2016 at 10:38 PM Jennifer Fountain <jfount...@meetme.com>
wrote:

> +2 watching.
>
> On Sat, Sep 17, 2016 at 2:45 AM, kant kodali <kanth...@gmail.com> wrote:
>
> > https://issues.apache.org/jira/browse/KAFKA-1793
> > It would be great to use Consul instead of Zookeeper for Kafka and I
> think
> > it
> > would benefit Kafka a lot from the exponentially growing consul
> community.
>
>
>
>
> --
>
>
> Jennifer Fountain
> DevOPS
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Large # of Topics/Partitions

2016-09-16 Thread Jens Rantil
Hi,

This might also be of interest:
http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

Cheers,
Jens

On Monday, August 8, 2016, Daniel Fagnan <dan...@segment.com> wrote:

> Thanks Tom! This was very helpful and I’ll explore having a more static
> set of partitions as that seems to fit Kafka a lot better.
>
> Cheers,
> Daniel
>
> > On Aug 8, 2016, at 12:27 PM, Tom Crayford <tcrayf...@heroku.com> wrote:
> >
> > Hi Daniel,
> >
> > Kafka doesn't provide this kind of isolation or scalability for many many
> > streams. The usual design is to use a consistent hash of some "key" to
> > attribute your data to a particular partition. That of course, doesn't
> > isolate things fully, but has everything in a partition dependent on each
> > other.
> >
> > We've found that over a few thousand to a few tens of thousands of
> > partitions clusters hit a lot of issues (it depends on the write pattern,
> > how much memory you give brokers and zookeeper, and if you plan on ever
> > deleting topics).
> >
> > Another option is to manage multiple clusters, and keep under a certain
> > limit of partitions in each cluster. That is of course additional
> > operational overhead and complexity.
> >
> > I'm not sure I 100% understand your mechanism for tracking pending
> offsets,
> > but it seems like that might be your best option.
> >
> > Thanks
> >
> > Tom Crayford
> > Heroku Kafka
> >
> > On Mon, Aug 8, 2016 at 8:12 PM, Daniel Fagnan <dan...@segment.com>
> wrote:
> >
> >> Hey all,
> >>
> >> I’m currently in the process of designing a system around Kafka and I’m
> >> wondering the recommended way to manage topics. Each event stream we
> have
> >> needs to be isolated from each other. A failure from one should not
> affect
> >> another event stream from processing (by failure, we mean a downstream
> >> failure that would require us to replay the messages).
> >>
> >> So my first thought was to create a topic per event stream. This allows
> a
> >> larger event stream to be partitioned for added parallelism but keep the
> >> default # of partitions down as much as possible. This would solve the
> >> isolation requirement in that a topic can keep failing and we’ll
> continue
> >> replaying the messages without affected all the other topics.
> >>
> >> We read it’s not recommended to have your data model dictate the # of
> >> partitions or topics in Kafka and we’re unsure about this approach if we
> >> need to triple our event stream.
> >>
> >> We’re currently looking at 10,000 event streams (or topics) but we don’t
> >> want to be spinning up additional brokers just so we can add more event
> >> stream, especially if the load for each is reasonable.
> >>
> >> Another option we were looking into was to not isolate at the
> >> topic/partition level but to keep a set of pending offsets persisted
> >> somewhere (seemingly what Twitter Heron or Storm does but they don’t
> seem
> >> to persist the pending offsets).
> >>
> >> Thoughts?
>
>

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Balancing based on lag

2016-09-12 Thread Jens Rantil
Hi Stevo,

Thank you for your response. Yes, I understand there can only be one active
consumer per partition and I understand that partitions should be spread
evenly. However, there will almost always be cases when they are somewhat
unbalanced.

That said, how consumers are distributed among partitions is not by design.
Let me clarify: My question was whether there has been any discussion on
1) having the rebalancing algorithm take lag into account and assign
partitions to consumers in such a way that it tries to spread the sum of lag
per consumer as evenly as possible, and 2) possibly triggering the
rebalancing algorithm if lag could be considerably improved.
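
To make 1) concrete, here is a naive greedy sketch of such an assignment in
plain Java (purely illustrative, not tied to Kafka's assignor API): it hands
out the most-lagging partitions first, always to the consumer currently
carrying the least total lag. Applied to the example in the quoted mail below,
it would give "192.168.1.3" one of the two lagging partitions.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LagAwareAssignmentSketch {
    /** Maps each consumer id to the partitions it should get, balancing total lag. */
    public static Map<String, List<Integer>> assign(Map<Integer, Long> lagPerPartition,
                                                    List<String> consumers) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        Map<String, Long> totalLag = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<Integer>());
            totalLag.put(c, 0L);
        }

        // Sort partitions by lag, largest first.
        List<Map.Entry<Integer, Long>> byLag = new ArrayList<>(lagPerPartition.entrySet());
        byLag.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));

        for (Map.Entry<Integer, Long> partition : byLag) {
            // Pick the consumer that currently carries the least total lag.
            String leastLoaded = consumers.get(0);
            for (String c : consumers) {
                if (totalLag.get(c) < totalLag.get(leastLoaded)) {
                    leastLoaded = c;
                }
            }
            assignment.get(leastLoaded).add(partition.getKey());
            totalLag.put(leastLoaded, totalLag.get(leastLoaded) + partition.getValue());
        }
        return assignment;
    }
}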

I have two specific use cases:

   1. I recently added new partitions to a topic and reset offsets to the
   beginning. This meant that the old partitions had much more data than the
   new partitions. The rebalance when adding new nodes was definitely not
   "fair" in terms of lag. I had to scale up to an equal number of consumers
   and partitions to be sure that a single consumer was not assigned
   two of the old partitions.
   2. Autoscaling. When consumers come and go based on CPU load or
   whatever, they will invariably have different numbers of partitions
   assigned to them. A lag-aware rebalancer would definitely be a small
   optimisation that could do quite a lot here.

If no discussion of this has been done, I'll consider writing a KIP for it.

Cheers,
Jens

On Mon, Sep 12, 2016 at 12:25 AM Stevo Slavić <ssla...@gmail.com> wrote:

> Hello Jens,
>
> By design there can be only one active consumer per consumer group per
> partion at a time, only one thread after being assigned (acquiring a lock)
> moves the offset for consumer group for partition, so no concurrency
> problems. Typically that assignment lasts long until rebalancing gets
> triggered e.g. because consumer instance joined or left the same group.
>
> To have all active consumers evenly loaded, one has to have publishing
> evenly distribute messages across the partitions with appropriate
> partitioning strategy. Check which one is being used with which settings.
> It may be tuned well already but maybe not for your test e.g. publishes 20k
> messages to one partition before publishing to next one (assumes lots of
> messages will be published so batches writes, trading of temp uneven
> balancing for better throughput), so if test publishes 40k messages only,
> only two partitions will actually get the data.
>
> Kind regards,
> Stevo Slavic.
>
> On Sun, Sep 11, 2016, 22:49 Jens Rantil <jens.ran...@tink.se> wrote:
>
> > Hi,
> >
> > We have a partition which has many more messages than all other
> partitions.
> > Does anyone know if there has been any discussions on having a partition
> > balancer that tries to balance consumers based on consumer group lag?
> >
> > Example:
> >
> > [
> > { partition: 0, consumers: "192.168.1.2", lag: 2 },
> > { partition: 1, consumers: "192.168.1.2", lag: 2 },
> > { partition: 2, consumers: "192.168.1.3", lag: 0 },
> > ]
> >
> > Clearly, it would be more optimal if "192.168.1.3" also takes care of
> > partition 1.
> >
> > Cheers,
> > Jens
> >
> >
> > --
> > Jens Rantil
> > Backend engineer
> > Tink AB
> >
> > Email: jens.ran...@tink.se
> > Phone: +46 708 84 18 32
> > Web: www.tink.se
> >
> > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > <
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > >
> >  Twitter <https://twitter.com/tink>
> >
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Balancing based on lag

2016-09-11 Thread Jens Rantil
Hi,

We have a partition which has many more messages than all other partitions.
Does anyone know if there has been any discussions on having a partition
balancer that tries to balance consumers based on consumer group lag?

Example:

[
{ partition: 0, consumers: "192.168.1.2", lag: 2 },
{ partition: 1, consumers: "192.168.1.2", lag: 2 },
{ partition: 2, consumers: "192.168.1.3", lag: 0 },
]

Clearly, it would be more optimal if "192.168.1.3" also takes care of
partition 1.

Cheers,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Impact of reducing topic retention with lagging consumer group

2016-09-11 Thread Jens Rantil
Hi,

We have an idempotent consumer group which is consuming a single topic. Due
to a software mistake, we just reset the consumer group's offsets to
consume from the beginning.

Our topic retention is 7 days. Our software mistake was made 3 days ago. We
are currently processing the second oldest day of our logs. If I reduce
topic retention to 3 days and brokers purge old logs, will consumer groups
automagically start consuming from the "new beginning" (that is, new
smallest offset)? This would save us some processing time...
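
For context, my understanding is that this hinges on the consumers'
auto.offset.reset setting once their position falls below the new smallest
offset. A minimal sketch of the relevant property, assuming the Java consumer:

// What happens once the brokers purge segments below our current position is
// governed by this setting: "earliest" resumes from the new smallest offset,
// "latest" skips ahead to the end, and "none" surfaces an error to the application.
props.put("auto.offset.reset", "earliest");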

Thanks,
Jens
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: Kafka encryption

2016-05-26 Thread Jens Rantil
> How can we do file system encryption?

Google "LUKS" and you should be able to find some reasources on disk-level
encryption.
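
Regarding the custom (de)serializer idea Gerard mentions further down in this
thread, a minimal sketch of an encrypting value serializer for the Java
producer could look like the one below. The key handling and the
"encryption.key.bytes" config name are made up for the example, and a real
implementation would want a random IV and an authenticated mode rather than
plain "AES":

import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.apache.kafka.common.serialization.Serializer;

// Encrypts each message value with a symmetric key before it leaves the
// producer, so the data is encrypted in the brokers' log files.
public class EncryptingStringSerializer implements Serializer<String> {
    private SecretKeySpec key;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Assumption: the raw key bytes are passed in under a custom config name.
        byte[] rawKey = (byte[]) configs.get("encryption.key.bytes");
        key = new SecretKeySpec(rawKey, "AES");
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, key);
            return cipher.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new RuntimeException("Encryption failed", e);
        }
    }

    @Override
    public void close() {}
}

A matching Deserializer on the consumer side would do the reverse.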

Cheers,
Jens

On Wed, May 25, 2016 at 11:59 AM Tom Crayford <tcrayf...@heroku.com> wrote:

> If you're using EBS then it's a single flag to use encrypted drives at the
> provision time of the volume. I don't know about the other storage options,
> I'd recommend looking at the AWS documentation.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>
> On Wednesday, 25 May 2016, Snehalata Nagaje <
> snehalata.nag...@harbingergroup.com> wrote:
>
> >
> >
> > Thanks,
> >
> > How can we do file system encryption?
> >
> > we are using aws environment.
> >
> > Thanks,
> > Snehalata
> >
> > - Original Message -
> > From: "Gerard Klijs" <gerard.kl...@dizzit.com <javascript:;>>
> > To: "Users" <users@kafka.apache.org <javascript:;>>
> > Sent: Tuesday, May 24, 2016 7:26:27 PM
> > Subject: Re: Kafka encryption
> >
> > For both old and new consumers/producers you can make your own
> > (de)serializer to do some encryption, maybe that could be an option?
> >
> > On Tue, May 24, 2016 at 2:40 PM Tom Crayford <tcrayf...@heroku.com
> > <javascript:;>> wrote:
> >
> > > Hi,
> > >
> > > There's no encryption at rest. It's recommended to use filesystem
> > > encryption, or encryption of each individual message before producing
> it
> > > for this.
> > >
> > > Only the new producer and consumers have SSL support.
> > >
> > > Thanks
> > >
> > > Tom Crayford
> > > Heroku Kafka
> > >
> > > On Tue, May 24, 2016 at 11:33 AM, Snehalata Nagaje <
> > > snehalata.nag...@harbingergroup.com <javascript:;>> wrote:
> > >
> > > >
> > > >
> > > > Thanks for quick reply.
> > > >
> > > > Do you mean If I see messages in kafka, those will not be readable?
> > > >
> > > > And also, we are using new producer but old consumer , does old
> > consumer
> > > > have ssl support?
> > > >
> > > > As mentioned in document, its not there.
> > > >
> > > >
> > > > Thanks,
> > > > Snehalata
> > > >
> > > > - Original Message -
> > > > From: "Mudit Kumar" <mudit.ku...@askme.in <javascript:;>>
> > > > To: users@kafka.apache.org <javascript:;>
> > > > Sent: Tuesday, May 24, 2016 3:53:26 PM
> > > > Subject: Re: Kafka encryption
> > > >
> > > > Yes,it does that.What specifically you are looking for?
> > > >
> > > >
> > > >
> > > >
> > > > On 5/24/16, 3:52 PM, "Snehalata Nagaje" <
> > > > snehalata.nag...@harbingergroup.com <javascript:;>> wrote:
> > > >
> > > > >Hi All,
> > > > >
> > > > >
> > > > >We have requirement of encryption in kafka.
> > > > >
> > > > >As per docs, we can configure kafka with ssl, for secured
> > communication.
> > > > >
> > > > >But does kafka also stores data in encrypted format?
> > > > >
> > > > >
> > > > >Thanks,
> > > > >Snehalata
> > > >
> > >
> >
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: Large kafka deployment on virtual hardware

2016-05-23 Thread Jens Rantil
Hi Jahn,

How is the load on Zookeeper? How often are you committing your offsets?
Could that be an issue?

Cheers,
Jens

On Mon, May 23, 2016 at 18:12, Jahn Roux <j...@comprsa.com> wrote:

> I have a large Kafka deployment on virtual hardware: 120 brokers on 32gb
> memory 8 core virtual machines. Gigabit network, RHEL 6.7. 4 Topics, 1200
> partitions each, replication factor of 2 and running Kafka 0.8.1.2
>
>
>
> We are running into issues where our cluster is not keeping up. We have 4
> sets of producers (30 producers per set) set to produce to the 4 topics
> (producers produce to multiple topics). The messages are about 150 byte on
> average and we are attempting to produce between 1 million and 2 million
> messages a second per producer set.
>
>
>
> We run into issues after about 1 million messages a second - just for that
> producer set, the producer buffers fill up and we are blocked from
> producing
> messages. This does not seem to impact the other producer sets - they run
> without issues until they too reach about 1m messages a second.
>
>
>
> Looking at the metrics available to us we do not see a bottleneck, we don't
> see disk I/O maxing out, CPU and network are nominal. We have tried
> increasing and decreasing the Kafka cluster size to no avail, we have gone
> from 100 partitions to 1200 partitions per topic. We have increased and
> decreased the number of producers and yet we run into the same issues. Our
> Kafka config is mostly out the box - 1 hour log roll/retention, increased
> the buffer sizes a bit but other than that it's out the box.
>
>
>
> I was wondering if someone has some recommendations for identifying the
> bottleneck and/or what configuration values we should be taking a look at?
> Is there known issues with Kafka on virtualized hardware or things to watch
> out for when deploying to VMs? Are there use cases where Kafka is being
> used
> in a similar way - +4 million messages a second of discrete 150 byte
> messages?
>
>
>
> Kind regards,
>
>
>
> Jahn Roux
>
>
>
>
>
> ---
> This email has been checked for viruses by Avast antivirus software.
> https://www.avast.com/antivirus
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: How to build kafka jar with all dependencies?

2016-05-05 Thread Jens Rantil
Hi Ravi,

You might want to ask this question on the Kafka developers mailing list.

Cheers,
Jens

On Wednesday, May 4, 2016, ravi singh <rrs120...@gmail.com> wrote:

> I used .*/gradlew jarAll* but still scala libs are missing from the jar?
> ​It should be something ​very simple which I might be missing. Please let
> me know if anyone knows.
>
>
> --
> *Regards,*
> *Ravi*
>


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Kafka Consumer consuming large number of messages

2016-05-04 Thread Jens Rantil
Hi,

This is a known issue. The 0.10 release will fix this. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
for some background.
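
Once you are on 0.10, the cap from that KIP is just a consumer property; a
minimal sketch (the value 500 is only an example):

// Caps how many records a single poll() may return, independently of
// max.partition.fetch.bytes, which only bounds the bytes fetched per partition.
props.put("max.poll.records", "500");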

Cheers,
Jens

On Wed, May 4, 2016 at 19:32, Abhinav Solan <abhinav.so...@gmail.com> wrote:

> Hi,
>
> I am using kafka-0.9.0.1 and have configured the Kafka consumer  to fetch
> 8192 bytes by setting max.partition.fetch.bytes
>
> Here are the properties I am using
>
> props.put("bootstrap.servers", servers);
> props.put("group.id", "perf-test");
> props.put("offset.storage", "kafka");
> props.put("enable.auto.commit", "false");
> props.put("session.timeout.ms", 6);
> props.put("request.timeout.ms", 7);
> props.put("heartbeat.interval.ms", 5);
> props.put("auto.offset.reset", "latest");
> props.put("max.partition.fetch.bytes", "8192");
> props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> I am setting up 12 Consumers with 4 workers each to listen on a topic with
> 200 partitions.
> I have also enabled the compression when sending to Kafka.
>
> The problem I am getting is, even though the fetch size is less, the
> consumers when polling, poll too many records. If the topics have many
> messages and it is behind in the consumption it tries to fetch bigger size,
> if the consumer is not behind then it try and fetch around 45, but anyways
> if I set the max.partition.fetch.bytes shouldn't the fetch size have an
> upper limit ? Is there any other setting I am missing here ?
> I am myself controlling the message size so it's not that some bigger
> messages are coming through, each message must be around 200-300 bytes
> only.
>
> Due the large number of messages it is polling, the inner process sometimes
> not able to finish the process within the heartbeat interval limit, which
> makes the consumer rebalancing kick in, again and again, this only happens
> when the consumer is way behind in offset e.g there are 10 messages to
> be processed in the topic.
>
> Thanks
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: Hash partition of key with skew

2016-05-03 Thread Jens Rantil
Hi,

Not sure if this helps, but the way Loggly seems to do it is to have a
separate topic for "noisy neighbors". See [1].

[1]
https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
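
As for the custom partitioner route from KIP-22 that Srikanth mentions below,
a rough sketch of a skew-aware Partitioner for the Java producer could look
like this. The hot-key set and the number of reserved partitions are
assumptions you would derive from your offline key statistics, and note that
the hot keys lose per-key ordering:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Illustrative only: spreads a few known "hot" keys over a reserved block of
// partitions and hashes all other keys over the remaining partitions.
public class SkewAwarePartitioner implements Partitioner {
    private static final Set<String> HOT_KEYS =
        new HashSet<>(Arrays.asList("client-42"));  // assumption
    private static final int RESERVED_FOR_HOT = 4;  // assumption
    private final Random random = new Random();

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return random.nextInt(numPartitions);  // no key: spread randomly
        }
        int reserved = Math.min(RESERVED_FOR_HOT, numPartitions - 1);
        if (reserved > 0 && HOT_KEYS.contains(key.toString())) {
            return random.nextInt(reserved);  // hot keys spread over the reserved block
        }
        // Everything else is hashed over the non-reserved partitions.
        return reserved + Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - reserved);
    }

    @Override
    public void close() {}
}

The producer would pick it up via the partitioner.class config.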

Cheers,
Jens

On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:

> Hello,
>
> Is there a recommendation for handling producer side partitioning based on
> a key with skew?
> We want to partition on something like clientId. Problem is, this key has
> a very uneven distribution.
> Its equally likely to see a key with 3k occurrence/day vs 100k/day vs
> 65million/day.
> Cardinality of key is around 1500 and there are approx 1 billion records
> per day.
> Partitioning by hashcode(key)%numOfPartition will create a few "hot
> partitions" and cause a few brokers (and consumer threads) to be overloaded.
> Maybe these partitions with heavy load are evenly distributed among
> brokers, maybe they are not.
>
> I read KIP-22
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer
> >
> that
> explains how one could write a custom partitioner.
> I'd like to know how it was used to solve such data skew.
> We can compute some statistics on key distribution offline and use it in
> the partitioner.
> Is that a good idea? Or is it way too much logic for a partitioner?
> Anything else to consider?
> Any thoughts or reference will be helpful.
>
> Thanks,
> Srikanth
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Adding a broker

2016-05-03 Thread Jens Rantil
Hi,

When I added a replicated broker to a cluster, will it first stream
historical logs from the master? Or will it simply starts storing new
messages from producers?

Thanks,
Jens
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: Encryption at Rest

2016-04-25 Thread Jens Rantil
IMHO, I think that responsibility should lie on the file system, not Kafka.
Feels like a waste of time and double work to implement that unless there's
a really good reason for it. Let's try to keep Kafka a focused product that
does one thing well.

Cheers,
Jens

On Fri, Apr 22, 2016 at 3:31 AM Tauzell, Dave <dave.tauz...@surescripts.com>
wrote:

> I meant encryption of the data at rest.  We utilize filesytem encryption
> for other products; just wondering if anything was on the Kafka roadmap.
>
> Dave
>
> > On Apr 21, 2016, at 18:12, Martin Gainty <mgai...@hotmail.com> wrote:
> >
> > Dave-
> > so you want username/password credentials to be sent in response to an
> HTTP Get as clear text?
> > if not this has been asked and answered with Axishttps://
> axis.apache.org/axis2/java/rampart/
> >
> > Martin
> > __
> >
> >
> >
> >> From: dave.tauz...@surescripts.com
> >> To: users@kafka.apache.org
> >> Subject: Encryption at Rest
> >> Date: Thu, 21 Apr 2016 21:31:56 +
> >>
> >> Has there been any discussion or work on at rest encryption for Kafka?
> >>
> >> Thanks,
> >>  Dave
> >>
> >> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of the
> individual or entity to whom they are addressed. If you have received this
> e-mail in error, please notify the sender by reply e-mail immediately and
> destroy all copies of the e-mail and any attachments.
> >
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: Concurrency and KafkaConsumer?

2016-04-25 Thread Jens Rantil
Hi Richard,

> Will this have any impact on the KafkaConsumer?

Make sure you are calling poll() more often than session.timeout.ms,
otherwise Kafka will initiate a rebalance of the consumers for that topic.
An alternative is to pause() your consumer while it is blocked.
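
To illustrate the pause() idea, a rough sketch against the 0.9 Java consumer
(where pause()/resume() take TopicPartition varargs; the bounded BlockingQueue
is assumed to be your own, and InterruptedException handling is omitted):

// Keep calling poll() so heartbeats are sent, but stop pulling data while the
// downstream queue is full.
TopicPartition[] assigned = consumer.assignment().toArray(new TopicPartition[0]);
if (queue.remainingCapacity() == 0) {
    consumer.pause(assigned);
} else {
    consumer.resume(assigned);
}
ConsumerRecords<String, String> records = consumer.poll(100);  // empty while paused
for (ConsumerRecord<String, String> record : records) {
    queue.put(record);  // may still block briefly if the queue fills mid-batch
}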

You might want to look at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
which
will be included in the next release. This will allow you to rate limit the
number of incoming messages from the poll call instead of rate limiting the
messages you are sending downstream. The advantage of that will also be that
the consumer makes sure you don't lose messages if your application
crashes while messages are consumed but still lingering in your BlockingQueue.

> The proper usage of KafkaConsumer in this case would be:
>
>- Create a thread with a new instance of KafkaConsumer used in it.
>- Ensure the group id for that consumer is unique
>- Ensure only one consumer for each topic is running to avoid
>re-balancing

Sounds sane to me. Note that a single consumer can also subscribe to
multiple topics, but that would obviously make your application single
threaded...

Hope this helps.

Cheers,
Jens

On Fri, Apr 22, 2016 at 2:33 AM Richard L. Burton III <mrbur...@gmail.com>
wrote:

> I have a case in which I'm iterating over the results of a poll(..)
> in KafkaConsumer. While I'm iterating over the records, the thread is
> blocked. e.g., I'm writing to a BlockingQueue that's full.
>
> Will this have any impact on the KafkaConsumer?
>
> I have one more follow up question.
>
> Let's say I have 20 topics. I plan to spawn 20 KafkaConsumer's to process
> each topic because I'm looking to rate limit the speed at which the records
> are processed to avoid overloading external systems.
>
> The proper usage of KafkaConsumer in this case would be:
>
>
>- Create a thread with a new instance of KafkaConsumer used in it.
>- Ensure the group id for that consumer is unique
>- Ensure only one consumer for each topic is running to avoid
>re-balancing
>
>
>
> --
> -Richard L. Burton III
> @rburton
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: poll() semantics

2016-04-24 Thread Jens Rantil
Hi Richard,

> which defaults to a very large number, will affect the number of
records returned by each call to poll()

No, it will affect the total sum of the message sizes fetched. This is not
the same as "number of messages". The upcoming release of 9.1 (not out yet)
will contain a setting that allows you to set a cap on the maximum number
of messages that poll() returns. See also
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
.
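
As for replaying a whole topic (the question quoted below), a rough sketch
with the 0.9 Java consumer, using manual assignment so seekToBeginning is
applied to every partition; knowing when you are "done" remains approximate,
as KAFKA-2076 points out:

// Assign rather than subscribe so we fully control the position ourselves.
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo p : consumer.partitionsFor("the-topic")) {  // placeholder topic
    partitions.add(new TopicPartition(p.topic(), p.partition()));
}
consumer.assign(partitions);
consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));  // 0.9 varargs

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    if (records.count() == 0) {
        break;  // crude "caught up" heuristic in the absence of a high watermark API
    }
    for (ConsumerRecord<String, String> record : records) {
        // replay/process the record here
    }
}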

Cheers,
Jens

On Sat, Apr 23, 2016 at 2:20 AM Richard Rodseth <rrods...@gmail.com> wrote:

> To answer my own question (partially), I have learned that
>
> max.partition.fetch.bytes
>
> , which defaults to a very large number, will affect the number of
> records returned by each call to poll()
>
> I also learned that seekToBeginning is a partition-level thing, but
>
>  props.put("auto.offset.reset","earliest")
> has the desired effect.
>
> On Fri, Apr 22, 2016 at 11:08 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
> > Do I understand correctly that poll() will return a subset of the
> messages
> > in a topic each time it is called? So if I want to replay all messages, I
> > would seek to the beginning and call poll in a loop? Not easily knowing
> > when I was done, without a high watermark
> >
> > https://issues.apache.org/jira/browse/KAFKA-2076
> >
> > This is a pretty basic question, but I don't think it is explained in the
> > JavaDoc
> >
> >
> >
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >
> > Thanks
> >
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: Kafka topics with infinite retention?

2016-03-14 Thread Jens Rantil
This is definitely an interesting use case. However, you need to be aware
that changing the broker topology won't rebalance the preexisting data from
the previous brokers. That is, you risk losing data.

Cheers,
Jens

On Wed, Mar 9, 2016 at 2:10 PM Daniel Schierbeck <da...@zendesk.com.invalid>
wrote:

> I'm considering an architecture where Kafka acts as the primary datastore,
> with infinite retention of messages. The messages in this case will be
> domain events that must not be lost. Different downstream consumers would
> ingest the events and build up various views on them, e.g. aggregated
> stats, indexes by various properties, full text search, etc.
>
> The important bit is that I'd like to avoid having a separate datastore for
> long-term archival of events, since:
>
> 1) I want to make it easy to spin up new materialized views based on past
> events, and only having to deal with Kafka is simpler.
> 2) Instead of having some sort of two-phased import process where I need to
> first import historical data and then do a switchover to the Kafka topics,
> I'd rather just start from offset 0 in the Kafka topics.
> 3) I'd like to be able to use standard tooling where possible, and most
> tools for ingesting events into e.g. Spark Streaming would be difficult to
> use unless all the data was in Kafka.
>
> I'd like to know if anyone here has tried this use case. Based on the
> presentations by Jay Kreps and Martin Kleppmann I would expect that someone
> had actually implemented some of the ideas they're been pushing. I'd also
> like to know what sort of problems Kafka would pose for long-term storage –
> would I need special storage nodes, or would replication be sufficient to
> ensure durability?
>
> Daniel Schierbeck
> Senior Staff Engineer, Zendesk
>
-- 

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.


Re: Kafka Applicability - Large Messages

2016-03-14 Thread Jens Rantil
Just making it more explicit: AFAIK, all Kafka consumers I've seen load
the incoming messages into memory. Unless you make it possible to stream them
to disk or something, you need to make sure your consumers have the available
memory.

Cheers,
Jens

On Fri, Mar 4, 2016 at 6:07 PM Cees de Groot  wrote:

> 1GB sounds like a tad steep, you may want to do some testing, as Kafka
> needs to be told that such large messages can arrive and broker will then
> pre-allocate buffers for that. Personally, I'd stop short of low megabytes,
> anything bigger can be dropped off in e.g. S3 and then you just queue a
> link for further processing.
>
> I'm not saying it's impossible, Kafka handles large messages better than
> most other tools out there, but you do want to do a test setup to make sure
> that it'll handle the sort of traffic you fling at it in any case.
>
> On Fri, Mar 4, 2016 at 4:26 AM, Mahesh Dharmasena 
> wrote:
>
> > We have a client with several thousand stores which send and receive
> > messages to main system that resides on the headquarters.
> >
> > A single Store sends and receive around 50 to 100 messages per day.
> >
> > Average Message size could be from 2KB to 1GB.
> >
> > Please let me know whether I can adapt Apache Kafka for the solution?
> >
> >
> > - Mahesh.
> >
>
>
>
> --
>
> *Cees de Groot*
> PRINCIPAL SOFTWARE ENGINEER
> pagerduty.com
> c...@pagerduty.com 
> +1(416)435-4085
>
>
-- 
*Henrik Hedvall*
Lead Designer
henrik.hedv...@tink.se
+46 72 505 57 59

Tink AB
Wallingatan 5
111 60 Stockholm, Sweden
www.tink.se


Re: About the number of partitions

2016-03-02 Thread Jens Rantil
Hi Kim,

You are correct in that the number of partitions sets the upper limit on
consumer parallelization. That is, a single consumer in a group can consume
multiple partitions; however, multiple consumers in a group can't consume the
same partition.
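
To make that concrete for the saver/analyzer scenario further down: give each
application its own group.id and each of them will receive every message,
while consumers within one group split the partitions between them. A minimal
sketch, with broker address and topic name as placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// "message saver" application: its own consumer group, so it sees every message.
props.put("group.id", "message-saver");
KafkaConsumer<String, String> saver = new KafkaConsumer<>(props);
saver.subscribe(Arrays.asList("events"));  // placeholder topic

// The "message analyzer" application would be configured identically except for
// props.put("group.id", "message-analyzer"); as a separate group it also sees
// every message, independently of the saver's progress.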

Also, since partitions are spread across your brokers, really it's the
ratio nPartitions/nBrokers that you want to optimize for.

Given the above parallelization limit, it would make sense to have a very
large ratio. This would have other implications:

   - Your brokers will have a lot of smaller files that they will have to
   flush periodically. This can incur a lot of overhead and introduce
   latencies, especially on a spinning disk where seeks are expensive.
   - Brokers are generally set to rotate their logs at a certain size. It
   could be hard to tune rotation with many small files.

Given this, really you need to benchmark for your use case with your
message sizes etc.

Side-note: Note that for autoscaling you will have to overprovision your
partitions somewhat to not hit the parallelization limit.

Cheers,
Jens

On Wed, Mar 2, 2016 at 1:11 AM, BYEONG-GI KIM <bg...@bluedigm.com> wrote:

> Hello.
>
> I have questions about how many partitions are optimal while using kafka.
> As far as I know, even if there are multiple consumers that belong to a
> consumer group, say *group_A*, only one consumer can receive a kafka
> message produced by a producer if there is a partition. So, as a result,
> multiple partitions are required in order to distribute the message to all
> the consumers in group_A if I want the consumers to get the message.
>
> Is it right?
>
> I'm considering developing several kafka consumer applications, e.g.,
> message saver, message analyzer, etc., so a message from a producer must be
> consumed by those kinds of consumers.
>
> Any advice and help would be really appreciated.
>
> Thanks in advance!
>
> Best regards
>
> Kim
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Unable to start cluster after crash (0.8.2.2)

2016-02-29 Thread Jens Rantil
Double post. Please keep discussion in the other thread.

Cheers,
Jens

On Wed, Feb 24, 2016 at 4:39 PM, Anthony Sparks <anthony.spark...@gmail.com>
wrote:

> Hello,
>
> Our Kafka cluster (3 servers, each server has Zookeeper and Kafka installed
> and running) crashed, and actually out of the 6 processes only one
> Zookeeper instance remained alive.  The logs do not indicate much, the only
> errors shown were:
>
> 2016-02-21T12:21:36.881+: 27445381.013: [GC (Allocation Failure)
> 27445381.013: [ParNew: 136472K->159K(153344K), 0.0047077 secs]
> 139578K->3265K(507264K), 0.0048552 secs] [Times: user=0.01 sys=0.00,
> real=0.01 secs]
>
> These errors were both in the Zookeeper and the Kafka logs, and it appears
> they have been happening everyday (with no impact on Kafka, except for
> maybe now?).
>
> The crash is concerning, but not as concerning as what we are encountering
> right now.  I am unable to get the cluster back up.  Two of the three nodes
> halt with this fatal error:
>
> [2016-02-23 21:18:47,251] FATAL [ReplicaFetcherThread-0-0], Halting because
> log truncation is not allowed for topic audit_data, Current leader 0's
> latest offset 52844816 is less than replica 1's latest offset 52844835
> (kafka.server.ReplicaFetcherThread)
>
> The other node that manages to stay alive is unable to fulfill writes
> because we have min.ack set to 2 on the producers (requiring at least two
> nodes to be available).  We could change this, but that doesn't fix our
> overall problem.
>
> In browsing the Kafka code, in ReplicaFetcherThread.scala there is this
> little nugget:
>
> // Prior to truncating the follower's log, ensure that doing so is not
> disallowed by the configuration for unclean leader election.
> // This situation could only happen if the unclean election configuration
> for a topic changes while a replica is down. Otherwise,
> // we should never encounter this situation since a non-ISR leader cannot
> be elected if disallowed by the broker configuration.
> if (!LogConfig.fromProps(brokerConfig.toProps,
> AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
> topicAndPartition.topic)).uncleanLeaderElectionEnable) {
> // Log a fatal error and shutdown the broker to ensure that data loss
> does not unexpectedly occur.
> fatal("Halting because log truncation is not allowed for topic
> %s,".format(topicAndPartition.topic) +
>   " Current leader %d's latest offset %d is less than replica %d's
> latest offset %d"
>   .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId,
> replica.logEndOffset.messageOffset))
> Runtime.getRuntime.halt(1)
> }
>
> For each one of our Kafka instances we have them set at:
> *unclean.leader.election.enable=false
> *which hasn't changed at all since we deployed the cluster (verified by
> file modification stamps).  This to me would indicate the above comment
> assertion is incorrect; we have encountered a non-ISR leader elected even
> though it is configured not to do so.
>
> Any ideas on how to work around this?
>
> Thank you,
>
> Tony Sparks
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Is it possible to configure Kafka Mirror to specify fixed ports to connect to Remote DC

2016-02-29 Thread Jens Rantil
Hi Munir,

Are you referring to outbound or inbound ports from/to the Mirror tool?

Cheers,
Jens

On Wed, Feb 24, 2016 at 6:01 PM, Munir Khan (munkhan) <munk...@cisco.com>
wrote:

> Hi,
> I am trying out Kafka Mirror for moving kafka message between DC. In our
> case we have to make it work through firewalls and use  specific TCP ports
> permitted by ACL.  What I have seen so far each Mirror instance opens a
> number of tcp connections to the remote DC and the port numbers are not
> fixed. Is there a way to configure Kafka Mirror so that is always uses
> specific ports ?
>
> Best Regards
> Munir Khan
>
>


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Fetching meta from Kafka continuously.

2016-02-29 Thread Jens Rantil
Hi,

Could it be that you need to rebalance your topics, perhaps?

Cheers,
Jens

On Wed, Feb 24, 2016 at 7:25 PM, Kim Chew <kchew...@gmail.com> wrote:

> We have shut down some nodes from our cluster yesterday and now we are
> seeing tons of these in the log,
>
>
> 2016-02-24 18:11:23 INFO  SyncProducer:68 - Connected to
> ip-172-30-198-64.us-west-2.compute.internal:9092 for producing
> 2016-02-24 18:11:23 INFO  SyncProducer:68 - Disconnecting from
> ip-172-30-198-64.us-west-2.compute.internal:9092
> 2016-02-24 18:11:23 INFO  ConsumerFetcherManager:68 -
> [ConsumerFetcherManager-1456295103918] Added fetcher for partitions
> ArrayBuffer()
> 2016-02-24 18:11:23 INFO  VerifiableProperties:68 - Verifying properties
> 2016-02-24 18:11:23 INFO  VerifiableProperties:68 - Property client.id is
> overridden to utils_backup
> 2016-02-24 18:11:23 INFO  VerifiableProperties:68 - Property
> metadata.broker.list is overridden to
>
> ip-172-30-198-19.us-west-2.compute.internal:9092,ip-172-30-198-64.us-west-2.compute.internal:9092,ip-172-30-200-37.us-west-2.compute.internal:9092
> 2016-02-24 18:11:23 INFO  VerifiableProperties:68 - Property
> request.timeout.ms is overridden to 3
> 2016-02-24 18:11:23 INFO  ClientUtils$:68 - Fetching metadata from broker
> id:8,host:ip-172-30-198-64.us-west-2.compute.internal,port:9092 with
> correlation id 189686 for 2 topic(s) Set(container, profile-data-2)
>
> I am new to Kafka so although I know it has something to do with the
> brokers, I would like to know what has happened and what is the best way to
> fix it?
>
> TIA
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Kafka node liveness check

2016-02-29 Thread Jens Rantil
Hi,

I assume you first want to ask yourself what liveness you would like to
check for. I guess the most realistic check is to put a "ping" message on
the broker and make sure that you can consume it.
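
A rough sketch of such a check with the Java producer (assuming an already
configured KafkaProducer<String, String> named producer; topic name and
timeout are only examples, and a stricter variant would also consume the
message back):

// Produce a "ping" and require an acknowledged write within a bounded time.
try {
    RecordMetadata md = producer
        .send(new ProducerRecord<>("healthcheck", "ping",
            Long.toString(System.currentTimeMillis())))
        .get(5, TimeUnit.SECONDS);
    System.out.println("Broker accepted ping at offset " + md.offset());
} catch (Exception e) {
    System.err.println("Broker failed the liveness check: " + e);
}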

Cheers,
Jens

On Fri, Feb 26, 2016 at 12:38 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Hi team,
>
> What is the best way to verify a specific Kafka node functions properly?
> Telnetting the port is one approach but I don't think it tells me
> whether or not the broker can still receive/send traffic. I am thinking of
> asking for metadata from the broker using consumer.partitionsFor. If it can
> return partition info it is considered live. Is this a good approach?
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: LVM overhead

2016-02-11 Thread Jens Rantil
Hi,


I suggest you run a microbenchmark and test it for your use case. Should be
pretty straightforward.




Cheers,

Jens





–
Sent from Mailbox

On Thu, Feb 11, 2016 at 4:24 PM, yazgoo  wrote:

> Hi everyone,
> I have multiple disks on my broker.
> Do you know if there's a noticeable overhead using LVM versus multiple
> log.dirs ?
> Thanks

Re: Log message key content into LOG4j

2016-02-10 Thread Jens Rantil
Hi Robert,

I'm not sure Kafka is a good use case for storing the logs for that long.
However, it is a good candidate for intermediate storage before you
store your logs somewhere else (S3 or other blob storage).

/J

On Mon, Feb 8, 2016 at 11:53 AM, Robert Drygała <diddo...@gmail.com> wrote:

> Hi
>
> Is it possible to log the message header / key into server logs (Log4j) on
> the message broker?
> I would like to track a message after a few years, so I need to prove that
> the message existed on the message broker at some particular time.
>
> my case :
>
> Publisher log information into business log (e.g. log4j) that message was
> send , Message broker log message into kafka log and message header / key
> into business log ( e.g. log4j ),
> Consumer log information into business log ( e.g. log4j ), that message was
> received.
>
> So  I'm not responsible for consumer/ producer side, I'm responsible for
> message broker and after 2 years I will have to prove that message exists
> on MessageBroker and I can prove that using e.g. logs from this time. ( log
> should look like this : Message ID (from message key ) , timestamp  )
>
>
> regards Robert
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Session timeout and heartbeat interval

2016-02-08 Thread Jens Rantil
Hi again,

A somewhat related question is also how the heartbeat interval and session
timeout relate to the poll timeout. Must the poll timeout always be lower
than the heartbeat interval?

Cheers,
Jens

On Monday, February 8, 2016, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi,
>
> I am trying to wrap my head around the difference between "
> session.timeout.ms" and "heartbeat.interval.ms". Clearly they are
> somewhat tied to the same logic since heartbeat value should be less than
> 1/3 of the session timeout.
>
> Let me add some specific questions:
>
>- Will a session be considered "dead" if it hasn't received a
>heartbeat within "session.timeout.ms"? In that case giving 2*heartbeat
> >as slack for missed heartbeats seems like a LOT of slack. Given the default
>values, a client will have 27 seconds(!) to be able to send a second
>heartbeat.
>- Why can't session timeout simply be based on heartbeat interval?
>
> Could anyone clarify this a bit? Also, if you are writing a new consumer,
> what is your reasoning when setting these two values?
>
> Thanks,
> Jens
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> <javascript:_e(%7B%7D,'cvml','jens.ran...@tink.se');>
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>  Twitter <https://twitter.com/tink>
>
>

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Session timeout and heartbeat interval

2016-02-08 Thread Jens Rantil
Hi,

I am trying to wrap my head around the difference between "
session.timeout.ms" and "heartbeat.interval.ms". Clearly they are somewhat
tied to the same logic since heartbeat value should be less than 1/3 of the
session timeout.

Let me add some specific questions:

   - Will a session be considered "dead" if it hasn't received a heartbeat
   within "session.timeout.ms"? In that case giving 2*heartbeat as slack
   for missed heartbeats seems like a LOT of slack. Given the default values, a
   client will have 27 seconds(!) to be able to send a second heartbeat.
   - Why can't session timeout simply be based on heartbeat interval?

Could anyone clarify this a bit? Also, if you are writing a new consumer,
what is your reasoning when setting these two values?

Thanks,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Apache Kafka Case Studies

2016-02-03 Thread Jens Rantil
Hi Joe,

This might be interesting:
https://engineering.linkedin.com/kafka/running-kafka-scale

Cheers,
Jens

On Wed, Feb 3, 2016 at 4:15 PM, Joe San <codeintheo...@gmail.com> wrote:

> Dear Kafka users,
>
> I'm looking for some case studies around using Kafka on big projects.
> Specifically, I'm looking for some architectural insights into how I could
> orchestrate my data pipeline using Kafka on an enterprise system.
>
> Some pointers on some architectural best practices, slides on how some
> organisation X used Apache Kafka in their landscape would be ideal.
>
> Any suggestions?
>
> Thanks and Regards,
> Joe
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Only interested in certain partitions

2016-01-27 Thread Jens Rantil
Hi,

Background: I am using Kafka 0.9 with the Java client. I have a consumer
with a fixed set of keys that it is interested in from a given topic.
Assuming I have many partitions, I could manually assign my consumer to
only listen to the relevant partitions, given my keys.

Question: Given a key and a topic, when using the new Java consumer, how
can I figure out which partition the key will be written to? If not
possible, I will file a JIRA.
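
For reference, this is how I understand the default partitioner decides for
non-null keys (murmur2 over the serialized key, modulo the partition count).
A sketch of computing it on the consumer side, assuming the producer uses the
default partitioner and that I serialize the key the same way:

// Mirrors DefaultPartitioner's choice for non-null keys, using
// org.apache.kafka.common.utils.Utils for the same murmur2 hash.
byte[] keyBytes = "my-key".getBytes(StandardCharsets.UTF_8);  // must match the producer's key serializer
int numPartitions = consumer.partitionsFor("my-topic").size();
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
consumer.assign(Arrays.asList(new TopicPartition("my-topic", partition)));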

Thanks,
Jens

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: security: encryption at rest and key rotation idea

2016-01-24 Thread Jens Rantil
Josh,

Could you maybe post your final solution here in this thread? I'm curious
to hear about which way you finally went with this.

Thanks,
Jens

On Fri, Jan 22, 2016 at 12:47 AM, Josh Wo <z...@lendingclub.com> wrote:

> Thanks Jim. I like the fact that the offset management will not require us
> to customize kafka. I will think more on this. Maybe a time-based seek will
> just work... I think the math you proposed requires the partition setup to be
> exactly the same as the original and the partitioner to map the message to
> the same partition id (hopefully that is always true, haven't verified).
>
> BTW, any concern with codec approach apart from customization/make codec
> pluggable?
>
> Thanks,
> Josh
> 
> From: Jim Hoagland <jim_hoagl...@symantec.com>
> Sent: Thursday, January 21, 2016 1:02 PM
> To: users@kafka.apache.org; Josh Wo
> Subject: Re: security: encryption at rest and key rotation idea
>
> For the offset, at the start of topic (and perhaps periodically in the
> topic), the script could make a note of the corresponding offset in the
> previous topic.  The consumer could then see the correspondence between
> the current topic offsets and the previous topic offsets and do some math
> to get to where they left off.  That's just the start of idea for a
> possible approach; it would have to be thought through more carefully.
> Not sure, but you may need to handle cases where messages get re-ordered.
>
> -- Jim
>
>
> On 1/20/16, 11:31 AM, "Josh Wo" <z...@lendingclub.com> wrote:
>
> >Jim,
> >So I guess the problem of copying to a different topic (or would rather
> >have a replicated cluster) is when existing consumer do the "switch" to
> >new topic, how is the offset to be set correctly so they don't replay the
> >whole thing again. While we can certain do idempotency with consumer,
> >they are not going to handle the volume from beginning on regular basis
> >(BTW, the key rotation will be on regular basis). Maybe implement custom
> >offset storage somewhere else?
> >
> >I think encryption at rest will be very interesting to most of enterprise
> >grade user moving to cloud and good to see interests here. It is
> >currently a blocker for us, while kafka 0.9 made progress with SASL at
> >communication front, I see less talk around encryption at rest and
> >support for live/in-place key rotation/re-encrypt. It does seem to me
> >easy to implement by just exposing pluggability of codec and JMX for
> >cleaner thread invocation and things will be taken care  of transparently?
> >
> >Any interesting from other users of this proposal?
> >
> >Thanks,
> >Josh
> >
> >
> >
> >
> >From: Jim Hoagland <jim_hoagl...@symantec.com>
> >Sent: Wednesday, January 20, 2016 11:00 AM
> >To: users@kafka.apache.org; Josh Wo
> >Subject: Re: security: encryption at rest and key rotation idea
> >
> >You could do this with (I expect) reasonable efficiency and with no
> >changes to Kafka code by using multiple topics.
> >
> >You can have a script that in a streaming manner reads out all messages in
> >a topic, decrypts them with the old key, encrypts them with the new key,
> >and adds them to a new topic.  At the time you need to re-encrypt all the
> >messages in a topic, invoke the script.  You would probably have one topic
> >per encryption key.  The producers would know the right topic to write to
> >since they know which version of the encryption key.  The consumers would
> >have to have some logic to switch over.  Perhaps the last message in a
> >particular topic could tell the consumers to switch to a different topic.
> >
> >If you invoke the script from someplace close to the Kafka brokers, the
> >overhead should I think not be too much higher than custom codec approach
> >you were talking about.  Just some local network traffic, with the amount
> >depending on the size of the topic and your replication factor.
> >
> >-- Jim
> >
> >On 1/20/16, 10:37 AM, "Josh Wo" <z...@lendingclub.com> wrote:
> >
> >>Hi Jens,
> >>I got your point but some of our use case cannot just rely on TTL. We try
> >>to have long expiry for message and rather compact them (dedup) so we can
> >>replay messages as system of records. When key is lost, we will invalid
> >>the old key so message encrypted by old message will not be able to
> >>decrypt if we don't re-encrypt them also.
> >>
> >>Josh
> >>
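
For illustration, a minimal sketch of the topic-copy approach Jim describes
above. The decryptWithOldKey/encryptWithNewKey helpers, topic names and group
id are all made up; offsets in the new topic will not match the old one, which
is exactly why the consumer switch-over needs the bookkeeping discussed above.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "reencryption-script");   // hypothetical group id
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
     KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
  consumer.subscribe(Collections.singletonList("events-key-v1"));   // old topic (made-up name)
  boolean caughtUp = false;
  while (!caughtUp) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<byte[], byte[]> record : records) {
      byte[] plaintext = decryptWithOldKey(record.value());        // hypothetical helper
      byte[] reencrypted = encryptWithNewKey(plaintext);           // hypothetical helper
      producer.send(new ProducerRecord<>("events-key-v2", record.key(), reencrypted));
    }
    consumer.commitSync();
    caughtUp = records.isEmpty();   // crude "end of topic" heuristic, good enough for a sketch
  }
  producer.flush();
}

Since the record keys are copied unchanged, compaction semantics should carry
over to the new topic, assuming the new topic is also configured for compaction.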

Dealing with diverse consumption speeds

2016-01-24 Thread Jens Rantil
Hi,

How are you dealing with a slow consumer in Kafka? In the best of worlds,
each consumer would have the exact same specs and the exact same workload.
But unfortunately that's rarely true: virtual machines share hardware with
other VMs, some Kafka tasks take longer to process, some partition keys
occasionally make the Kafka cluster unbalanced, etc.

From a larger perspective, maybe it would be nice if a consumer group would
occasionally rebalance consumers based on lag.
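
Not a rebalancing mechanism, but as a rough sketch (0.9-era new consumer API
with varargs seekToEnd; assumes a subscribed consumer with an assignment), each
consumer can at least measure its own per-partition lag, which could feed an
external alert or scaling decision:

// import org.apache.kafka.common.TopicPartition;
for (TopicPartition partition : consumer.assignment()) {
  long current = consumer.position(partition);   // next offset we will consume
  consumer.seekToEnd(partition);                 // varargs signature in the 0.9 client
  long logEnd = consumer.position(partition);    // resolves the seek and returns the log end offset
  consumer.seek(partition, current);             // restore our original position
  System.out.println(partition + " lag=" + (logEnd - current));
}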

Cheers,
Jens

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Cluster of two nodes?

2016-01-19 Thread Jens Rantil
Hi,

Regarding Zookeeper, running 2 instances instead of 1 will not give you any
higher availability. You will need three Zookeeper instances to be able to
lose one. Two Zookeeper instances will simply make your Zookeeper ensemble
run slower without any benefit.

Running two Kafka nodes/brokers should be fine. Running them on the same
instances as the Zookeeper nodes should also be fine.

Cheers,
Jens

On Tue, Jan 19, 2016 at 5:06 PM, Tech Bolek <techy_bo...@yahoo.com.invalid>
wrote:

> Hello, is there any value in running zookeeper and kafka in a cluster of
> two nodes? I.e. one instance of zookeeper and kafka on each node?




-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Apache Kafka Topic Design

2016-01-19 Thread Jens Rantil
Hi again Joe,

I would suggest keeping things simple and starting with a single topic and a
single partition with replication, if possible. You can split the topic into
multiple topics, or into more partitions, later if needed.

If you need strict ordering of the events I suggest you use a partition key
from the start in your producers.
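
For illustration, a rough sketch (topic name and device id are made up) of
keying each record by device id, so that with the default partitioner all
signals from one device land in the same partition and keep their relative
order:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
  String deviceId = "device-42";                                     // hypothetical device id
  String signal = "{\"name\":\"temperature\",\"value\":\"21.5\"}";   // hypothetical payload
  // Records sharing a key are hashed to the same partition, so all signals
  // for one device stay ordered relative to each other.
  producer.send(new ProducerRecord<>("device-signals", deviceId, signal));
}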

Cheers,
Jens

On Tue, Jan 19, 2016 at 6:43 PM, Joe San <codeintheo...@gmail.com> wrote:

> Hi Jens,
>
> Thanks for your suggestion. So here is a bit more text:
>
> I have a List of signals for a specific device:
>
> Map[DeviceId, List[Signal]], where Signal is case class Signal(name:
> String, value: String)
>
> Now I will write a producer to connect to the device and read these signals
> at constant intervals. I want to know how I should create my topic assuming
> that I have say 10 devices to start with, later scaling it up to 10
> such devices.
>
> How should I model my topic? Should I create one topic per device?
>
> Thanks and Regards,
> Joe
>
> On Tue, Jan 19, 2016 at 4:58 PM, Jens Rantil <jens.ran...@tink.se> wrote:
>
> > Hi Joe,
> >
> > I think you are leaving out all your requirements to be able to get
> decent
> > answer from anyone here.
> >
> > On Tue, Jan 19, 2016 at 8:47 AM, Joe San <codeintheo...@gmail.com>
> wrote:
> >
> > > I soon realized that this is a doomed approach.
> > >
> >
> > Could you elaborate? Why would it be doomed?
> >
> >
> > > Is there a better way to model my topic so that I could
> > > guarantee replication and high throughput and scale if I need to add
> > 10,000
> > > of those devices? Any suggestions?
> > >
> >
> > What does "high throughput and scale" mean? What is "high throughput" for
> > you? What is "scale" for you?
> >
> > Let's start with a simple proposal: How about having a single topic with
> a
> > single partition and you replicate it to two other brokers? You can
> > increase the number of partitions in the future and you have your
> > "guaranteed" (whatever that means) replication.
> >
> > Cheers,
> > Jens
> >
> > --
> > Jens Rantil
> > Backend engineer
> > Tink AB
> >
> > Email: jens.ran...@tink.se
> > Phone: +46 708 84 18 32
> > Web: www.tink.se
> >
> > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > <
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > >
> >  Twitter <https://twitter.com/tink>
> >
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: security: encryption at rest and key rotation idea

2016-01-19 Thread Jens Rantil
Hi Josh,


Kafka can expire message logs after a certain TTL. Couldn't you simply rely
on expiration for key rotation? That is, you start producing messages with a
different key while your consumer temporarily handles the overlap of keys for
the duration of the TTL.
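
One way to sketch that overlap handling (the envelope format and helpers below
are entirely hypothetical): tag each encrypted payload with a key-version id so
the consumer can pick the right key while old and new messages coexist.

// import java.nio.ByteBuffer; "record" is assumed to be a ConsumerRecord<byte[], byte[]>.
// Hypothetical envelope layout: [4-byte key version][ciphertext].
ByteBuffer envelope = ByteBuffer.wrap(record.value());
int keyVersion = envelope.getInt();
byte[] ciphertext = new byte[envelope.remaining()];
envelope.get(ciphertext);
byte[] plaintext = decrypt(keyForVersion(keyVersion), ciphertext);   // hypothetical helpers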




Just an idea,

Jens





–
Sent from Mailbox

On Wed, Jan 20, 2016 at 12:34 AM, Josh Wo  wrote:

> We are trying to deploy kafka into EC2 and one of the requirement from 
> infosec is to have kafka encryption at rest (stored with encrypted value). We 
> also need to be able to rotate encryption keys and re-encrypt all the 
> messages on regular basis since we are a financial company. The re-encryption 
> feels challenging since kafka messages are immutable from client side 
> (producer and consumer). Some ideas floating around to have replicated 
> clustered but then it will mess up all the offsets of the consumer and 
> switching is complicated from operational perspective.
> One idea we have is to achieve this is to plugin our own "compression" codec 
> which deal with both compression and encryption logic and leverage compaction 
> cycle to re-write all the messages by calling decompress and compress into a 
> new file. It feels this approach can also have zero impact to the 
> consumer/producer if they are using the same "codec" for compression since 
> the offsets will be intact.
> My current understanding is the codecs are hardcoded right now (we are using 
> .9) so it will require us to customize kafka. Also compaction cannot be 
> triggered on-demand, which is needed in case of the key loss. So before we 
> take on customization of kafka, I am just wondering if our thinking is on the 
> right track.
> I hope some of the committers from Confluent/Hornton/Cloudera can comment on 
> that and the road map to support encryption at rest and key rotation, or 
> otherwise alternative to what is proposed. Also please let me know if my 
> question/problem is not clear.
> Thanks,
> Josh
> 
> DISCLAIMER: The information transmitted is intended only for the person or 
> entity to which it is addressed and may contain confidential and/or 
> privileged material. Any review, re-transmission, dissemination or other use 
> of, or taking of any action in reliance upon this information by persons or 
> entities other than the intended recipient is prohibited. If you received 
> this in error, please contact the sender and destroy any copies of this 
> document and any attachments.

Re: Partitions and consumer assignment

2016-01-16 Thread Jens Rantil
Hi,


You are correct. The others will remain idle. This is why you generally want to 
have at least the same number of partitions as consumers.
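
A quick way to see this with the new consumer API (topic and group names are
made up): run the snippet below in each of the ten consumers; with a
single-partition topic only one of them will print a non-empty assignment.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");   // all ten consumers share this group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("single-partition-topic"));
consumer.poll(1000);   // joins the group and triggers partition assignment
// One consumer prints [single-partition-topic-0]; the other nine print [] and stay idle.
System.out.println(consumer.assignment());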




Cheers,

Jens





–
Sent from Mailbox

On Sat, Jan 16, 2016 at 2:34 AM, Jason J. W. Williams
 wrote:

> Hi,
> I'm trying to make sure I understand this statement in the docs:
> "Each broker partition is consumed by a single consumer within a given
> consumer group. The consumer must establish its ownership of a given
> partition before any consumption can begin."
> If I have:
> * a topic with 1 partition
> * subscribe a consumer group to the topic
> * the consumer group has 10 consumers belonging to it
> Will only 1 consumer of the 10 ever receive messages from the topic, and
> the other 9 remain idle? Or does this mean only 1 consumer at a time from
> the group will be consuming...in a round-robin fashion?
> -J

Re: How to chose the size of a Kafka broker

2016-01-14 Thread Jens Rantil
Hi Vladoiu,

I am by no means a Kafka expert, but what are you optimizing for?

   - Cost could be a variable.
   - Time to bring on a new broker could be another variable. For large
   machines that could take longer since they need to stream more data.

Cheers,
Jens

On Wed, Jan 13, 2016 at 1:09 PM, Vladoiu Catalin <vladoiu.cata...@gmail.com>
wrote:

> Hi guys,
>
> I've run into a long conversation with my colleagues when we discussed the
> size of the Brokers for our new Kafka cluster and we still haven't reached
> a final conclusion.
>
> Our main concern is the size of the requests 10-20MB per request (producer
> will send big requests), maybe more and we estimate that we will have 4-5TB
> per day.
>
> Our debate is between:
> 1. Having a smaller cluster(not so many brokers) but big config, something
> like this:
> Disk: 11 x 4TB, CPU: 48 Core, RAM: 252 GB. We chose this configuration
> because our Hadoop cluster has that config and can easily handle that
> amount of data.
> 2. Having a bigger number of brokers but smaller broker config.
>
> I was hopping that somebody with more experience in using Kafka can advice
> on this.
>
> Thanks,
> Catalin
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: how to reset kafka offset in zookeeper

2015-12-18 Thread Jens Rantil
Hi,

I noticed that a consumer in the new consumer API supports setting the
offset for a partition to the beginning. I assume doing so would also update
the offset in Zookeeper eventually.
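
A rough sketch with the new consumer API (topic name is made up; the 0.9
client's seekToBeginning takes varargs, later clients take a Collection). Bear
in mind that the new consumer commits offsets to Kafka rather than Zookeeper by
default, so whether this helps a Zookeeper-based consumer such as Flume depends
on the setup:

// Assumes a configured KafkaConsumer<String, String> named "consumer".
consumer.subscribe(Collections.singletonList("my-topic"));
consumer.poll(0);   // join the group so partitions get assigned
consumer.seekToBeginning(consumer.assignment().toArray(new TopicPartition[0]));
// Subsequent poll() calls now read from the first available offset; committed
// offsets advance from there as consumption proceeds.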

Cheers,
Jens

On Friday, December 18, 2015, Akhilesh Pathodia <pathodia.akhil...@gmail.com>
wrote:

> Hi,
>
> I want to reset the kafka offset in zookeeper so that the consumer will
> start reading messages from first offset. I am using flume as a consumer to
> kafka. I have set the kafka property kafka.auto.offset.reset to "smallest",
> but it does not reset the offset in zookeeper and that's why flume will not
> read messages from first offset.
>
> Is there any way to reset kafka offset in zookeeper?
>
> Thanks,
> Akhilesh
>


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Better strategy for sending a message to multiple topics

2015-12-18 Thread Jens Rantil
Hi,


Why don't your consumers instead subscribe to a single topic used to broadcast 
to all of them? That way your consumers and producer will be much simpler.
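
A sketch of that broadcast pattern (topic and group names are invented): give
each consuming service its own group.id on one shared topic, and every service
receives every message while the producer only sends each message once.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// Each consuming service uses its own group.id and therefore gets every message.
props.put("group.id", "invoice-service");   // another service would use e.g. "email-service"
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("broadcast-events"));
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // Handle the broadcast message for this particular service...
  }
}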




Cheers,

Jens





–
Sent from Mailbox

On Fri, Dec 18, 2015 at 4:16 PM, Abel .  wrote:

> Hi,
> I have this scenario where I need to send a message to multiple topics. I
> create a single KafkaProducer, prepare the payload and then I call the send
> method of the producer for each topic with the correspoding ProducerRecord
> for the topic and the fixed message. However, I have noticed that this
> procedure takes some time depending on the number of topics. For instance,
> to send a message to 30 topics it takes more than 3s because each request
> takes about 100ms to return from the send method. Is there a better way to
> accomplish this same task? Any recommendation?
> Regards,
> Abel.

Re: Kafka User Group meeting Link

2015-12-17 Thread Jens Rantil
Hi,


In which part of the world?




Cheers,

Jens





–
Sent from Mailbox

On Thu, Dec 17, 2015 at 8:23 AM, prabhu v 
wrote:

> Hi,
> Can anyone provide me the link for the KAFKA USER Group meetings which
> happened on Jun. 14, 2012 and June 3, 2014??
> Link provided in the below wiki page is not a valid one..
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations
> -- 
> Regards,
> Prabhu.V
> -- 
> Regards,
> Prabhu.V

Low-latency, high message size variance

2015-12-16 Thread Jens Rantil
;
> > }
> >
> > This seems closer to the spirit of the poll loop, and it makes handling
> > commits a lot easier. You still have to deal with the rebalance problem,
> > but at least you don't have to deal with the queue. It's still a little
> > complex though. Maybe the consumer needs a ping() API which does the same
> > thing as poll() but doesn't send or return any fetches. That would
> simplify
> > things a little more:
> >
> > while (running) {
> >   ConsumerRecords<K, V> records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(records));
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLLISECONDS))
> > consumer.ping();
> >   consumer.commitSync();
> > }
> >
> > Anyway, I'll think about it a little more and see if any other approaches
> > come to mind. I do agree that we should have a way to handle this case
> > without too much extra work.
> >
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil <jens.ran...@tink.se>
> wrote:
> >
> >> Hi Jason,
> >>
> >> Thanks for your response. See replies inline:
> >>
> >> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >>
> >> > Hey Jens,
> >> >
> >> > I'm not sure I understand why increasing the session timeout is not an
> >> > option. Is the issue that there's too much uncertainly about
> processing
> >> > time to set an upper bound for each round of the poll loop?
> >> >
> >>
> >> Yes, that's the issue.
> >>
> >>
> >> > One general workaround would be to move the processing into another
> >> thread.
> >> > For example, you could add messages to a blocking queue and have the
> >> > processor poll them. We have a pause() API which can be used to
> prevent
> >> > additional fetches when the queue is full. This would allow you to
> >> continue
> >> > the poll loop instead of blocking in offer(). The tricky thing would
> be
> >> > handling rebalances since you'd need to clear the queue and get the
> last
> >> > offset from the processors. Enough people probably will end up doing
> >> > something like this that we should probably work through an example on
> >> how
> >> > to do it properly.
> >> >
> >>
> >> Hm, as far as I've understood the consumer will only send heartbeats to
> >> the
> >> broker when poll() is being called. If I would call pause() on a
> consumer
> >> (from a separate thread) I understand poll() will block undefinitely.
> Will
> >> the polling consumer still send heartbeats when blocked? Or would a
> pause
> >> for too long (while my records are being processed) eventually lead to
> >> session timeout? If the latter, that would sort of defeat the purpose
> >> since
> >> I am trying to avoid unnecessary rebalancing of consumers when there is
> >> high pressure on the consumers.
> >>
> >> Regarding handling of rebalancing for a queue solution you describe; It
> >> really sounds very complicated. It's probably doable, but doesn't this
> >> sort
> >> of defeat the purpose of the high level consumer API? I mean, it sounds
> >> like it should gracefully handle slow consumption of varying size. I
> might
> >> be wrong.
> >>
> >> Thanks,
> >> Jens
> >>
> >>
> >> --
> >> Jens Rantil
> >> Backend engineer
> >> Tink AB
> >>
> >> Email: jens.ran...@tink.se
> >> Phone: +46 708 84 18 32
> >> Web: www.tink.se
> >>
> >> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> >> <
> >>
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> >> >
> >>  Twitter <https://twitter.com/tink>
> >>
> >
> >
>


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Low-latency, high message size variance

2015-12-16 Thread Jens Rantil
Hi again Jason,

Sorry for a bit of a late response - I'm travelling and check my e-mail
sporadically.

I have a specific question regarding the pause solution quoted below:

On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io> wrote:
>
> while (running) {
>   ConsumerRecords<K, V> records = consumer.poll(1000);
>   if (queue.offer(records))
> continue;
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.poll(0);
>   consumer.resume(assignment);
> }
>

As far as I've understood, the `KafkaConsumer` has a background thread that
fetches records, right? If so, isn't there a race condition between the
`consumer.poll(1000);` call and `consumer.pause(assignment);` where the
fetcher might fetch, and commit, messages that I then collect on my first
`consumer.poll(0);` call? Since `consumer.poll(0);` would then return a
non-empty list, I would essentially be ignoring messages? Or does the pause()
call both 1) make sure consumer#poll never returns anything _and_ 2)
pause the background fetcher?

Cheers,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Low-latency, high message size variance

2015-12-15 Thread Jens Rantil
Hi Jason,

Thanks for your response. See replies inline:

On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Jens,
>
> I'm not sure I understand why increasing the session timeout is not an
> option. Is the issue that there's too much uncertainly about processing
> time to set an upper bound for each round of the poll loop?
>

Yes, that's the issue.


> One general workaround would be to move the processing into another thread.
> For example, you could add messages to a blocking queue and have the
> processor poll them. We have a pause() API which can be used to prevent
> additional fetches when the queue is full. This would allow you to continue
> the poll loop instead of blocking in offer(). The tricky thing would be
> handling rebalances since you'd need to clear the queue and get the last
> offset from the processors. Enough people probably will end up doing
> something like this that we should probably work through an example on how
> to do it properly.
>

Hm, as far as I've understood, the consumer only sends heartbeats to the
broker when poll() is being called. If I were to call pause() on a consumer
(from a separate thread) I understand poll() will block indefinitely. Will
the polling consumer still send heartbeats while blocked? Or would a pause
that lasts too long (while my records are being processed) eventually lead to
a session timeout? If the latter, that would sort of defeat the purpose, since
I am trying to avoid unnecessary rebalancing of consumers when there is
high pressure on the consumers.

Regarding the handling of rebalancing for the queue solution you describe: it
really sounds very complicated. It's probably doable, but doesn't this sort
of defeat the purpose of the high-level consumer API? I mean, it sounds
like it should gracefully handle slow consumption of varying size. I might
be wrong.

Thanks,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>


Re: Low-latency, high message size variance

2015-12-13 Thread Jens Rantil
Hi again,


For the record, I filed an issue about this here:
https://issues.apache.org/jira/browse/KAFKA-2986




Cheers,

Jens





–
Sent from Mailbox

On Fri, Dec 11, 2015 at 7:56 PM, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi,
> We've been experimenting a little with running Kafka internally for better
> handling temporary throughput peaks of asynchronous tasks. However, we've
> had a big issues making Kafka work for us and I am starting to question
> whether its a good fit.
> Our usecase:
>- High latency. At peaks, each consumer requires ~20 seconds to handle a
>single message/task.
>- Extreme variation in message size: Serialized tasks are in the range
>of ~300 bytes up to ~3 MB.
>- Generally, it is processed in 20 seconds independent of message size.
>Most messages are small.
> Our setup:
>- Kafka 0.9.0.
>- Using the new Java consumer API (consumer groups etc.).
>- To occasionally handle large (3 MB) messages we've had to set the
>following configuration parameters:
>   - max.partition.fetch.bytes=10485760=10MB on consumer to handle
>   larger messages.
>   - session.timeout.ms=30s to handle our high latency processing.
>   - replica.fetch.max.bytes=10485760=10MB on broker.
>   - message.max.bytes=10485760=10MB on broker.
> Sample code:
> while (isRunning()) {
>   ConsumerRecords<String, byte[]> records = consumer.poll(100);
>   for (final ConsumerRecord<String, byte[]> record : records) {
> // Handle record...
>   }
> }
> AFAIK this seem like a very basic consumer code.
> Initial problem: When doing load testing to simulate peaks our consumer
> started spinning infinitely in similar fashion to [1]. We also noticed that
> we consistently were seeing [2] in our broker log.
> [1] http://bit.ly/1Q7zxgh
> [2] https://gist.github.com/JensRantil/2d1e7db3bd919eb35f9b
> Root cause analysis: AFAIK, health checks are only submitted to Kafka when
> calling `consumer.poll(...)`. To handle larger messages, we needed to
> increase max.partition.fetch.bytes. However, due to our high latency
> consumer a large amounts of small messages could be prefetched which made
> our inner for loop run long enough for the broker to consider our consumer
> dead.
> Two questions:
>- Is there any workaround to avoid the broker thinking our consumer is
>dead? Increasing session timeout to handle the polling interval for small
>messages is not an option since we simply prefetch too many messages for
>that to be an option. Can we set a limit on how many messages Kafka
>prefetches? Or is there a way to send health checks to broker out of bands
>without invoking the `KafkaConsumer#poll` method?
>- Is Kafka a bad tool for our usecase?
> Thanks and have a nice weekend,
> Jens
> -- 
> Jens Rantil
> Backend engineer
> Tink AB
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>  Twitter <https://twitter.com/tink>

Low-latency, high message size variance

2015-12-11 Thread Jens Rantil
Hi,

We've been experimenting a little with running Kafka internally to better
handle temporary throughput peaks of asynchronous tasks. However, we've
had big issues making Kafka work for us and I am starting to question
whether it's a good fit.

Our usecase:

   - High latency. At peaks, each consumer requires ~20 seconds to handle a
   single message/task.
   - Extreme variation in message size: Serialized tasks are in the range
   of ~300 bytes up to ~3 MB.
   - Generally, a message is processed in ~20 seconds independent of its size.
   Most messages are small.

Our setup:

   - Kafka 0.9.0.
   - Using the new Java consumer API (consumer groups etc.).
   - To occasionally handle large (3 MB) messages we've had to set the
   following configuration parameters:
  - max.partition.fetch.bytes=10485760=10MB on consumer to handle
  larger messages.
  - session.timeout.ms=30s to handle our high latency processing.
  - replica.fetch.max.bytes=10485760=10MB on broker.
  - message.max.bytes=10485760=10MB on broker.

Sample code:

while (isRunning()) {
  ConsumerRecords<String, byte[]> records = consumer.poll(100);
  for (final ConsumerRecord<String, byte[]> record : records) {
    // Handle record...
  }
}

AFAIK this seems like very basic consumer code.

Initial problem: When doing load testing to simulate peaks, our consumer
started spinning infinitely in a similar fashion to [1]. We also noticed that
we were consistently seeing [2] in our broker log.

[1] http://bit.ly/1Q7zxgh
[2] https://gist.github.com/JensRantil/2d1e7db3bd919eb35f9b

Root cause analysis: AFAIK, health checks are only submitted to Kafka when
calling `consumer.poll(...)`. To handle larger messages, we needed to
increase max.partition.fetch.bytes. However, due to our high-latency
consumer, a large number of small messages could be prefetched, which made
our inner for loop run long enough for the broker to consider our consumer
dead.

Two questions:

   - Is there any workaround to avoid the broker thinking our consumer is
   dead? Increasing the session timeout to cover the polling interval for small
   messages is not an option since we simply prefetch too many messages for
   that to work. Can we set a limit on how many messages Kafka prefetches
   (see the sketch after this list)? Or is there a way to send health checks
   to the broker out of band without invoking the `KafkaConsumer#poll` method?
   - Is Kafka a bad tool for our use case?
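
For what it's worth, later client releases (0.10 and up) added a
max.poll.records consumer setting that caps how many records a single poll()
hands back, which is one way to limit this kind of prefetching. A hedged
sketch of the relevant configuration (group id is made up; the setting is not
available in the 0.9 client used above):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "task-workers");                 // hypothetical group id
props.put("max.partition.fetch.bytes", "10485760");    // still needed for the ~3 MB messages
props.put("session.timeout.ms", "30000");
// Added after 0.9: limits how many records poll() returns, so a slow handler
// gets back to poll() (and to heartbeating) quickly even when many small
// messages are buffered.
props.put("max.poll.records", "1");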

Thanks and have a nice weekend,
Jens

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>