Re: What is the procedure of upgrading producers from 0.8 to 0.10

2016-09-26 Thread Alexis Midon
the official recommendations are here
http://kafka.apache.org/documentation.html#upgrade_10
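
For what it's worth, the sequence described there boils down to two broker settings. A rough sketch (the exact version strings are illustrative and depend on the release you start from):

```properties
# Rough sketch of the rolling-upgrade sequence from the linked page.

# Step 1: on each broker, pin both settings to the currently running
# release, then swap in the 0.10 binaries and bounce brokers one at a time.
inter.broker.protocol.version=0.8.2
log.message.format.version=0.8.2

# Step 2: once the whole cluster runs 0.10, raise the protocol version
# and do a second rolling bounce. Old clients keep working.
#inter.broker.protocol.version=0.10.0

# Step 3: after the consumers are upgraded, raise the message format
# version; until then the broker down-converts messages for old consumers.
#log.message.format.version=0.10.0
```

On the ordering question: per the upgrade notes, brokers go first, and the docs recommend raising `log.message.format.version` only after consumers are upgraded, since the broker otherwise has to down-convert messages for older consumers (a performance cost, not a correctness issue).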


On Fri, Sep 23, 2016 at 7:48 PM Vadim Keylis  wrote:

> Hello, we have a producer that is written in C to send data to
> Kafka using the 0.8 protocol. We now need to upgrade since the protocol has
> changed. We will upgrade the brokers first to version 0.10 and set
> log.message.format.version=0.8.1.1.
>
> What is the right approach of upgrading producer to avoid downtime?
>
> Do we upgrade consumers before upgrading producers?
>
> Can producer send data using 0.10 format when log.message.format.version is
> set to 0.8.1.1 ? Does anyone know what is the impact?
>
>
> Thanks so much in advance for the help!
>


Re: [DISCUSS] Java 8 as a minimum requirement

2016-08-16 Thread Alexis Midon
Java 7 is end of life. http://www.oracle.com/technetwork/java/eol-135779.html
+1



On Tue, Aug 16, 2016 at 6:43 AM Ismael Juma  wrote:

> Hey Harsha,
>
> I noticed that you proposed that Storm should drop support for Java 7 in
> master:
>
> http://markmail.org/message/25do6wd3a6g7cwpe
>
> It's useful to know what other Apache projects are doing in this regard, so
> I'm interested in the timeline being proposed for Storm's transition. I
> could not find it in the thread above, so I'd appreciate it if you could
> clarify it for us (and sorry if I missed it).
>
> Thanks,
> Ismael
>
> On Mon, Jun 20, 2016 at 5:05 AM, Harsha  wrote:
>
> > Hi Ismael,
> >   Agreed that timing is more important. If we give enough heads
> >   up to the users who are on Java 7, that's great, but
> >   shipping this in the 0.10.x line won't be good, as it is still
> >   perceived as a maintenance release even if the release contains
> >   a lot of features. If we can make this part of 0.11, move the
> >   0.10.1 features to 0.11, and give a rough timeline for when that
> >   would be released, that would be ideal.
> >
> > Thanks,
> > Harsha
> >
> > On Fri, Jun 17, 2016, at 11:13 AM, Ismael Juma wrote:
> > > Hi Harsha,
> > >
> > > Comments below.
> > >
> > > On Fri, Jun 17, 2016 at 7:48 PM, Harsha  wrote:
> > >
> > > > Hi Ismael,
> > > > "Are you saying that you are aware of many Kafka users still
> > > > using Java 7
> > > > > who would be ready to upgrade to the next Kafka feature release
> > (whatever
> > > > > that version number is) before they can upgrade to Java 8?"
> > > > I know there are quite a few users who are still on Java 7
> > >
> > >
> > > This is good to know.
> > >
> > >
> > > > and regarding the
> > > > upgrade we can't say yes or no. It's up to the user's discretion when
> > > > they choose to upgrade, and of course whether there are any critical
> > > > fixes that might go into the release. We shouldn't be restricting
> > > > their upgrade path just because we removed Java 7 support.
> > > >
> > >
> > > My point is that both paths have their pros and cons and we need to
> weigh
> > > them up. If some users are slow to upgrade the Java version (Java 7 has
> > > been EOL'd for over a year), there's a good chance that they are slow
> to
> > > upgrade Kafka too. And if that is the case (and it may not be), then
> > > holding up improvements for the ones who actually do upgrade may be the
> > > wrong call. To be clear, I am still in listening mode and I haven't
> made
> > > up
> > > my mind on the subject.
> > >
> > > > Once we released 0.9.0, there aren't any 0.8.x releases, i.e. we don't
> > > > have LTS type release where we continually ship critical fixes over
> > > > 0.8.x minor releases. So if a user notices a critical fix the only
> > > > option today is to upgrade to next version where that fix is shipped.
> > > >
> > >
> > > We haven't done a great job at this in the past, but there is no
> decision
> > > that once a new major release is out, we don't do patch releases for
> the
> > > previous major release. In fact, we have been collecting critical fixes
> > > in
> > > the 0.9.0 branch for a potential 0.9.0.2.
> > >
> > > > I understand there is no decision made yet, but the premise was to
> > > > ship this in 0.10.x, possibly 0.10.1, which I don't agree with. I'm in
> > > > general against shipping this in a 0.10.x version. Removing Java 7
> > > > support when the release is minor is in general not a good idea for
> > > > users.
> > > >
> > >
> > > Sorry if I didn't communicate this properly. I simply meant the next
> > > feature release. I used 0.10.1.0 as an example, but it could also be
> > > 0.11.0.0 if that turns out to be the next release. A discussion on that
> > > will probably take place once the scope is clear. Personally, I think
> the
> > > timing is more important than the version number, but it seems like some
> > > people disagree.
> > >
> > > Ismael
> >
>


Re: Kafka DNS Caching in AWS

2016-08-03 Thread Alexis Midon
Hi Gwen,

I have explored and tested this approach in the past. It does not work for
2 reasons:
 A. the first one relates to the ZKClient implementation,
 B. the second is the JVM behavior.


A. The ZKConnection [1] managed by ZKClient uses a legacy constructor of
org.apache.zookeeper.ZooKeeper [2]. The created ZooKeeper instance relies on a
StaticHostProvider [3].
This host provider implementation resolves the DNS names on instantiation. So
as soon as the Kafka broker creates its ZKClient instance, all server
addresses are resolved and the corresponding InetSocketAddress instances
will store the IP for their lifetime.  :(

I believe the right thing to do would be for ZKClient to use a custom
HostProvider implementation that creates a new InetSocketAddress instance on
each invocation of `HostProvider#next()` [4] (therefore resolving the
address again).
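
A rough sketch of that idea, as a self-contained class that mirrors the `size()`/`next()` shape of `org.apache.zookeeper.client.HostProvider` without pulling in the ZooKeeper dependency (class name and method bodies are mine, not ZooKeeper code):

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: round-robins over the configured servers, but builds
// a *new* InetSocketAddress on every next() call, so the hostname is
// re-resolved each time (still subject to the JVM-level DNS cache, point B).
public class ResolvingHostProvider {
    private final List<InetSocketAddress> servers;
    private int current = 0;

    public ResolvingHostProvider(List<InetSocketAddress> servers) {
        this.servers = new ArrayList<>(servers);
    }

    public int size() {
        return servers.size();
    }

    public InetSocketAddress next() {
        InetSocketAddress seed = servers.get(current);
        current = (current + 1) % servers.size();
        // A fresh instance triggers a fresh name lookup, instead of reusing
        // the IP captured when the seed instance was constructed.
        return new InetSocketAddress(seed.getHostString(), seed.getPort());
    }

    public static void main(String[] args) {
        List<InetSocketAddress> seeds = new ArrayList<>();
        seeds.add(new InetSocketAddress("localhost", 2181));
        seeds.add(new InetSocketAddress("localhost", 2182));
        ResolvingHostProvider provider = new ResolvingHostProvider(seeds);
        System.out.println(provider.next().getPort());
        System.out.println(provider.next().getPort());
    }
}
```

The real `HostProvider#next(long spinDelay)` takes a spin-delay argument and has an `onConnected()` callback; those are omitted here to keep the sketch dependency-free.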

You would think this is enough, but no, because the JVM itself caches DNS lookups.


B. When an InetSocketAddress instance resolves a DNS name, the JVM will
cache the value! So even if a dynamic HostProvider implementation is used,
the JVM might return a cached value.
And the default TTL is implementation-specific. If I remember correctly, the
Oracle JVM caches them forever. [5]

So you must also configure the Kafka JVM correctly.
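
Concretely, "configure the Kafka JVM correctly" means setting `networkaddress.cache.ttl` either in the JVM's `java.security` file or programmatically before the first lookup happens. A minimal sketch (the 30-second TTL is an arbitrary example value):

```java
import java.security.Security;

// Hypothetical sketch: cap the JVM's cache for successful DNS lookups at 30s.
// The policy is read once by the InetAddress machinery, so this must run
// before the process performs its first name lookup; for a Kafka broker the
// more practical route is editing java.security rather than application code.
public class DnsTtlConfig {
    public static void main(String[] args) {
        Security.setProperty("networkaddress.cache.ttl", "30");
        System.out.println(Security.getProperty("networkaddress.cache.ttl"));
    }
}
```

Some JVMs also honor the `sun.net.inetaddr.ttl` system property (`-Dsun.net.inetaddr.ttl=30`) as a fallback, but relying on that is implementation-specific.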

hope it helps,

Alexis

[1]
https://github.com/sgroschupf/zkclient/blob/ec77080a5d7a5d920fa0e8ea5bd5119fb02a06f1/src/main/java/org/I0Itec/zkclient/ZkConnection.java#L69

[2]
https://github.com/apache/zookeeper/blob/a0fcb8ff6c2eece8804ca6c009c175cf8a86335d/src/java/main/org/apache/zookeeper/ZooKeeper.java#L1210

[3]
https://github.com/apache/zookeeper/blob/a0fcb8ff6c2eece8804ca6c009c175cf8a86335d/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java

[4]
https://github.com/apache/zookeeper/blob/a0fcb8ff6c2eece8804ca6c009c175cf8a86335d/src/java/main/org/apache/zookeeper/ClientCnxn.java#L1071

[5]
http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html
`networkaddress.cache.ttl`: Specified in java.security to indicate the
caching policy for successful name lookups from the name service. The
value is specified as an integer to indicate the number of seconds to cache
the successful lookup.

A value of -1 indicates "cache forever". The default behavior is to cache
forever when a security manager is installed, and to cache for an
implementation specific period of time, when a security manager is not
installed.
See also `networkaddress.cache.negative.ttl`.




On Wed, Aug 3, 2016 at 9:45 AM Gwen Shapira  wrote:

> Can you define a DNS name that round-robins to multiple IP addresses?
> This way ZKClient will cache the name and you can rotate IPs behind
> the scenes with no issues?
>
>
>
> On Wed, Aug 3, 2016 at 7:22 AM, Zuber  wrote:
> > Hello –
> >
> > We are planning to use Kafka as Event Store in a system which is being
> built using event sourcing design approach.
> > Here is how we deployed the cluster in AWS to verify HA in the cloud (in
> our POC we only had 1 topic with 1 partition and 3 replication factor) -
> > 1) 3 ZK servers running in different AZs (managed by an Auto Scaling
> > Group)
> > 2) 3 Kafka broker EC2 instances running in different AZs (managed by an
> > Auto Scaling Group)
> > 3) Kafka logs are stored in EBS volumes
> > 4) A-type addresses are defined for all ZK servers & Kafka brokers in
> > Route53; each EC2 instance registers its IP for the corresponding A
> > record (in Route53) on startup
> >
> > But due to a bug in ZKClient used by the Kafka broker, which caches ZK IPs
> forever, I don’t see any other option other than bouncing all brokers.
> >
> > One of the Netflix presentations (links below) mentions the issue, as
> > well as a couple of ZK JIRA defects, but I haven’t found any concrete
> > solution yet.
> > I would really appreciate any help in this regard.
> >
> >
> http://image.slidesharecdn.com/netflix-kafka-150325105558-conversion-gate01/95/netflix-data-pipeline-with-kafka-36-638.jpg?cb=1427281139
> > https://issues.apache.org/jira/browse/ZOOKEEPER-338
> > https://issues.apache.org/jira/browse/ZOOKEEPER-1506
> >
> http://grokbase.com/t/kafka/users/131x67h1bt/zookeeper-caching-dns-entries
> >
> > Thanks,
> > Zuber
> >
>


Re: rate-limiting on rebalancing, or sync from non-leaders?

2016-07-04 Thread Alexis Midon
Same here at Airbnb. Moving data is the biggest operational challenge
because of the network bandwidth cannibalization.
I was hoping that rate limiting would apply to replica fetchers too.

On Sun, Jul 3, 2016 at 15:38 Tom Crayford  wrote:

> Hi Charity,
>
> I'm not sure about the roadmap. The way we (and linkedin/dropbox/netflix)
> handle rebalances right now is to do a small handful of partitions at a
> time (LinkedIn does 10 partitions at a time the last I heard), not a big
> bang rebalance of all the partitions in the cluster. That's not perfect and
> not great throttling, and I agree that it's something Kafka desperately
> needs to work on.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>
> On Sun, Jul 3, 2016 at 2:00 AM, Charity Majors  wrote:
>
> > Hi there,
> >
> > I'm curious if there's anything on the Kafka roadmap for adding
> > rate-limiting or max-throughput for rebalancing processes.
> >
> > Alternately, if you have RF>2, maybe a setting to instruct followers to
> > sync from other followers?
> >
> > I'm super impressed with how fast and efficient the kafka data
> rebalancing
> > process is, but also fear for the future when it's battling for resources
> > against high production traffic.  :)
> >
>


Re: Minor GC time becomes longer and longer

2016-04-11 Thread Alexis Midon
Any experience with G1 for Kafka? I didn't get a chance to try it out.


On Mon, Apr 11, 2016 at 3:31 AM Jakub Neubauer 
wrote:

> Hi,
> Did you consider G1 collector?
> http://docs.oracle.com/javase/7/docs/technotes/guides/vm/G1.html
> Jakub N.
>
> On 11.4.2016 12:08, jinhong lu wrote:
> > Minor GC time in my cluster becomes longer and longer, and the broker
> > loses its session with zk:
> >
> >   2016-04-11T17:33:06.559+0800: 875.917:
> [GC2016-04-11T17:33:06.559+0800: 875.917: [ParNew:
> 3363139K->10287K(3774912K), 0.0858010 secs] 3875141K->522289K(31037888K),
> 0.0860890 secs] [Times: user=1.46 sys=0.02, real=0.09 secs]
> >   2016-04-11T17:38:50.477+0800: 1219.834:
> [GC2016-04-11T17:38:50.477+0800: 1219.835: [ParNew:
> 3365807K->11295K(3774912K), 0.0858600 secs] 3877809K->523297K(31037888K),
> 0.0861390 secs] [Times: user=1.50 sys=0.00, real=0.08 secs]
> >   2016-04-11T17:46:58.550+0800: 1707.908:
> [GC2016-04-11T17:46:58.550+0800: 1707.908: [ParNew:
> 3366815K->8463K(3774912K), 15.8853640 secs] 3878817K->526017K(31037888K),
> 15.8855810 secs] [Times: user=286.22 sys=0.00, real=15.89 secs]
> >   2016-04-11T17:54:37.601+0800: 2166.959:
> [GC2016-04-11T17:54:37.601+0800: 2166.959: [ParNew:
> 3363983K->5961K(3774912K), 22.5800540 secs] 3881537K->523515K(31037888K),
> 22.5803030 secs] [Times: user=406.07 sys=0.00, real=22.58 secs]
> >
> > Here is my jvm setting:
> >
> >   28808 kafka.Kafka -Xmx30G -Xms30G -Xmn4G -XX:+UseCompressedOops
> -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
> -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=0
> -XX:+CMSParallelRemarkEnabled -XX:+DisableExplicitGC
> -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75
> -Xnoclassgc -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
> -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> -XX:+DisableExplicitGC -Djava.awt.headless=true
> -Xloggc:/home/hadoop/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
> -Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false
> -Dkafka.logs.dir=/home/hadoop/kafka/bin/../logs
> -Dlog4j.configuration=file:bin/../config/log4j.properties
> >
> > Any suggestion to tune the jvm ?
> >
> >
> > Thanks,
> > lujinhong
> >
>
>


Re: Minor GC time becomes longer and longer

2016-04-11 Thread Alexis Midon
Why such a gigantic heap? 30G.
In my experience, the Kafka broker does not have to deal with long-lived
objects; it's all about many small, ephemeral objects. Most of the data is
kept off heap.

We've been happy with a 5G heap, 2G of which is for the new generation. The
server has 8 cores and 60GB of RAM.

Here are the jvm settings:

```
-XX:+AggressiveOpts -XX:+UseCompressedOops -XX:+UseFastEmptyMethods
-XX:+UseFastAccessorMethods -XX:+CMSClassUnloadingEnabled -Xms5g -Xmx5g
-XX:NewSize=2g -XX:MaxNewSize=2g -XX:+UseParNewGC -XX:ParallelGCThreads=8
-XX:MaxTenuringThreshold=15 -XX:SurvivorRatio=30 -XX:+UseConcMarkSweepGC
-XX:+CMSConcurrentMTEnabled
```

The GC logs look ok.
http://pastebin.com/rDyGNttY

Of course, let's not forget that GC tuning is tricky and can't be solved
over an email.




On Mon, Apr 11, 2016 at 3:31 AM Jakub Neubauer 
wrote:

> Hi,
> Did you consider G1 collector?
> http://docs.oracle.com/javase/7/docs/technotes/guides/vm/G1.html
> Jakub N.
>
> On 11.4.2016 12:08, jinhong lu wrote:
> > Minor GC time in my cluster becomes longer and longer, and the broker
> > loses its session with zk:
> >
> >   2016-04-11T17:33:06.559+0800: 875.917:
> [GC2016-04-11T17:33:06.559+0800: 875.917: [ParNew:
> 3363139K->10287K(3774912K), 0.0858010 secs] 3875141K->522289K(31037888K),
> 0.0860890 secs] [Times: user=1.46 sys=0.02, real=0.09 secs]
> >   2016-04-11T17:38:50.477+0800: 1219.834:
> [GC2016-04-11T17:38:50.477+0800: 1219.835: [ParNew:
> 3365807K->11295K(3774912K), 0.0858600 secs] 3877809K->523297K(31037888K),
> 0.0861390 secs] [Times: user=1.50 sys=0.00, real=0.08 secs]
> >   2016-04-11T17:46:58.550+0800: 1707.908:
> [GC2016-04-11T17:46:58.550+0800: 1707.908: [ParNew:
> 3366815K->8463K(3774912K), 15.8853640 secs] 3878817K->526017K(31037888K),
> 15.8855810 secs] [Times: user=286.22 sys=0.00, real=15.89 secs]
> >   2016-04-11T17:54:37.601+0800: 2166.959:
> [GC2016-04-11T17:54:37.601+0800: 2166.959: [ParNew:
> 3363983K->5961K(3774912K), 22.5800540 secs] 3881537K->523515K(31037888K),
> 22.5803030 secs] [Times: user=406.07 sys=0.00, real=22.58 secs]
> >
> > Here is my jvm setting:
> >
> >   28808 kafka.Kafka -Xmx30G -Xms30G -Xmn4G -XX:+UseCompressedOops
> -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
> -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=0
> -XX:+CMSParallelRemarkEnabled -XX:+DisableExplicitGC
> -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75
> -Xnoclassgc -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
> -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> -XX:+DisableExplicitGC -Djava.awt.headless=true
> -Xloggc:/home/hadoop/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
> -Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false
> -Dkafka.logs.dir=/home/hadoop/kafka/bin/../logs
> -Dlog4j.configuration=file:bin/../config/log4j.properties
> >
> > Any suggestion to tune the jvm ?
> >
> >
> > Thanks,
> > lujinhong
> >
>
>


Re: Kafka constant shrinking and expanding after deleting a topic

2016-04-05 Thread Alexis Midon
I ran into the same issue today. In a production cluster, I noticed the
"Shrinking ISR for partition" log messages for a topic deleted 2 months
ago.
Our staging cluster shows the same messages for all the topics deleted in
that cluster.
Both clusters run 0.8.2.

Yifan, Guozhang, did you find a way to get rid of them?

thanks in advance,
alexis


On Tue, Apr 5, 2016 at 4:16 PM Guozhang Wang  wrote:

> It is possible, there are some discussions about a similar issue in KIP:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-53+-+Add+custom+policies+for+reconnect+attempts+to+NetworkdClient
>
> mailing thread:
>
> https://www.mail-archive.com/dev@kafka.apache.org/msg46868.html
>
>
>
> Guozhang
>
> On Tue, Apr 5, 2016 at 2:34 PM, Yifan Ying  wrote:
>
> > Some updates:
> >
> > Yesterday, right after release (producers and consumers reconnected to
> > Kafka/Zookeeper, but no code change in our producers and consumers), all
> > under replication issues were resolved automatically and no more high
> > latency in both Kafka and Zookeeper. But right after today's
> > release (producers and consumers re-connected again), the under-replication
> > and high latency issue happened again. So could the all-at-once reconnecting
> > from producers and consumers cause the problem? And all this only
> > happened since I deleted a deprecated topic in production.
> >
> > Yifan
> >
> > On Tue, Apr 5, 2016 at 9:04 AM, Guozhang Wang 
> wrote:
> >
> >> These configs are mainly dependent on your publish throughput, since the
> >> replication throughput is higher bounded by the publish throughput. If
> the
> >> publish throughput is not high, then setting lower threshold values in
> >> these two configs will cause churn in shrinking / expanding ISRs.
> >>
> >> Guozhang
> >>
> >> On Mon, Apr 4, 2016 at 11:55 PM, Yifan Ying  wrote:
> >>
> >>> Thanks for replying, Guozhang. We did increase both settings:
> >>>
> >>> replica.lag.max.messages=2
> >>>
> >>> replica.lag.time.max.ms=2
> >>>
> >>>
> >>> But no sure if these are good enough. And yes, that's a good suggestion
> >>> to monitor ZK performance.
> >>>
> >>>
> >>> Thanks.
> >>>
> >>> On Mon, Apr 4, 2016 at 8:58 PM, Guozhang Wang 
> >>> wrote:
> >>>
>  Hmm, it seems like your broker configs "replica.lag.max.messages" and "
>  replica.lag.time.max.ms" are misconfigured for your replication
>  traffic, and the deletion of the topic actually makes it below the
>  threshold. What are the config values for these two? And could you
> try to
>  increase these configs and see if that helps?
> 
>  In 0.8.2.1 Kafka-consumer-offset-checker.sh access ZK to query the
>  consumer offsets one-by-one, and hence if your ZK read latency is
> high it
>  could take long time. You may want to monitor your ZK cluster
> performance
>  to check its read / write latencies.
> 
> 
>  Guozhang
> 
> 
> 
> 
> 
>  On Mon, Apr 4, 2016 at 10:59 AM, Yifan Ying 
> wrote:
> 
> > Hi Guozhang,
> >
> > It's 0.8.2.1. So it should be fixed? We also tried to start from
> > scratch by wiping out the data directory on both Kafka and
> Zookeeper. And
> > it's odd that the constant shrinking and expanding happened after
> fresh
> > restart, and high request latency as well. The brokers are using the
> same
> > config before topic deletion.
> >
> > Another observation is that, using the
> > Kafka-consumer-offset-checker.sh is extremely slow. Any suggestion
> would be
> > appreciated! Thanks.
> >
> > On Sun, Apr 3, 2016 at 2:29 PM, Guozhang Wang 
> > wrote:
> >
> >> Yifan,
> >>
> >> Are you on 0.8.0 or 0.8.1/2? There are some issues with zkVersion
> >> checking
> >> in 0.8.0 that are fixed in later minor releases of 0.8.
> >>
> >> Guozhang
> >>
> >> On Fri, Apr 1, 2016 at 7:46 PM, Yifan Ying 
> >> wrote:
> >>
> >> > Hi All,
> >> >
> >> > We deleted a deprecated topic on Kafka cluster(0.8) and started
> >> observing
> >> > constant 'Expanding ISR for partition' and 'Shrinking ISR for
> >> partition'
> >> > for other topics. As a result we saw a huge number of under
> >> replicated
> >> > partitions and very high request latency from Kafka. And it
> doesn't
> >> seem
> >> > able to recover itself.
> >> >
> >> > Anyone knows what caused this issue and how to resolve it?
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
> > --
> > Yifan
> >
> >
> >
> 
> 
>  --
>  -- Guozhang
> 
> >>>
> >>>
> >>>
> >>> --
> >>> Yifan
> >>>
> >>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
> > --
> > Yifan
> >
> >
> >
>
>
> --
> -- Guozhang
>


Re: Upgrade path to 0.10.x

2016-04-03 Thread Alexis Midon
- This type of question is pretty common on the mailing list. Personally, I
don't think I could confidently describe the compatibility/upgrade scenarios.
Some clarification is needed, ideally in a single place.

I think this is even more crucial as the number of new features increases
(which is exciting).

- Also, would it make sense to release and version the clients independently
of the server? AFAIK clients are tied to a protocol version, not a broker
version.

At the moment the "new consumer" is in beta but shipped with non-beta
server code. Quite confusing.

Some clarity and predictability would be helpful.


On Thu, Mar 31, 2016 at 06:18 Tommy Becker  wrote:

> Hey folks,
> We are currently still running Kafka 0.8.1.1, and are looking to upgrade.
> From what I have been able to tell, upgrading to 0.10.x will be supported
> from as far back as 0.8.2.x. But the documentation for 0.8.2.x seems to
> indicate that upgrading from 0.8.1.x is effectively a no-op. So my question is
> does that imply that upgrading from 0.8.1.1 to 0.10.x will be supported?
>
> Regards,
> Tommy
>
>
>


Re: two questions

2016-03-21 Thread Alexis Midon
Hi Ismael,

could you elaborate on "newer clients don't work with older brokers
though"? Doc pointers are fine.
I was under the impression that I could use the 0.9 clients with 0.8 brokers.

thanks

Alexis

On Mon, Mar 21, 2016 at 2:05 AM Ismael Juma  wrote:

> Hi Allen,
>
> Answers inline.
>
> On Mon, Mar 21, 2016 at 5:56 AM, allen chan 
> wrote:
>
> > 1) I am using the upgrade instructions to upgrade from 0.8 to 0.9. Can
> > someone tell me if i need to continue to bump the
> > inter.broker.protocol.version after each upgrade? Currently the broker
> code
> > is 0.9.0.1 but i have the config file listing as
> > inter.broker.protocol.versi
> > on=0.9.0.0
> >
>
> When it comes to inter.broker.protocol.version, 0.9.0.0 and 0.9.0.1 are the
> same, so you don't have to. Internally, they are both mapped to 0.9.0.X.
>
>
> > 2) Is it possible to use multiple variations of producers / consumers?
> > My broker is on 0.9.0.1 and i am currently using 0.8.x
> producer/consumer. I
> > want to test the new producer first then the new consumer. So would there
> > be issues if the setup was:
> > 0.9.x producer -> 0.9.x broker -> 0.8.x consumer
> >
>
> Newer brokers support older clients, so this is fine. Note that newer
> clients don't work with older brokers though.
>
> Ismael
>


Re: Kafka broker decommission steps

2016-03-11 Thread Alexis Midon
I think the "scripts for operations" should be APIs that provide
higher-level operations. It would be simpler and more powerful.


On Thu, Mar 10, 2016 at 10:54 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks Alexis, these are good points.
>
> We are aware of the partition assignment json problem and trying to improve
> it; as for packaging / configs etc I think the goal of the OS Kafka package
> itself is to provide universal and simple barebone scripts for operations,
> and users can wrap / customize themselves as they need.
>
> Guozhang
>
> On Sun, Mar 6, 2016 at 10:16 AM, Alexis Midon <
> alexis.mi...@airbnb.com.invalid> wrote:
>
> > also:
> > . the command line tool packaging is not ideal, you have to download the
> > (big) kafka zip, set paths, etc.
> >
> > . the details of the zk cluster and kafka cluster must be repeated on the
> > command line.
> >
> > with kafkat, you can use `gem install`, and store the cluster details
> in
> > a config file.
> >
> > On Sun, Mar 6, 2016 at 10:12 AM Alexis Midon <alexis.mi...@airbnb.com>
> > wrote:
> >
> > > My recollection is that you have to come up with the partition
> assignment
> > > yourself, and pass the json file as an argument.
> > > This is quite error prone, especially during an outage.
> > >
> > > we quickly wrote kafkat to have a simple commands that would let us
> > > express needs without having to deal with the assignment plan: "move
> > topic
> > > X on these brokers", "retire broker X", "set replication factor of
> topic
> > X
> > > to R", etc
> > >
> > >
> > >
> > >
> > > On Sat, Mar 5, 2016 at 3:33 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > >
> > >> Hello Alexis,
> > >>
> > >> Could you share your findings about the command line tool? We can try
> to
> > >> resolve if there's any issues.
> > >>
> > >> Guozhang
> > >>
> > >> On Fri, Mar 4, 2016 at 3:13 PM, Alexis Midon <
> > >> alexis.mi...@airbnb.com.invalid> wrote:
> > >>
> > >> > The command line tool that ships with Kafka is error prone.
> > >> >
> > >> > Our standard procedure is:
> > >> > 1. spin up the new broker
> > >> > 2. use `kafkat drain  [--brokers ]
> > >> > 3. shut down old broker
> > >> >
> > >> > The `drain` command will generate and submit a partition assignment
> > plan
> > >> > where the new broker id replaces the old one. It's pretty much a
> > >> "gsub(old,
> > >> > new)".
> > >> >
> > >> > We do it regularly. It's almost a mundane operation. The only
> > challenge
> > >> is
> > >> > the volume of data being transferred over the network. Since there
> is
> > no
> > >> > throttling mechanism, the network is sometime saturated which might
> > >> impact
> > >> > other consumers/producers
> > >> >
> > >> > See https://github.com/airbnb/kafkat
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Mar 4, 2016 at 7:28 AM Todd Palino <tpal...@gmail.com>
> wrote:
> > >> >
> > >> > > To answer your questions…
> > >> > >
> > >> > > 1 - Not in the way you want it to. There is a setting for
> automatic
> > >> > leader
> > >> > > election (which I do not recommend anyone use at this time), but
> all
> > >> that
> > >> > > does is pick which of the currently assigned replicas should be
> the
> > >> > leader.
> > >> > > It does not reassign partitions from one broker to another. Kafka
> > does
> > >> > not
> > >> > > have a facility for doing this automatically.
> > >> > >
> > >> > > 2 - No. The most you can do is move all the partitions off and
> then
> > >> > > immediately shut down the broker process. Any broker that is live
> in
> > >> the
> > >> > > cluster can, and will, get partitions assigned to it by the
> > >> controller.
> > >> > >
> > >> > > For what you want to do, you need to use the partition
> reassignment
> > >> &

Re: Kafka 0.8 old high level consumer client gets inactive after few hours of inactivity

2016-03-10 Thread Alexis Midon
- To understand what the stuck consumer is doing, it would be useful to
collect the logs and a thread dump. I'd try to find out what the fetcher
threads are doing. What about the handler/application/stream threads?
- Are the offsets committed? After a restart, could it be that the consumer
is just re-fetching from the last checkpoint, as opposed to fetching
messages produced after it stalled?
- Your ZK session timeout is huge, 16+ minutes. What is the ZK session state
before the restart? If the consumer is DISCONNECTED from the ZK cluster,
the partitions assigned to it won't be released until the timeout. During
that time, the messages sent to these partitions will not be consumed.
I believe the logs will show the ZK session state transitions.




On Thu, Mar 10, 2016 at 5:31 AM Abhishek Chawla  wrote:

> Hi,
> I'm using kafka version 0.8.2.1 with old high level consumer api.
> I'm following this gude:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>
> The client is working fine, but after a few hours of inactivity it becomes
> inactive and stops receiving messages.
> But if I restart my client, then it starts fetching all the messages
> which were not consumed during the time of its inactivity.
>
> Im using this configuration:
> props.put("zookeeper.connect", ZOOKEEPER_HOST_ADDR);
> props.put("group.id", "test");
> props.put("zookeeper.session.timeout.ms","100");
> props.put("zookeeper.sync.time.ms","20");
> props.put("auto.commit.enable","true");
> props.put("auto.commit.interval.ms", "100");
> props.put("fetch.wait.max.ms", "100");
> props.put("rebalance.backoff.ms", "2");
>
> Can someone please help me figure out what's going wrong here.
>
> --
> regards,
> Abhishek Chawla
>


Re: Kafka broker decommission steps

2016-03-06 Thread Alexis Midon
also:
. the command line tool packaging is not ideal, you have to download the
(big) kafka zip, set paths, etc.

. the details of the zk cluster and kafka cluster must be repeated on the
command line.

with kafkat, you can use `gem install`, and store the cluster details in
a config file.
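
For reference, this is roughly what the hand-written input to the stock `kafka-reassign-partitions.sh` tool looks like (topic name and broker ids here are made up):

```json
{
  "version": 1,
  "partitions": [
    {"topic": "events", "partition": 0, "replicas": [2, 3, 4]},
    {"topic": "events", "partition": 1, "replicas": [3, 4, 2]}
  ]
}
```

Every affected partition has to be listed with its full target replica list, which is why producing this by hand during an outage is so error prone.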

On Sun, Mar 6, 2016 at 10:12 AM Alexis Midon <alexis.mi...@airbnb.com>
wrote:

> My recollection is that you have to come up with the partition assignment
> yourself, and pass the json file as an argument.
> This is quite error prone, especially during an outage.
>
> we quickly wrote kafkat to have a simple commands that would let us
> express needs without having to deal with the assignment plan: "move topic
> X on these brokers", "retire broker X", "set replication factor of topic X
> to R", etc
>
>
>
>
> On Sat, Mar 5, 2016 at 3:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Alexis,
>>
>> Could you share your findings about the command line tool? We can try to
>> resolve if there's any issues.
>>
>> Guozhang
>>
>> On Fri, Mar 4, 2016 at 3:13 PM, Alexis Midon <
>> alexis.mi...@airbnb.com.invalid> wrote:
>>
>> > The command line tool that ships with Kafka is error prone.
>> >
>> > Our standard procedure is:
>> > 1. spin up the new broker
>> > 2. use `kafkat drain  [--brokers ]
>> > 3. shut down old broker
>> >
>> > The `drain` command will generate and submit a partition assignment plan
>> > where the new broker id replaces the old one. It's pretty much a
>> "gsub(old,
>> > new)".
>> >
>> > We do it regularly. It's almost a mundane operation. The only challenge
>> is
>> > the volume of data being transferred over the network. Since there is no
>> > throttling mechanism, the network is sometime saturated which might
>> impact
>> > other consumers/producers
>> >
>> > See https://github.com/airbnb/kafkat
>> >
>> >
>> >
>> >
>> >
>> > On Fri, Mar 4, 2016 at 7:28 AM Todd Palino <tpal...@gmail.com> wrote:
>> >
>> > > To answer your questions…
>> > >
>> > > 1 - Not in the way you want it to. There is a setting for automatic
>> > leader
>> > > election (which I do not recommend anyone use at this time), but all
>> that
>> > > does is pick which of the currently assigned replicas should be the
>> > leader.
>> > > It does not reassign partitions from one broker to another. Kafka does
>> > not
>> > > have a facility for doing this automatically.
>> > >
>> > > 2 - No. The most you can do is move all the partitions off and then
>> > > immediately shut down the broker process. Any broker that is live in
>> the
>> > > cluster can, and will, get partitions assigned to it by the
>> controller.
>> > >
>> > > For what you want to do, you need you use the partition reassignment
>> > > command line tool that ships with Kafka to reassign partitions from
>> the
>> > old
>> > > broker to the new one. Once that is complete, you can double check
>> that
>> > the
>> > > old broker has no partitions left and shut it down. I have a tool
>> that we
>> > > use internally to make this a lot easier, and I’m in the process of
>> > getting
>> > > a repository set up to make it available via open source. It allows
>> for
>> > > more easily removing and adding brokers, and rebalancing partitions
>> in a
>> > > cluster without having to craft the reassignments by hand.
>> > >
>> > > -Todd
>> > >
>> > >
>> > > On Fri, Mar 4, 2016 at 5:07 AM, Muqtafi Akhmad <muqt...@traveloka.com
>> >
>> > > wrote:
>> > >
>> > > > dear Kafka users,
>> > > >
>> > > > I have some questions regarding decommissioning kafka broker node
>> and
>> > > > replacing it with the new one. Lets say that we have three broker
>> nodes
>> > > and
>> > > > each topic in Kafka has replication factor = 3, we upgrade one node
>> > with
>> > > > the following steps :
>> > > > 1. add one broker node to cluster
>> > > > 2. shutdown old broker node
>> > > >
>> > > > My questions are
>> > > > 1. When we add one new broker to the cluster will it trigger Kafka
>> > topic
>> > > /
>> > > > group leadership rebalance?
>> > > > 2. Is there any way to disable the to-be-decommissioned node to
>> hold no
>> > > > topic/group leadership (acting as passive copy) so that it can be
>> > > > decommissioned with minimal effect to Kafka clients?
>> > > >
>> > > > Thank you,
>> > > >
>> > > > --
>> > > > Muqtafi Akhmad
>> > > > Software Engineer
>> > > > Traveloka
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > *—-*
>> > > *Todd Palino*
>> > > Staff Site Reliability Engineer
>> > > Data Infrastructure Streaming
>> > >
>> > >
>> > >
>> > > linkedin.com/in/toddpalino
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>


Re: Kafka broker decommission steps

2016-03-06 Thread Alexis Midon
My recollection is that you have to come up with the partition assignment
yourself, and pass the json file as an argument.
This is quite error prone, especially during an outage.

We quickly wrote kafkat to have a simple set of commands that would let us
express needs without having to deal with the assignment plan: "move topic X
onto these brokers", "retire broker X", "set replication factor of topic X
to R", etc.




On Sat, Mar 5, 2016 at 3:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Alexis,
>
> Could you share your findings about the command line tool? We can try to
> resolve if there's any issues.
>
> Guozhang
>
> On Fri, Mar 4, 2016 at 3:13 PM, Alexis Midon <
> alexis.mi...@airbnb.com.invalid> wrote:
>
> > The command line tool that ships with Kafka is error prone.
> >
> > Our standard procedure is:
> > 1. spin up the new broker
> > 2. use `kafkat drain  [--brokers ]
> > 3. shut down old broker
> >
> > The `drain` command will generate and submit a partition assignment plan
> > where the new broker id replaces the old one. It's pretty much a
> "gsub(old,
> > new)".
> >
> > We do it regularly. It's almost a mundane operation. The only challenge
> is
> > the volume of data being transferred over the network. Since there is no
> > throttling mechanism, the network is sometime saturated which might
> impact
> > other consumers/producers
> >
> > See https://github.com/airbnb/kafkat
> >
> >
> >
> >
> >
> > On Fri, Mar 4, 2016 at 7:28 AM Todd Palino <tpal...@gmail.com> wrote:
> >
> > > To answer your questions…
> > >
> > > 1 - Not in the way you want it to. There is a setting for automatic
> > leader
> > > election (which I do not recommend anyone use at this time), but all
> that
> > > does is pick which of the currently assigned replicas should be the
> > leader.
> > > It does not reassign partitions from one broker to another. Kafka does
> > not
> > > have a facility for doing this automatically.
> > >
> > > 2 - No. The most you can do is move all the partitions off and then
> > > immediately shut down the broker process. Any broker that is live in
> the
> > > cluster can, and will, get partitions assigned to it by the controller.
> > >
> > > For what you want to do, you need you use the partition reassignment
> > > command line tool that ships with Kafka to reassign partitions from the
> > old
> > > broker to the new one. Once that is complete, you can double check that
> > the
> > > old broker has no partitions left and shut it down. I have a tool that
> we
> > > use internally to make this a lot easier, and I’m in the process of
> > getting
> > > a repository set up to make it available via open source. It allows for
> > > more easily removing and adding brokers, and rebalancing partitions in
> a
> > > cluster without having to craft the reassignments by hand.
> > >
> > > -Todd
> > >
> > >
> > > On Fri, Mar 4, 2016 at 5:07 AM, Muqtafi Akhmad <muqt...@traveloka.com>
> > > wrote:
> > >
> > > > dear Kafka users,
> > > >
> > > > I have some questions regarding decommissioning kafka broker node and
> > > > replacing it with the new one. Lets say that we have three broker
> nodes
> > > and
> > > > each topic in Kafka has replication factor = 3, we upgrade one node
> > with
> > > > the following steps :
> > > > 1. add one broker node to cluster
> > > > 2. shutdown old broker node
> > > >
> > > > My questions are
> > > > 1. When we add one new broker to the cluster will it trigger Kafka
> > topic
> > > /
> > > > group leadership rebalance?
> > > > 2. Is there any way to disable the to-be-decommissioned node to hold
> no
> > > > topic/group leadership (acting as passive copy) so that it can be
> > > > decommissioned with minimal effect to Kafka clients?
> > > >
> > > > Thank you,
> > > >
> > > > --
> > > > Muqtafi Akhmad
> > > > Software Engineer
> > > > Traveloka
> > > >
> > >
> > >
> > >
> > > --
> > > *—-*
> > > *Todd Palino*
> > > Staff Site Reliability Engineer
> > > Data Infrastructure Streaming
> > >
> > >
> > >
> > > linkedin.com/in/toddpalino
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka broker decommission steps

2016-03-04 Thread Alexis Midon
The command line tool that ships with Kafka is error prone.

Our standard procedure is:
1. spin up the new broker
2. use `kafkat drain  [--brokers ]
3. shut down old broker

The `drain` command will generate and submit a partition assignment plan
where the new broker id replaces the old one. It's pretty much a "gsub(old,
new)".

We do it regularly. It's almost a mundane operation. The only challenge is
the volume of data being transferred over the network. Since there is no
throttling mechanism, the network is sometimes saturated, which might impact
other consumers/producers.

See https://github.com/airbnb/kafkat
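For illustration, the "gsub(old, new)" substitution can be sketched as below. This is a hypothetical sketch, not kafkat's actual implementation; the plan shape follows the JSON format accepted by kafka-reassign-partitions.sh.

```python
import json

def drain_broker(plan, old_id, new_id):
    """Return a copy of a partition assignment plan in which every
    occurrence of old_id in a replica list is replaced by new_id --
    the 'gsub(old, new)' described above."""
    out = {"version": plan.get("version", 1), "partitions": []}
    for p in plan["partitions"]:
        replicas = [new_id if r == old_id else r for r in p["replicas"]]
        out["partitions"].append({"topic": p["topic"],
                                  "partition": p["partition"],
                                  "replicas": replicas})
    return out

# Current assignment, as produced by the --generate step of the
# reassignment tool (broker ids are illustrative).
current = {"version": 1, "partitions": [
    {"topic": "audit_data", "partition": 0, "replicas": [1, 2, 3]},
    {"topic": "audit_data", "partition": 1, "replicas": [2, 3, 1]},
]}

# Retire broker 1 in favor of the freshly started broker 4.
new_plan = drain_broker(current, old_id=1, new_id=4)
print(json.dumps(new_plan))
```

The resulting JSON is what would be fed back to the reassignment tool with --execute; the old broker ends up holding no partitions and can be shut down.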





On Fri, Mar 4, 2016 at 7:28 AM Todd Palino  wrote:

> To answer your questions…
>
> 1 - Not in the way you want it to. There is a setting for automatic leader
> election (which I do not recommend anyone use at this time), but all that
> does is pick which of the currently assigned replicas should be the leader.
> It does not reassign partitions from one broker to another. Kafka does not
> have a facility for doing this automatically.
>
> 2 - No. The most you can do is move all the partitions off and then
> immediately shut down the broker process. Any broker that is live in the
> cluster can, and will, get partitions assigned to it by the controller.
>
> For what you want to do, you need you use the partition reassignment
> command line tool that ships with Kafka to reassign partitions from the old
> broker to the new one. Once that is complete, you can double check that the
> old broker has no partitions left and shut it down. I have a tool that we
> use internally to make this a lot easier, and I’m in the process of getting
> a repository set up to make it available via open source. It allows for
> more easily removing and adding brokers, and rebalancing partitions in a
> cluster without having to craft the reassignments by hand.
>
> -Todd
>
>
> On Fri, Mar 4, 2016 at 5:07 AM, Muqtafi Akhmad 
> wrote:
>
> > dear Kafka users,
> >
> > I have some questions regarding decommissioning kafka broker node and
> > replacing it with the new one. Lets say that we have three broker nodes
> and
> > each topic in Kafka has replication factor = 3, we upgrade one node with
> > the following steps :
> > 1. add one broker node to cluster
> > 2. shutdown old broker node
> >
> > My questions are
> > 1. When we add one new broker to the cluster will it trigger Kafka topic
> /
> > group leadership rebalance?
> > 2. Is there any way to disable the to-be-decommissioned node to hold no
> > topic/group leadership (acting as passive copy) so that it can be
> > decommissioned with minimal effect to Kafka clients?
> >
> > Thank you,
> >
> > --
> > Muqtafi Akhmad
> > Software Engineer
> > Traveloka
> >
>
>
>
> --
> *—-*
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>


Re: Need to understand the graph

2016-03-01 Thread Alexis Midon
Also the metrics kafka.network.RequestChannel.RequestQueueSize and
ResponseQueueSize
will give you the saturation of the network and IO threads.

On Tue, Mar 1, 2016 at 9:21 AM Alexis Midon <alexis.mi...@airbnb.com> wrote:

> "request queue time" is the time it takes for IO threads to pick up the
> request. As you increase the load on your broker, it makes sense to see
> higher queue time.
> Here are more details on the request/response model in a Kafka broker
> (0.8.2).
>
> All your requests and responses are belong to RequestChannel (
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala
> ).
>
> The center piece of all Kafka Request/Response handling is the
> `kafka.network.RequestChannel`. RequestChannel is a container for all Kafka
> Requests waiting to be handled, and the Kafka Responses ready to be sent
> back to the clients.
> Requests are queued in a single bounded queue. Up to `queued.max.requests`
> requests will be stored. The response queues are not bounded.
>
> ## First, let's see how the Requests are created.
> A SocketServer (created by the main class KafkaServer) starts up multiple
> threads to handle network connections in a non-blocking fashion:
> . a single Acceptor thread
> . `network.threads` Processor threads
>
> The Acceptor  thread handles OP_ACCEPT events and passes new socket
> channel to the Processor threads (with a simple round-robin algorithm).
>
> A Processor thread has 3 responsibilities:
> 1. register for monitoring the newly created socket channel passed by the
> Acceptor thread
> 2. read data from the socket channels until a complete Kafka request is
> deserialized. The Request is then handed off to the RequestChannel. The
> hand-off might block if the request queue is full.
> 3. poll the Kafka Responses queued in the RequestChannel, and write the
> data on the corresponding socket channel. Each Processor has its own queue
> in the RequestChannel so that a response is processed by the same thread
> which read the request from the connection.
>
> ## Second, let's see how the Responses are built.
> During start up KafkaServer creates an instance of
> KafkaRequestHandlerPool.  KafkaRequestHandlerPool is a thread pool of
> `io.thread` threads of KafkaRequestHandler (named "kafka-request-handler-"
> ).
> A KafkaRequestHandler has a fairly simple job: it polls Request from the
> RequestChannel and passes them to KafkaApis (`KafkaApis.handle()`).
> KafkaApis dispatches the request and eventually a Response is pushed to the
> RequestChannel. The response will then be picked up by a Processor thread
> as described earlier.
>
>
> A diagram:
> http://postimg.org/image/6gadny6px/
>
> The different stages of the request/response handling are nicely measured
> with the following metrics:
> kafka.network.RequestMetrics.RequestQueueTimeMs
> kafka.network.RequestMetrics.LocalTimeMs
> kafka.network.RequestMetrics.RemoteTimeMs
> kafka.network.RequestMetrics.ResponseQueueTimeMs
> kafka.network.RequestMetrics.ResponseSendTimeMs
> kafka.network.RequestMetrics.TotalTimeMs
>
>
>
> On Mon, Feb 29, 2016 at 2:30 PM awa...@pch.com <st.comm.c...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have an issue with the timing of Kafka. As and when the time increases
>> with load testing, we see the increase in "request queue time". What is the
>> "request queue time" ?
>>
>> Some basic config we are using
>>
>> 2 Kafka Nodes(CPU - 8 CPU's [Thread(s) per core:2], Mem - 32 GB Ram)
>> pkafkaapp01.pchoso.com
>> pkafkaapp02.pchoso.com
>>
>> Tests duration: 13:05 - 14:05
>> Messages published - 869156
>>
>> Load Average, CPU, memory is all under control so not sure what the issue
>> is.
>>
>> Below are some SPM graphs showing the state of my system.
>> Here's the 'Requests' graph:
>>   https://apps.sematext.com/spm-reports/s/lCOJULIKuJ
>
>


Re: Need to understand the graph

2016-03-01 Thread Alexis Midon
"request queue time" is the time it takes for IO threads to pick up the
request. As you increase the load on your broker, it makes sense to see
higher queue time.
Here are more details on the request/response model in a Kafka broker
(0.8.2).

All requests and responses go through the RequestChannel (
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala
).

The centerpiece of all Kafka Request/Response handling is
`kafka.network.RequestChannel`. RequestChannel is a container for all Kafka
Requests waiting to be handled, and the Kafka Responses ready to be sent
back to the clients.
Requests are queued in a single bounded queue; up to `queued.max.requests`
requests will be stored. The response queues are not bounded.

## First, let's see how the Requests are created.
A SocketServer (created by the main class KafkaServer) starts up multiple
threads to handle network connections in a non-blocking fashion:
. a single Acceptor thread
. `num.network.threads` Processor threads

The Acceptor thread handles OP_ACCEPT events and passes each new socket
channel to a Processor thread (with a simple round-robin algorithm).

A Processor thread has 3 responsibilities:
1. register for monitoring the newly created socket channel passed by the
Acceptor thread
2. read data from the socket channels until a complete Kafka request is
deserialized. The Request is then handed off to the RequestChannel. The
hand-off might block if the request queue is full.
3. poll the Kafka Responses queued in the RequestChannel, and write the
data on the corresponding socket channel. Each Processor has its own queue
in the RequestChannel so that a response is processed by the same thread
which read the request from the connection.

## Second, let's see how the Responses are built.
During startup, KafkaServer creates an instance of
KafkaRequestHandlerPool, a thread pool of `num.io.threads`
KafkaRequestHandler threads (named "kafka-request-handler-").
A KafkaRequestHandler has a fairly simple job: it polls Requests from the
RequestChannel and passes them to KafkaApis (`KafkaApis.handle()`).
KafkaApis dispatches the request and eventually a Response is pushed to the
RequestChannel. The response will then be picked up by a Processor thread
as described earlier.


A diagram:
http://postimg.org/image/6gadny6px/

The different stages of the request/response handling are nicely measured
with the following metrics:
kafka.network.RequestMetrics.RequestQueueTimeMs
kafka.network.RequestMetrics.LocalTimeMs
kafka.network.RequestMetrics.RemoteTimeMs
kafka.network.RequestMetrics.ResponseQueueTimeMs
kafka.network.RequestMetrics.ResponseSendTimeMs
kafka.network.RequestMetrics.TotalTimeMs
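The flow described above can be sketched as a toy model: a bounded request queue shared by all handler threads, and a response queue per Processor. This is an illustrative sketch, not Kafka's actual code; queue sizes and thread counts stand in for `queued.max.requests` and `num.io.threads`.

```python
import queue
import threading

request_channel = queue.Queue(maxsize=500)  # the single bounded request queue
response_queue = queue.Queue()              # a Processor's response queue, unbounded

def request_handler():
    # Plays the role of a KafkaRequestHandler thread.
    while True:
        req = request_channel.get()            # RequestQueueTimeMs is spent waiting here
        if req is None:                        # shutdown sentinel
            break
        response_queue.put(("response", req))  # KafkaApis.handle() would run here

handlers = [threading.Thread(target=request_handler) for _ in range(8)]
for t in handlers:
    t.start()

# A Processor thread that has deserialized complete requests off sockets:
for i in range(100):
    request_channel.put(("produce", i))        # blocks when the queue is full

for _ in handlers:                             # stop the pool
    request_channel.put(None)
for t in handlers:
    t.join()

print(response_queue.qsize())
```

The back-pressure mechanism falls out of the bounded queue: when handlers cannot keep up, Processors block on `put()`, which shows up as rising request queue time under load.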



On Mon, Feb 29, 2016 at 2:30 PM awa...@pch.com 
wrote:

> Hello,
>
> I have an issue with the timing of Kafka. As and when the time increases
> with load testing, we see the increase in "request queue time". What is the
> "request queue time" ?
>
> Some basic config we are using
>
> 2 Kafka Nodes(CPU - 8 CPU's [Thread(s) per core:2], Mem - 32 GB Ram)
> pkafkaapp01.pchoso.com
> pkafkaapp02.pchoso.com
>
> Tests duration: 13:05 - 14:05
> Messages published - 869156
>
> Load Average, CPU, memory is all under control so not sure what the issue
> is.
>
> Below are some SPM graphs showing the state of my system.
> Here's the 'Requests' graph:
>   https://apps.sematext.com/spm-reports/s/lCOJULIKuJ


Re: regarding Kafka Queing system

2016-02-26 Thread Alexis Midon
You can fetch messages by offset.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchRequest


On Fri, Feb 26, 2016 at 7:23 AM rahul shukla 
wrote:

> Hello,
> I am working SNMP trap parsing project in my acadmic. i am using kafka
> message system in my project. Actully  i want to store trap object in
> kafka which is getting from snmp agent, and retrieve that object on
> another side for further processing.
> So, my query is , IS there any way to store a particular  Event in
> kafka and retrieve that event on second side for  further processing
> ?.
>
> Please assist me.
>
>
> Thanks & Regards
> Rahul Shukla
> +91-9826277980
>


Re: Unable to start Kafka cluster after crash (0.8.2.2)

2016-02-24 Thread Alexis Midon
Regarding the "Allocation Failure" messages: these are not errors, it's the
standard behavior of a generational GC. I'll let you google the details,
there are tons of resources. For example:
https://plumbr.eu/blog/garbage-collection/understanding-garbage-collection-logs

I believe you should stop broker 1 and wipe out the data for the topic. Once
it restarts, replication will restore the data.

On Wed, Feb 24, 2016 at 8:22 AM Anthony Sparks 
wrote:

> Hello,
>
> Our Kafka cluster (3 servers, each server has Zookeeper and Kafka installed
> and running) crashed, and actually out of the 6 processes only one
> Zookeeper instance remained alive.  The logs do not indicate much, the only
> errors shown were:
>
> *2016-02-21T12:21:36.881+: 27445381.013: [GC (Allocation Failure)
> 27445381.013: [ParNew: 136472K->159K(153344K), 0.0047077 secs]
> 139578K->3265K(507264K), 0.0048552 secs] [Times: user=0.01 sys=0.00,
> real=0.01 secs]*
>
> These errors were both in the Zookeeper and the Kafka logs, and it appears
> they have been happening everyday (with no impact on Kafka, except for
> maybe now?).
>
> The crash is concerning, but not as concerning as what we are encountering
> right now.  I am unable to get the cluster back up.  Two of the three nodes
> halt with this fatal error:
>
> *[2016-02-23 21:18:47,251] FATAL [ReplicaFetcherThread-0-0], Halting
> because log truncation is not allowed for topic audit_data, Current leader
> 0's latest offset 52844816 is less than replica 1's latest offset 52844835
> (kafka.server.ReplicaFetcherThread)*
>
> The other node that manages to stay alive is unable to fulfill writes
> because we have min.ack set to 2 on the producers (requiring at least two
> nodes to be available).  We could change this, but that doesn't fix our
> overall problem.
>
> In browsing the Kafka code, in ReplicaFetcherThread.scala there is this
> little nugget:
>
> *// Prior to truncating the follower's log, ensure that doing so is not
> disallowed by the configuration for unclean leader election.*
> *// This situation could only happen if the unclean election configuration
> for a topic changes while a replica is down. Otherwise,*
> *// we should never encounter this situation since a non-ISR leader cannot
> be elected if disallowed by the broker configuration.*
> *if (!LogConfig.fromProps(brokerConfig.toProps,
> AdminUtils.fetchTopicConfig(replicaMgr.zkClient,*
> *topicAndPartition.topic)).uncleanLeaderElectionEnable) {*
> *// Log a fatal error and shutdown the broker to ensure that data loss
> does not unexpectedly occur.*
> *fatal("Halting because log truncation is not allowed for topic
> %s,".format(topicAndPartition.topic) +*
> *  " Current leader %d's latest offset %d is less than replica %d's
> latest offset %d"*
> *  .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId,
> replica.logEndOffset.messageOffset))*
> *Runtime.getRuntime.halt(1)*
> *}*
>
> For each one of our Kafka instances we have them set at:
> *unclean.leader.election.enable=false *which hasn't changed at all since we
> deployed the cluster (verified by file modification stamps).  This to me
> would indicate the above comment assertion is incorrect; we have
> encountered a non-ISR leader elected even though it is configured not to do
> so.
>
> Any ideas on how to work around this?
>
> Thank you,
>
> Tony Sparks
>


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-18 Thread Alexis Midon
A Connection and a ZK session are two different things.

The ZK master keeps track of a client session's validity. When a client
connection gets interrupted, its associated session goes into the
Disconnected state; after a while it will be Expired, but if a new
connection is established before the timeout then the session will be
Connected again.

see
http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions

Example state transitions for an expired session as seen by the expired
session's watcher:

1. 'connected': session is established and client is communicating with
   cluster (client/server communication is operating properly)
2. client is partitioned from the cluster
3. 'disconnected': client has lost connectivity with the cluster
4. time elapses; after the 'timeout' period the cluster expires the
   session, nothing is seen by the client as it is disconnected from the
   cluster
5. time elapses; the client regains network-level connectivity with the
   cluster
6. 'expired': eventually the client reconnects to the cluster, and is then
   notified of the expiration
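The distinction can be sketched as a toy state machine. This is a deliberate simplification of the ZooKeeper semantics above (not client code): the expiry clock runs on the server and only advances while the client is partitioned.

```python
class Session:
    """Toy model of a ZK session: Disconnected can recover, Expired cannot."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.state = "connected"
        self.disconnected_for = 0

    def lose_connection(self):
        if self.state == "connected":
            self.state = "disconnected"

    def tick(self, elapsed):
        # Server-side expiry clock; only advances while disconnected.
        if self.state == "disconnected":
            self.disconnected_for += elapsed
            if self.disconnected_for >= self.timeout:
                self.state = "expired"  # terminal state

    def reconnect(self):
        if self.state == "disconnected":
            self.state = "connected"
            self.disconnected_for = 0
        # if already expired, reconnecting only delivers the expiration notice

s = Session(timeout=10)
s.lose_connection(); s.tick(5); s.reconnect()
print(s.state)   # reconnected within the timeout

s.lose_connection(); s.tick(15); s.reconnect()
print(s.state)   # timed out first: stays expired
```

Once the session reaches 'expired', the high-level consumer built on it is dead; the application has to create a new consumer (i.e. a new session), which is exactly the advice in the reply above.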



On Wed, Feb 17, 2016 at 11:38 AM Christian Posta <christian.po...@gmail.com>
wrote:

> I believe so. Happy to be corrected.
>
> On Wed, Feb 17, 2016 at 12:31 PM, Joe San <codeintheo...@gmail.com> wrote:
>
> > So if I use the High Level Consumer API, using the ConsumerConnector, I
> get
> > this automatic zookeeper connection for free?
> >
> > On Wed, Feb 17, 2016 at 8:25 PM, Christian Posta <
> > christian.po...@gmail.com>
> > wrote:
> >
> > > i believe reconnect is handled automatically by the client... is that
> > what
> > > you're asking
> > > peek here to see how it does that and when:
> > >
> > >
> > >
> >
> https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java#L1153
> > >
> > > On Wed, Feb 17, 2016 at 12:14 PM, Joe San <codeintheo...@gmail.com>
> > wrote:
> > >
> > > > It is all pretty strange. Here is what I see in my logs as soon as I
> > > > voluntarily shutdown Zookeeper!
> > > >
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > > ~[na:1.8.0_60]
> > > > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > > > ~[na:1.8.0_60]
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> > > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > > at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > > 20160217-20:12:44.960+0100
> > > > [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> > > > 127.0.0.1:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening
> > socket
> > > > connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to
> > > > authenticate using SASL (unknown error)
> > > > 20160217-20:12:44.960+0100
> > > > [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> > > > 127.0.0.1:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session
> > > > 0x152ea19656b005c for server null, unexpected error, closing socket
> > > > connection and attempting reconnect
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > > ~[na:1.8.0_60]
> > > > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > > > ~[na:1.8.0_60]
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> > > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > > at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > >
> > > > It just keep repeating trying to reconnect for ever! So I just wanted
> > to
> > > > know which property from my setting in my email above is responsible
> > for
> > > > this auto reconnect mechanism?
> > > >
> > > > On Wed, Feb 17, 2016 at 8:04 PM, Christian Posta <
> > > > christian.po...@gmail.com>
> > > > wrote:
> > > >
> > > > > Yep, assu

Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Alexis Midon
By "re-connect", I'm assuming that the ZK session is expired, not
disconnected.
For details see
http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions

In that case, the high level consumer is basically dead, and the
application should create a new instance of it.


On Mon, Feb 15, 2016 at 12:22 PM Joe San  wrote:

> Any ideas as to which property should I set to enable Zookeeper
> re-connection? I have the following properties defined for my consumer
> (High Level Consumer API). Is this enough for a automatic Zookeeper
> re-connect?
>
> val props = new Properties()
> props.put("zookeeper.connect", zookeeper)
> props.put("group.id", groupId)
> props.put("auto.commit.enabled", "false")
> // this timeout is needed so that we do not block on the stream!
> props.put("consumer.timeout.ms", "1")
> props.put("zookeeper.sync.time.ms", "200")
>


Re: Compression - MessageSet size

2016-02-17 Thread Alexis Midon
- It would be interesting to see the actual ProduceRequests/Responses and
FetchRequests/Responses.
- At this point I would dive into the broker source code and follow the
fetch request handling:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L418



On Tue, Feb 16, 2016 at 10:24 AM Oleksiy Krivoshey <oleks...@gmail.com>
wrote:

> I'm not using Java client, I'm developing my own:
> https://github.com/oleksiyk/kafka
> And I'm talking about MessageSet on the wire, not the one passed to library
> user.
>
> Example:
> Consumer is not started.
> Producer sends a batch #1 of 5 messages, compressed in a single message.
> delay 50 ms
> Producer sends a batch #2 of 5 messages, compressed in a single message.
>
> Start consumer and it will receive a single MessageSet with two messages in
> it, both messages are corresponding compressed batches. Both have codec=2
> (using Snappy). In order to receive all 10 original messages I need to
> decompress both messages in this single MessageSet. Yes, I will then
> concatenate these messages and send a single MessageSet with 10 messages to
> library user, but I just want to clarify, the Protocol Guide says there
> should be single compressed message but I'm able to receive 2, 3 and more,
> all in single MessageSet.
>
> Kafka 0.9
>
> I can provide the actual buffers received.
>
> Thanks!
>
>
> On Tue, 16 Feb 2016 at 20:01 Alexis Midon <alexis.mi...@airbnb.com
> .invalid>
> wrote:
>
> > What makes you think there are 2? would you have data or code to share?
> >
> > When compression is enabled, multiple messages will be packed and
> > compressed in a MessageSet. That MessageSet will then have a single
> > message.
> > The interface however will let you iterate over the unpacked messages.
> See
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L166
> >
> >
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
> >
> >
> >
> > On Tue, Feb 16, 2016 at 3:33 AM Oleksiy Krivoshey <oleks...@gmail.com>
> > wrote:
> >
> > > Hi!
> > >
> > > The Kafka 0.9 protocol guide
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Compression
> > > explicitly
> > > states that there should be only single compressed message in a
> > MessageSet,
> > > but I'm definitely receiving two compressed messages in a single
> > > MessageSet.
> > >
> > > Can someone please confirm if this should be expected behaviour?
> > >
> > > Thanks!
> > >
> >
>


Re: Replication Factor and number of brokers

2016-02-17 Thread Alexis Midon
it will throw an exception:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L76-L78
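The guard at that line boils down to the following. This is a simplified Python illustration of the Scala precondition check, not a port of AdminUtils.

```python
def validate_assignment(n_brokers, n_partitions, replication_factor):
    # Mirrors the preconditions checked before replicas are assigned:
    # both counts must be positive, and the replication factor cannot
    # exceed the number of available brokers.
    if n_partitions <= 0:
        raise ValueError("number of partitions must be larger than 0")
    if replication_factor <= 0:
        raise ValueError("replication factor must be larger than 0")
    if replication_factor > n_brokers:
        raise ValueError(
            "replication factor: %d larger than available brokers: %d"
            % (replication_factor, n_brokers))

validate_assignment(n_brokers=3, n_partitions=8, replication_factor=3)  # OK

try:
    validate_assignment(n_brokers=2, n_partitions=8, replication_factor=3)
except ValueError as e:
    print(e)
```

So with 2 brokers and replication factor 3, topic creation fails outright rather than silently under-replicating.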

On Tue, Feb 16, 2016 at 3:55 PM Alex Loddengaard  wrote:

> Hi Sean, you'll want equal or more brokers than your replication factor.
> Meaning, if your replication factor is 3, you'll want 3 or more brokers.
>
> I'm not sure what Kafka will do if you have fewer brokers than your
> replication factor. It will either give you the highest replication factor
> it can (in this case, the number of brokers), or it will put more than one
> replica on some brokers. My guess is the former, but again, I'm not sure.
>
> Hope this helps.
>
> Alex
>
> On Tue, Feb 16, 2016 at 7:47 AM, Damian Guy  wrote:
>
> > Then you'll have under-replicated partitions. However, even if you have 3
> > brokers with a replication factor of 2 and you lose a single broker
> you'll
> > still likely have under-replicated partitions.
> > Partitions are assigned to brokers, 1 broker will be the leader and n
> > brokers will be followers. If any of the brokers with replicas of the
> > partition on it crash then you'll have under-replicated partitions.
> >
> >
> > On 16 February 2016 at 14:45, Sean Morris (semorris)  >
> > wrote:
> >
> > > So if I have a replication factor of 2, but only 2 brokers, then
> > > replication works, but what if I lose one broker?
> > >
> > > Thanks,
> > > Sean
> > >
> > > On 2/16/16, 9:14 AM, "Damian Guy"  wrote:
> > >
> > > >Hi,
> > > >
> > > >You need to have at least replication factor brokers.
> > > >replication factor  = 1 is no replication.
> > > >
> > > >HTH,
> > > >Damian
> > > >
> > > >On 16 February 2016 at 14:08, Sean Morris (semorris) <
> > semor...@cisco.com>
> > > >wrote:
> > > >
> > > >> Should your number of brokers be atleast one more then your
> > replication
> > > >> factor of your topic(s)?
> > > >>
> > > >> So if I have a replication factor of 2, I need atleast 3 brokers?
> > > >>
> > > >> Thanks,
> > > >> Sean
> > > >>
> > > >>
> > >
> > >
> >
>
>
>
> --
> *Alex Loddengaard | **Solutions Architect | Confluent*
> *Download Apache Kafka and Confluent Platform: www.confluent.io/download
> *
>


Re: Questions from new user

2016-02-17 Thread Alexis Midon
0. I don't understand how deleting log files quickly relates to file/page
cache; only consumer read patterns are the main factor here, afaik.
The OS will eventually discard unused cached pages. I'm not an expert on
page cache policies though, and will be happy to learn.

1. have a look at per-topic config
https://kafka.apache.org/documentation.html#topic-config

2.
- please tell us:
  . what version of the kafka producer/consumer you're using? 0.9, or 0.8?
  . what exact metrics you're referring to?
http://docs.confluent.io/1.0/kafka/monitoring.html

- I'm assuming you're talking about the request-size-avg metric of the
__producer__ 0.9, as described in
http://kafka.apache.org/documentation.html#new_producer_monitoring

If so, the produce request will be capped by the message max size indeed.
Another limiting factor would be the `linger.ms` setting of the producer,
in case of low message rate.
Otherwise, please share the exact metrics you're using for the consumer lag.

- Splitting/Assembling messages in the application sounds quite a pain.

- the lag could also be introduced by the application processing the
messages. Have you checked that side?
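On the split/reassemble question: nothing in the 0.8/0.9 clients does this for you, so it has to live in the application. A minimal sketch of the idea, assuming the application adds its own chunk header (the field names here are made up for illustration):

```python
# Sketch of application-level message chunking. Each chunk carries
# (id, index, total) metadata so the consumer can reassemble the
# original payload. This is NOT part of Kafka itself.

def split_message(message_id, payload, chunk_size):
    """Split a payload into chunks small enough for message.max.bytes."""
    total = max(1, -(-len(payload) // chunk_size))  # ceiling division
    return [
        {"id": message_id, "index": i, "total": total,
         "data": payload[i * chunk_size:(i + 1) * chunk_size]}
        for i in range(total)
    ]

def reassemble(chunks):
    """Rebuild the payload once all chunks for an id have arrived."""
    chunks = sorted(chunks, key=lambda c: c["index"])
    assert len(chunks) == chunks[0]["total"], "missing chunks"
    return b"".join(c["data"] for c in chunks)
```

The consumer side still has to buffer chunks per id and handle partial deliveries, which is part of why this approach is painful in practice.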



On Tue, Feb 16, 2016 at 7:30 PM allen chan 
wrote:

> Hi can anyone help with this?
>
> On Fri, Jan 29, 2016 at 11:50 PM, allen chan  >
> wrote:
>
> > Use case: We are using kafka as broker in one of our elasticsearch
> > clusters. Kafka caches the logs if elasticsearch has any performance
> > issues.  I have Kafka set to delete logs pretty quickly to keep things in
> > the file cache to limit IO.
> >
> > Questions:
> > 1. in 0.9 it seems like consumer offers are stored only in Kafka. Is
> there
> > a way to configure Kafka to delete my production logs pretty quickly but
> > have a different retention behavior for the consumer offsets?
> >
> > 2. Our consumer lag monitoring show us that a lot of times our consumers
> > are behind somewhere between 500 to 1000 messages. Looking at the JMX
> > metrics requestSizeAvg and requestSizeMax, it shows our average request
> > size is 500 bytes and max request size is 800,000 bytes. I assume the lag
> > is because that batch could only hold one message given the max is
> 100
> > bytes. I plan to enable compression and increase the max.bytes to 10mb to
> > fix this short term. In a few blogs, people mentioned the ultimate fix
> > should be splitting the message into smaller chunks in the producer and
> > then having the consumer put it back together. Is that handled in the
> kafka
> > producer/consumer natively or has to be handled outside of it?
> >
> > Thanks for the attention.
> > Allen Chan
> >
> >
> >
>
>
> --
> Allen Michael Chan
>


Re: Compression - MessageSet size

2016-02-16 Thread Alexis Midon
What makes you think there are 2? Would you have data or code to share?

When compression is enabled, multiple messages are packed and
compressed into a MessageSet. That MessageSet then contains a single
wrapper message. The interface, however, lets you iterate over the
unpacked messages. See
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L166


https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
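To make the wrapper behavior concrete, here is a toy Python model — not the real wire format or API, just the shallow-vs-deep iteration idea: the packed messages become the value of a single compressed wrapper message, and deep iteration transparently yields the inner messages.

```python
import zlib

# Toy model only: real Kafka serializes inner messages in its own
# binary format; the null-byte separator here is a stand-in.

def pack(messages):
    """Compress several messages into ONE wrapper message."""
    blob = zlib.compress(b"\x00".join(messages))
    return [{"compressed": True, "value": blob}]

def iter_deep(message_set):
    """Deep iteration: decompress wrappers and yield inner messages."""
    for msg in message_set:
        if msg.get("compressed"):
            for inner in zlib.decompress(msg["value"]).split(b"\x00"):
                yield inner
        else:
            yield msg["value"]
```

Shallow iteration would yield just the single wrapper; deep iteration is what the ByteBufferMessageSet code linked above exposes to clients.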



On Tue, Feb 16, 2016 at 3:33 AM Oleksiy Krivoshey 
wrote:

> Hi!
>
> The Kafka 0.9 protocol guide
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Compression
> explicitly
> states that there should be only single compressed message in a MessageSet,
> but I'm definitely receiving two compressed messages in a single
> MessageSet.
>
> Can someone please confirm if this should be expected behaviour?
>
> Thanks!
>


Re: Consumer backwards compatibility

2016-02-12 Thread Alexis Midon
When porting an existing consumer group from 0.8.2 to 0.9 (clients and
brokers), is there any impact on the last committed offsets?
Is the "native offset storage" feature enabled by default?




On Thu, Feb 11, 2016 at 4:52 PM Jason Gustafson  wrote:

> The new Java consumer in 0.9.0 will not work with 0.8.2 since it depends on
> the group management protocol built into Kafka, but the older consumer
> should still work.
>
> -Jason
>
> On Thu, Feb 11, 2016 at 2:44 AM, Joe San  wrote:
>
> > I have a 0.9.0 version of the Kafka consumer. Would that work against the
> > 0.8.2 broker?
> >
>


Re: automatically consume from all topics

2015-09-18 Thread Alexis Midon
Thanks Tao.
for reference, here is the corresponding listener
https://github.com/apache/kafka/blob/9dbeb71ab258955e04b46991c1baf880b07633f4/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L1000-L1002
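For clients that lack such a listener, the same effect can be approximated by periodically polling the topic list and diffing it against the regex, as suggested elsewhere in this thread. A rough sketch — `fetch_topics` is a stand-in for however you list topics (ZooKeeper, a metadata request, or the kafka-topics.sh script):

```python
import re

# Polling fallback: diff the current topic list against a regex and
# fire a callback for newly matching topics. Run this on a timer.

def watch_topics(fetch_topics, pattern, on_new_topics, known=None):
    known = set(known or ())
    matched = {t for t in fetch_topics() if re.fullmatch(pattern, t)}
    new = matched - known
    if new:
        on_new_topics(sorted(new))  # e.g. close and reopen streams
    return matched  # pass this back in on the next poll
```

The callback is where you would close the existing consumer and recreate the streams, which is also why this approach causes the latency spike mentioned below.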

On Sat, Sep 12, 2015 at 5:42 AM, tao xiao <xiaotao...@gmail.com> wrote:

> It is handled by the implementation of the high level consumer
> automatically. High level consumer internal registers an listener on ZK
> topic znode. Any changes to it including topic creation high level consumer
> will be notify and and then rebalanced topics if the new topic matches the
> regex. You can check the source code high level for reference
> On Sat, Sep 12, 2015 at 5:18 AM Alexis Midon
> <alexis.mi...@airbnb.com.invalid> wrote:
>
> > When a new topic is created, I agree that the regex would remain
> unchanged
> > but how would an existing consumer be notified of the topic creation?
> > afaik there's no such notification mechanism in the High level consumer.
> >
> >
> >
> > On Thu, Sep 10, 2015 at 8:43 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > > You can create message streams using regex that includes all topics.
> The
> > > beauty of regex is that any new topic created will be automatically
> > > consumed as long as the name of the topic matches the regex
> > >
> > > You check the method createMessageStreamsByFilter in high level API
> > >
> > > On Thu, Sep 10, 2015 at 11:03 PM Gerrit Jansen van Vuuren <
> > > gerrit...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm not sure about the high level consumer but I maintain a kafka
> > > consumer
> > > > that can add and remove topics dynamically.
> > > >
> > > > https://github.com/gerritjvv/kafka-fast
> > > > see
> > > >
> > > >
> > >
> >
> https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/java/kakfa_clj/core/Consumer.java
> > > > if you're using java/scala
> > > >
> > > >
> > > >
> > > > On Thu, Sep 10, 2015 at 3:48 PM, Helleren, Erik <
> > > > erik.helle...@cmegroup.com>
> > > > wrote:
> > > >
> > > > > So, there are several ways to do this.  Lets assume the goal is to
> > add
> > > > > more topics to the application at runtime.  And that this app is
> > > > currently
> > > > > written to be distributed via the magic of consumer groups.
> Sadly, I
> > > > > don't think the High level consumer is well designed for this
> > > particular
> > > > > use case.  The app would have to poll using something like the
> topic
> > > list
> > > > > script (bin/kafka-topics.sh --list …), close the existing high
> level
> > > > > consumer on a change, and start a new one.  And then do this on all
> > the
> > > > > nodes of your application (Should be easier than doing it on just
> one
> > > > > actually).  This would result in a huge latency spike and a problem
> > > when
> > > > > it comes to migrating the state involved in your example
> expectation.
> > > > >
> > > > > The next option still requires polling, but it needs a custom FT
> and
> > > > > distribution scheme.  There might need a leader so some things only
> > > > happen
> > > > > once.  Just use the simple consumer API, and have one thread per
> > > > > partition.  The leader would have to tell a follower something like
> > > > "Start
> > > > > listening to topic X, partition Y", which is risky and difficult to
> > do.
> > > > >
> > > > > The simplest option, assuming that each topic is independent when
> it
> > > > comes
> > > > > to expectations, is don't go with a cluster.  Just have a
> > > script/watcher
> > > > > app that does the polling and then, when it detects new topics, for
> > > each
> > > > > new topic: start a new instance of your app on a new box that
> listens
> > > to
> > > > > that single topic.  It might take a few seconds to startup, but its
> > > easy
> > > > > to code, easy to maintain, and easy to understand.  Which makes
> for a
> > > > more
> > > > > resilient application.
> > > > > -Erik
> > > > >
> > > > >
> > > > >
> > > > > From:  Jo

Re: automatically consume from all topics

2015-09-11 Thread Alexis Midon
When a new topic is created, I agree that the regex would remain unchanged
but how would an existing consumer be notified of the topic creation?
afaik there's no such notification mechanism in the High level consumer.



On Thu, Sep 10, 2015 at 8:43 AM, tao xiao  wrote:

> You can create message streams using regex that includes all topics. The
> beauty of regex is that any new topic created will be automatically
> consumed as long as the name of the topic matches the regex
>
> You check the method createMessageStreamsByFilter in high level API
>
> On Thu, Sep 10, 2015 at 11:03 PM Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm not sure about the high level consumer but I maintain a kafka
> consumer
> > that can add and remove topics dynamically.
> >
> > https://github.com/gerritjvv/kafka-fast
> > see
> >
> >
> https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/java/kakfa_clj/core/Consumer.java
> > if you're using java/scala
> >
> >
> >
> > On Thu, Sep 10, 2015 at 3:48 PM, Helleren, Erik <
> > erik.helle...@cmegroup.com>
> > wrote:
> >
> > > So, there are several ways to do this.  Lets assume the goal is to add
> > > more topics to the application at runtime.  And that this app is
> > currently
> > > written to be distributed via the magic of consumer groups.  Sadly, I
> > > don't think the High level consumer is well designed for this
> particular
> > > use case.  The app would have to poll using something like the topic
> list
> > > script (bin/kafka-topics.sh --list …), close the existing high level
> > > consumer on a change, and start a new one.  And then do this on all the
> > > nodes of your application (Should be easier than doing it on just one
> > > actually).  This would result in a huge latency spike and a problem
> when
> > > it comes to migrating the state involved in your example expectation.
> > >
> > > The next option still requires polling, but it needs a custom FT and
> > > distribution scheme.  There might need a leader so some things only
> > happen
> > > once.  Just use the simple consumer API, and have one thread per
> > > partition.  The leader would have to tell a follower something like
> > "Start
> > > listening to topic X, partition Y", which is risky and difficult to do.
> > >
> > > The simplest option, assuming that each topic is independent when it
> > comes
> > > to expectations, is don't go with a cluster.  Just have a
> script/watcher
> > > app that does the polling and then, when it detects new topics, for
> each
> > > new topic: start a new instance of your app on a new box that listens
> to
> > > that single topic.  It might take a few seconds to startup, but its
> easy
> > > to code, easy to maintain, and easy to understand.  Which makes for a
> > more
> > > resilient application.
> > > -Erik
> > >
> > >
> > >
> > > From:  Joris Peeters 
> > > Reply-To:  "users@kafka.apache.org" 
> > > Date:  Thursday, September 10, 2015 at 6:09 AM
> > > To:  "users@kafka.apache.org" 
> > > Subject:  automatically consume from all topics
> > >
> > >
> > > Hello,
> > >
> > > Is there a simple way to set up a consumer that automatically picks up
> > all
> > > the topics for all the partitions, dynamically extending its range as
> new
> > > topics get created?
> > >
> > > The underlying idea is that we want to have a few over-arching
> consumers
> > > (I'm aware that's not great for the scalability, but that's not such a
> > > concern at present), to
> > > -
> > > Gather various statistics, metrics, system pressure, … and dispatch to
> > the
> > > appropriate  monitoring systems,
> > > -
> > > Apply some end-to-end business-logic testing, to continuously assert
> > > certain expectations (e.g. "if this-sort-of message arrived, then we
> > > expect that-sort-of-message to be received within this time" etc).
> > >
> > >
> > > I'm sure I can piece something together that does this, but perhaps it
> > > comes out of the box. (Couldn't find it, though).
> > > We¹re using the Java client and Kafka 8.2.1.
> > >
> > > Joris Peeters
> > > Developer
> > >
> > > Research and Data Technology
> > > T:
> > > +44 (0) 20 8576 5800
> > >
> > > Winton
> > > Grove House
> > > 27 Hammersmith Grove
> > > London W6 0NE
> > >
> > > wintoncapital.com 
> > >
> > >  
> > >
> > >
> > >
> > >
> > >

Re: Migrating data from old brokers to new brokers question

2014-09-17 Thread Alexis Midon
we would be very happy to contribute. However a description of the current
plan and status regarding tooling would be helpful.
It would speed up the learning curve. You mentioned some jira tickets?

(maybe I should sign up to the developer mailing list and take the
conversation over there)

On Tue, Sep 16, 2014 at 6:46 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Since these tools are so useful, I wonder what it requires (from both
 Airbnb and Kafka) to merge this into Kafka project. I think there are
 couple of Jira regarding improved tool usability that this resolved.

 On Mon, Sep 15, 2014 at 11:45 AM, Alexis Midon
 alexis.mi...@airbedandbreakfast.com wrote:
  distribution will be even based on the number of partitions.
  It is the same logic as AdminUtils.
  see
 
 https://github.com/airbnb/kafkat/blob/master/lib/kafkat/command/reassign.rb#L39
 
  On Sun, Sep 14, 2014 at 6:05 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
  This is great. Thanks for sharing! Does kafkat automatically figure out
 the
  right reassignment strategy based on even data distribution?
 
  On Wed, Sep 3, 2014 at 12:12 AM, Alexis Midon 
  alexis.mi...@airbedandbreakfast.com wrote:
 
   Hi Marcin,
  
   A few weeks ago, I did an upgrade to 0.8.1.1 and then augmented the
  cluster
   from 3 to 9 brokers. All went smoothly.
   In a dev environment, we found out that the biggest pain point is to
 have
   to deal with the json file and the error-prone command line interface.
   So to make our life easier, my team mate Nelson [1] came up with
 kafkat:
   https://github.com/airbnb/kafkat
  
   We now install kafkat on every broker. Note that kafkat does NOT
 connect
  to
   a broker, but to zookeeper. So you can actually use it from any
 machine.
  
   For reassignment, please see:
   `kafkat reassign [topic] [--brokers ids] [--replicas n] `
   It will transparently generate and kick off a balanced assignment.
  
   feedback and contributions welcome! Enjoy!
  
   Alexis
  
   [1] https://github.com/nelgau
  
  
  
   On Tue, Aug 26, 2014 at 10:27 AM, Marcin Michalski 
  mmichal...@tagged.com
   wrote:
  
I am running on 0.8.1.1 and I thought that the partition
 reassignment
   tools
can do this job. Just was not sure if this is the best way to do
 this.
I will try this out in stage env first and will perform the same in
  prod.
   
Thanks,
marcin
   
   
On Mon, Aug 25, 2014 at 7:23 PM, Joe Stein joe.st...@stealth.ly
  wrote:
   
 Marcin, that is a typical task now.  What version of Kafka are you
running?

 Take a look at

  
 https://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
 and


   
  
 
 https://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

 Basically you can do a --generate to get existing JSON topology
 and
   with
 that take the results of Current partition replica assignment
 (the
first
 JSON that outputs) and make whatever changes (like sed old node
 for
  new
 node and add more replica's which increase the replication factor,
whatever
 you want) and then --execute.

 With lots of data this takes time so you will want to run
 --verify to
   see
 what is in progress... good thing do a node at a time (even topic
 at
  a
 time) however you want to manage and wait for it as such.

 The preferred replica is simply the first one in the list of
   replicas.
  The kafka-preferred-replica-election.sh just makes that replica
 the
leader
 as this is not automatic yet.

 If you are running a version prior to 0.8.1.1 it might make sense
 to
 upgrade the old nodes first then run reassign to the new servers.


 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop 
 http://www.twitter.com/allthingshadoop
 /


 On Mon, Aug 25, 2014 at 8:59 PM, Marcin Michalski 
   mmichal...@tagged.com

 wrote:

  Hi, I would like to migrate my Kafka setup from old servers to
 new
 servers.
  Let say I have 8 really old servers that have the kafka
topics/partitions
  replicated 4 ways and want to migrate the data to 4 brand new
  servers
and
  want the replication factor be 3. I wonder if anyone has ever
   performed
  this type of migration?
 
  Will auto rebalancing take care of this automatically if I do
 the
  following?
 
  Let say I bring down old broker id 1 down and startup new server
   broker
 id
  100 up, is there a way to migrate all of the data of the topic
 that
   had
 the
  topic (where borker id 1 was the leader) over to the new broker
  100?
 
  Or do I need to use *bin/kafka-preferred-replica-election.sh *to
reassign
  the topics

Re: Migrating data from old brokers to new brokers question

2014-09-15 Thread Alexis Midon
distribution will be even based on the number of partitions.
It is the same logic as AdminUtils.
see
https://github.com/airbnb/kafkat/blob/master/lib/kafkat/command/reassign.rb#L39
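The even spread boils down to round-robin assignment over the broker list. A simplified sketch of the idea — the real AdminUtils also randomizes the starting broker and staggers follower offsets to avoid correlated placements; this keeps only the even-distribution core:

```python
# Simplified round-robin replica assignment: partition p gets
# `replicas` consecutive brokers starting at an offset that cycles
# through the broker list, so leaders and followers spread evenly.

def assign_replicas(brokers, num_partitions, replicas):
    assert replicas <= len(brokers), "replication factor exceeds broker count"
    return {
        p: [brokers[(p + r) % len(brokers)] for r in range(replicas)]
        for p in range(num_partitions)
    }
```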

On Sun, Sep 14, 2014 at 6:05 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 This is great. Thanks for sharing! Does kafkat automatically figure out the
 right reassignment strategy based on even data distribution?

 On Wed, Sep 3, 2014 at 12:12 AM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com wrote:

  Hi Marcin,
 
  A few weeks ago, I did an upgrade to 0.8.1.1 and then augmented the
 cluster
  from 3 to 9 brokers. All went smoothly.
  In a dev environment, we found out that the biggest pain point is to have
  to deal with the json file and the error-prone command line interface.
  So to make our life easier, my team mate Nelson [1] came up with kafkat:
  https://github.com/airbnb/kafkat
 
  We now install kafkat on every broker. Note that kafkat does NOT connect
 to
  a broker, but to zookeeper. So you can actually use it from any machine.
 
  For reassignment, please see:
  `kafkat reassign [topic] [--brokers ids] [--replicas n] `
  It will transparently generate and kick off a balanced assignment.
 
  feedback and contributions welcome! Enjoy!
 
  Alexis
 
  [1] https://github.com/nelgau
 
 
 
  On Tue, Aug 26, 2014 at 10:27 AM, Marcin Michalski 
 mmichal...@tagged.com
  wrote:
 
   I am running on 0.8.1.1 and I thought that the partition reassignment
  tools
   can do this job. Just was not sure if this is the best way to do this.
   I will try this out in stage env first and will perform the same in
 prod.
  
   Thanks,
   marcin
  
  
   On Mon, Aug 25, 2014 at 7:23 PM, Joe Stein joe.st...@stealth.ly
 wrote:
  
Marcin, that is a typical task now.  What version of Kafka are you
   running?
   
Take a look at
   
  https://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
and
   
   
  
 
 https://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
   
Basically you can do a --generate to get existing JSON topology and
  with
that take the results of Current partition replica assignment (the
   first
JSON that outputs) and make whatever changes (like sed old node for
 new
node and add more replica's which increase the replication factor,
   whatever
you want) and then --execute.
   
With lots of data this takes time so you will want to run --verify to
  see
what is in progress... good thing do a node at a time (even topic at
 a
time) however you want to manage and wait for it as such.
   
The preferred replica is simply the first one in the list of
  replicas.
 The kafka-preferred-replica-election.sh just makes that replica the
   leader
as this is not automatic yet.
   
If you are running a version prior to 0.8.1.1 it might make sense to
upgrade the old nodes first then run reassign to the new servers.
   
   
/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/
   
   
On Mon, Aug 25, 2014 at 8:59 PM, Marcin Michalski 
  mmichal...@tagged.com
   
wrote:
   
 Hi, I would like to migrate my Kafka setup from old servers to new
servers.
 Let say I have 8 really old servers that have the kafka
   topics/partitions
 replicated 4 ways and want to migrate the data to 4 brand new
 servers
   and
 want the replication factor be 3. I wonder if anyone has ever
  performed
 this type of migration?

 Will auto rebalancing take care of this automatically if I do the
 following?

 Let say I bring down old broker id 1 down and startup new server
  broker
id
 100 up, is there a way to migrate all of the data of the topic that
  had
the
 topic (where borker id 1 was the leader) over to the new broker
 100?

 Or do I need to use *bin/kafka-preferred-replica-election.sh *to
   reassign
 the topics/partitions from old broker 1 to broker 100? And then
 just
   keep
 doing the same thing until all of the old brokers are
 decommissioned?

 Also, would kafka-preferred-replica-election.sh let me actually
 lower
   the
 number of replicas as well, if I just simply make sure that given
 topic/partition was only elected 3 times versus 4?

 Thanks for your insight,
 Marcin

   
  
 



Re: GenericJMX plugin file for Kafka 0.8.1

2014-09-09 Thread Alexis Midon
looks like I can't edit the wiki page.
feel free to add https://github.com/airbnb/kafka-statsd-metrics2 to the
page.
thanks

On Tue, Sep 9, 2014 at 8:10 AM, Andrew Otto ao...@wikimedia.org wrote:

 We use jmxtrans to pull data out of JMX.


 https://github.com/wikimedia/puppet-kafka/blob/master/kafka-jmxtrans.json.md


 On Sep 9, 2014, at 7:54 AM, Navneet Gupta (Tech - BLR) 
 navneet.gu...@flipkart.com wrote:

  Hi,
 
  We plan to capture various metrics exposed in Kafka via JMX and wanted to
  check if anybody has a plugin file which could be reused.
 
  We found one such plugin file here
  
 https://github.com/deathsyn/chef-kafka/blob/master/templates/default/collectd_kafka-broker.conf.erb
 
  but
  it seems to be targeted for a different kafka version.
 
  --
  Thanks & Regards,
  Navneet Gupta




Re: Migrating data from old brokers to new brokers question

2014-09-03 Thread Alexis Midon
Hi Marcin,

A few weeks ago, I did an upgrade to 0.8.1.1 and then augmented the cluster
from 3 to 9 brokers. All went smoothly.
In a dev environment, we found out that the biggest pain point is to have
to deal with the json file and the error-prone command line interface.
So to make our life easier, my teammate Nelson [1] came up with kafkat:
https://github.com/airbnb/kafkat

We now install kafkat on every broker. Note that kafkat does NOT connect to
a broker, but to zookeeper. So you can actually use it from any machine.

For reassignment, please see:
`kafkat reassign [topic] [--brokers ids] [--replicas n] `
It will transparently generate and kick off a balanced assignment.

feedback and contributions welcome! Enjoy!

Alexis

[1] https://github.com/nelgau



On Tue, Aug 26, 2014 at 10:27 AM, Marcin Michalski mmichal...@tagged.com
wrote:

 I am running on 0.8.1.1 and I thought that the partition reassignment tools
 can do this job. Just was not sure if this is the best way to do this.
 I will try this out in stage env first and will perform the same in prod.

 Thanks,
 marcin


 On Mon, Aug 25, 2014 at 7:23 PM, Joe Stein joe.st...@stealth.ly wrote:

  Marcin, that is a typical task now.  What version of Kafka are you
 running?
 
  Take a look at
  https://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
  and
 
 
 https://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
 
  Basically you can do a --generate to get existing JSON topology and with
  that take the results of Current partition replica assignment (the
 first
  JSON that outputs) and make whatever changes (like sed old node for new
  node and add more replica's which increase the replication factor,
 whatever
  you want) and then --execute.
 
  With lots of data this takes time so you will want to run --verify to see
  what is in progress... good thing do a node at a time (even topic at a
  time) however you want to manage and wait for it as such.
 
  The preferred replica is simply the first one in the list of replicas.
   The kafka-preferred-replica-election.sh just makes that replica the
 leader
  as this is not automatic yet.
 
  If you are running a version prior to 0.8.1.1 it might make sense to
  upgrade the old nodes first then run reassign to the new servers.
 
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
 
  On Mon, Aug 25, 2014 at 8:59 PM, Marcin Michalski mmichal...@tagged.com
 
  wrote:
 
   Hi, I would like to migrate my Kafka setup from old servers to new
  servers.
   Let say I have 8 really old servers that have the kafka
 topics/partitions
   replicated 4 ways and want to migrate the data to 4 brand new servers
 and
   want the replication factor be 3. I wonder if anyone has ever performed
   this type of migration?
  
   Will auto rebalancing take care of this automatically if I do the
   following?
  
   Let say I bring down old broker id 1 down and startup new server broker
  id
   100 up, is there a way to migrate all of the data of the topic that had
  the
   topic (where borker id 1 was the leader) over to the new broker 100?
  
   Or do I need to use *bin/kafka-preferred-replica-election.sh *to
 reassign
   the topics/partitions from old broker 1 to broker 100? And then just
 keep
   doing the same thing until all of the old brokers are decommissioned?
  
   Also, would kafka-preferred-replica-election.sh let me actually lower
 the
   number of replicas as well, if I just simply make sure that given
   topic/partition was only elected 3 times versus 4?
  
   Thanks for your insight,
   Marcin
  
 



Re: message size limit

2014-09-03 Thread Alexis Midon
Thanks Jun.

I'll create a jira and try to provide a patch. I think this is pretty
serious.
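For reference, the "retry individual messages" approach discussed below can be sketched like this — a hypothetical wrapper around a batch send, not actual producer code:

```python
# Sketch of the per-message fallback: when the broker rejects a whole
# batch with MessageSizeTooLarge, resend the messages one at a time so
# only the genuinely oversized ones are lost. `send` is a stand-in for
# the producer's batch send.

class MessageSizeTooLarge(Exception):
    pass

def send_with_fallback(send, batch):
    try:
        send(batch)
        return []  # whole batch accepted
    except MessageSizeTooLarge:
        dropped = []
        for msg in batch:
            try:
                send([msg])
            except MessageSizeTooLarge:
                dropped.append(msg)  # only this message is lost
        return dropped
```

This trades extra RPCs on the error path for not losing the batch.num.messages-1 valid messages.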

On Friday, August 29, 2014, Jun Rao jun...@gmail.com wrote:

 The goal of batching is mostly to reduce the # RPC calls to the broker. If
 compression is enabled, a larger batch typically implies better compression
 ratio.

 The reason that we have to fail the whole batch is that the error code in
 the produce response is per partition, instead of per message.

 Retrying individual messages on MessageSizeTooLarge seems reasonable.

 Thanks,

 Jun


 On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com javascript:; wrote:

  Could you explain the goals of batches? I was assuming this was simply a
  performance optimization, but this behavior makes me think I'm missing
  something.
  is a batch more than a list of *independent* messages?
 
  Why would you reject the whole batch? One invalid message causes the loss
  of batch.num.messages-1 messages :(
  It seems pretty critical to me.
 
  If ack=0, the producer will never know about it.
  If ack !=0, the producer will retry the whole batch. If the issue was
  related to data corruption (etc), retries might work. But in the case of
  big message, the batch will always be rejected and the producer will
 give
  up.
 
  If the messages are indeed considered independent, I think this is a
 pretty
  serious issue.
 
  I see 2 possible fix approaches:
  - the broker could reject only the invalid messages
  - the broker could reject the whole batch (like today) but the producer
 (if
  ack!=0) could retry messages one at a time on exception like
  MessageSizeTooLarge.
 
  opinions?
 
  Alexis
 
  ```
  [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
  failed due to [test,1]: kafka.common.MessageSizeTooLargeException
  (kafka.producer.async.DefaultEventHandler)
  [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
  failed due to [test,0]: kafka.common.MessageSizeTooLargeException
  (kafka.producer.async.DefaultEventHandler)
  [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
  failed due to [test,0]: kafka.common.MessageSizeTooLargeException
  (kafka.producer.async.DefaultEventHandler)
  [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
  failed due to [test,1]: kafka.common.MessageSizeTooLargeException
  (kafka.producer.async.DefaultEventHandler)
  [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
  with correlation ids in [43,62]
 (kafka.producer.async.DefaultEventHandler)
  [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
  (kafka.producer.async.ProducerSendThread)
  kafka.common.FailedToSendMessageException: Failed to send messages after
 3
  tries.
  at
 
 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
   at
 
 
 kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
  at
 
 
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
   at
 
 
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
  ```
 
 
  On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao jun...@gmail.com
 javascript:; wrote:
 
   That's right. If one message in a batch exceeds the size limit, the
 whole
   batch is rejected.
  
   When determining message.max.bytes, the most important thing to
 consider
  is
   probably memory since currently we need to allocate memory for a full
   message in the broker and the producer and the consumer client.
  
   Thanks,
  
   Jun
  
  
   On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon 
   alexis.mi...@airbedandbreakfast.com javascript:; wrote:
  
am I miss reading this loop:
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
   
it seems like all messages from `validMessages` (which is
ByteBufferMessageSet) are NOT appended if one of the message size
  exceeds
the limit.
   
I hope I'm missing something.
   
   
   
On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon 
alexis.mi...@airbedandbreakfast.com javascript:; wrote:
   
 Hi Jun,

 thanks for you answer.
 Unfortunately the size won't help much, I'd like to see the actual
message
 data.

 By the way what are the things to consider when deciding on
 `message.max.bytes` value?






 On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao jun...@gmail.com
 javascript:; wrote:

 The message size check is currently only done on the broker. If
 you
enable
 trace level logging in RequestChannel, you will see the produce
   request,
 which includes the size of each partition.

 Thanks,

 Jun


 On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com javascript:; wrote:

  Hello,
 
  my brokers are reporting

Re: message size limit

2014-09-03 Thread Alexis Midon
Hi Bhavesh

can you explain what limit you're referring to?
I'm asking because `message.max.bytes` is applied per message, not per batch.
is there another limit I should be aware of?

thanks


On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Jun,

 We have similar problem.  We have variable length of messages.  So when we
 have fixed size of Batch sometime the batch exceed the limit set on the
 brokers (2MB).

 So can Producer have some extra logic to determine the optimal batch size
 by looking at configured message.max.bytes  value.

 During the metadata update, Producer will get this value from the Broker
 for each topic and Producer will check if current batch size reach this
 limit than break batch into smaller chunk such way that It would not exceed
 limit (unless single message exceed the limit). Basically try to avoid data
 loss as much as possible.

 Please let me know what is your opinion on this...

 Thanks,

 Bhavesh
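The splitting logic Bhavesh describes could be sketched roughly like this (a hypothetical helper in Python for brevity; `split_batch` is not part of any Kafka producer API):

```python
# Sketch of the batch-splitting idea above: greedily pack messages into
# chunks whose total size stays under max_batch_bytes. A message larger
# than the limit still ends up alone in its own chunk and would still be
# rejected by the broker; splitting only keeps it from dragging its
# neighbors down with it.
def split_batch(messages, max_batch_bytes):
    chunks, current, current_size = [], [], 0
    for msg in messages:
        size = len(msg)
        if current and current_size + size > max_batch_bytes:
            chunks.append(current)
            current, current_size = [], 0
        current.append(msg)
        current_size += size
    if current:
        chunks.append(current)
    return chunks
```

For example, with a 4-byte limit, three 2-byte messages would be split into a chunk of two and a chunk of one.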


 On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com wrote:

  Thanks Jun.
 
  I'll create a jira and try to provide a patch. I think this is pretty
  serious.
 
  On Friday, August 29, 2014, Jun Rao jun...@gmail.com wrote:
 
    The goal of batching is mostly to reduce the # RPC calls to the broker.
    If compression is enabled, a larger batch typically implies better
    compression ratio.

    The reason that we have to fail the whole batch is that the error code
    in the produce response is per partition, instead of per message.

    Retrying individual messages on MessageSizeTooLarge seems reasonable.
  
   Thanks,
  
   Jun
  
  
   On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon 
    alexis.mi...@airbedandbreakfast.com wrote:
  
 Could you explain the goals of batches? I was assuming this was simply a
 performance optimization, but this behavior makes me think I'm missing
 something.
 is a batch more than a list of *independent* messages?

 Why would you reject the whole batch? One invalid message causes the loss
 of batch.num.messages-1 messages :(
 It seems pretty critical to me.

 If ack=0, the producer will never know about it.
 If ack !=0, the producer will retry the whole batch. If the issue was
 related to data corruption (etc), retries might work. But in the case of a
 big message, the batch will always be rejected and the producer will give
 up.

 If the messages are indeed considered independent, I think this is a
 pretty serious issue.

 I see 2 possible fix approaches:
 - the broker could reject only the invalid messages
 - the broker could reject the whole batch (like today) but the producer
   (if ack!=0) could retry messages one at a time on an exception like
   MessageSizeTooLarge.
   
opinions?
   
Alexis
   
```
[2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
failed due to [test,1]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
failed due to [test,0]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
failed due to [test,0]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
failed due to [test,1]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
```
   
   
 On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao jun...@gmail.com wrote:
   
  That's right. If one message in a batch exceeds the size limit, the whole
  batch is rejected.

  When determining message.max.bytes, the most important thing to consider
  is probably memory since currently we need to allocate memory for a full
  message in the broker and the producer and the consumer client.

 Thanks,

 Jun


 On Wed, Aug 27, 2014 at 9:52 PM

Re: message size limit

2014-09-03 Thread Alexis Midon
are you referring to socket.request.max.bytes ?
it looks like it could indeed limit the size of a batch accepted by a
broker.
So, you're right, batch.num.messages * message.max.bytes must be smaller
than socket.request.max.bytes.
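The worst case above can be checked with back-of-the-envelope arithmetic (the values below are illustrative examples, not the actual defaults): a full batch of maximum-size messages must still fit in one request.

```python
# Worst-case request size for the old producer: batch.num.messages messages,
# each up to message.max.bytes, must fit under socket.request.max.bytes.
# Example values only; check your broker/producer configs for the real ones.
message_max_bytes = 1_000_000                  # broker: message.max.bytes
batch_num_messages = 100                       # producer: batch.num.messages
socket_request_max_bytes = 100 * 1024 * 1024   # broker: socket.request.max.bytes

worst_case_request = batch_num_messages * message_max_bytes
assert worst_case_request <= socket_request_max_bytes, (
    "a full batch of maximum-size messages could exceed the request limit")
```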

It looks like this case has been addressed in the new producer.
See max.request.size and batch.size at
https://kafka.apache.org/documentation.html#newproducerconfigs

Alexis


On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 I am referring to the wiki http://kafka.apache.org/08/configuration.html and
 the following parameter controls the max batch message bytes as far as I
 know. Kafka Community, please correct me if I am wrong; I do not want to
 create confusion for the Kafka User Community here. Also, if you increase
 this limit then you have to set the corresponding limit increase on the
 consumer side as well (fetch.message.max.bytes).

 Since we are using batch async mode, our messages sometimes get dropped if
 the entire batch's bytes exceed this limit, so I was asking the Kafka
 Developers if there is an optimal way to determine the batch size based on
 this limit to minimize the data loss, because the entire batch is rejected
 by the brokers.

 message.max.bytes 100 The maximum size of a message that the server can
 receive. It is important that this property be in sync with the maximum
 fetch size your consumers use or else an unruly producer will be able to
 publish messages too large for consumers to consume.

 Thanks,

 Bhavesh


 On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com wrote:

  Hi Bhavesh
 
  can you explain what limit you're referring to?
  I'm asking because `message.max.bytes` is applied per message not per
  batch.
  is there another limit I should be aware of?
 
  thanks
 
 
  On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry 
 mistry.p.bhav...@gmail.com
  
  wrote:
 
   Hi Jun,
  
   We have similar problem.  We have variable length of messages.  So when
  we
   have fixed size of Batch sometime the batch exceed the limit set on the
   brokers (2MB).
  
   So can Producer have some extra logic to determine the optimal batch
 size
   by looking at configured message.max.bytes  value.
  
   During the metadata update, Producer will get this value from the
 Broker
   for each topic and Producer will check if current batch size reach this
   limit than break batch into smaller chunk such way that It would not
  exceed
   limit (unless single message exceed the limit). Basically try to avoid
  data
   loss as much as possible.
  
   Please let me know what is your opinion on this...
  
   Thanks,
  
   Bhavesh
  
  
   On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon 
   alexis.mi...@airbedandbreakfast.com wrote:
  
Thanks Jun.
   
I'll create a jira and try to provide a patch. I think this is pretty
serious.
   
On Friday, August 29, 2014, Jun Rao jun...@gmail.com wrote:
   
      The goal of batching is mostly to reduce the # RPC calls to the
      broker. If compression is enabled, a larger batch typically implies
      better compression ratio.

      The reason that we have to fail the whole batch is that the error
      code in the produce response is per partition, instead of per
      message.

      Retrying individual messages on MessageSizeTooLarge seems reasonable.

 Thanks,

 Jun


 On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon 
  alexis.mi...@airbedandbreakfast.com wrote:

   Could you explain the goals of batches? I was assuming this was simply
   a performance optimization, but this behavior makes me think I'm
   missing something.
   is a batch more than a list of *independent* messages?

   Why would you reject the whole batch? One invalid message causes the
   loss of batch.num.messages-1 messages :(
   It seems pretty critical to me.

   If ack=0, the producer will never know about it.
   If ack !=0, the producer will retry the whole batch. If the issue was
   related to data corruption (etc), retries might work. But in the case
   of a big message, the batch will always be rejected and the producer
   will give up.

   If the messages are indeed considered independent, I think this is a
   pretty serious issue.

   I see 2 possible fix approaches:
   - the broker could reject only the invalid messages
   - the broker could reject the whole batch (like today) but the producer
     (if ack!=0) could retry messages one at a time on an exception like
     MessageSizeTooLarge.

   opinions?

   Alexis
 
   ```
   [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
   failed due to [test,1]: kafka.common.MessageSizeTooLargeException
   (kafka.producer.async.DefaultEventHandler)
   [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
   failed

Re: message size limit

2014-08-29 Thread Alexis Midon
Could you explain the goals of batches? I was assuming this was simply a
performance optimization, but this behavior makes me think I'm missing
something.
is a batch more than a list of *independent* messages?

Why would you reject the whole batch? One invalid message causes the loss
of batch.num.messages-1 messages :(
It seems pretty critical to me.

If ack=0, the producer will never know about it.
If ack !=0, the producer will retry the whole batch. If the issue was
related to data corruption (etc), retries might work. But in the case of a
big message, the batch will always be rejected and the producer will give
up.

If the messages are indeed considered independent, I think this is a pretty
serious issue.

I see 2 possible fix approaches:
- the broker could reject only the invalid messages
- the broker could reject the whole batch (like today) but the producer (if
ack!=0) could retry messages one at a time on exception like
MessageSizeTooLarge.
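The second approach could be sketched as follows (a rough illustration only; `send` is a stand-in for the producer's batch send call, and none of these names are real Kafka APIs):

```python
# Fallback retry on a batch-level size rejection: if the whole batch is
# refused, resend the messages one at a time so only the truly oversized
# ones are lost instead of the entire batch.
class MessageSizeTooLarge(Exception):
    pass

def send_with_fallback(send, batch):
    delivered, rejected = [], []
    try:
        send(batch)            # happy path: whole batch accepted
        return batch, []
    except MessageSizeTooLarge:
        for msg in batch:      # fallback: isolate the offending messages
            try:
                send([msg])
                delivered.append(msg)
            except MessageSizeTooLarge:
                rejected.append(msg)
    return delivered, rejected
```

The trade-off is extra RPC round trips on the failure path, which matches Jun's point that batching exists mostly to reduce RPC calls.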

opinions?

Alexis

```
[2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
failed due to [test,1]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
failed due to [test,0]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
failed due to [test,0]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
failed due to [test,1]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
 at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
 at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
```


On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao jun...@gmail.com wrote:

 That's right. If one message in a batch exceeds the size limit, the whole
 batch is rejected.

 When determining message.max.bytes, the most important thing to consider is
 probably memory since currently we need to allocate memory for a full
 message in the broker and the producer and the consumer client.

 Thanks,

 Jun


 On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com wrote:

  am I misreading this loop:

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269

  it seems like all messages from `validMessages` (which is a
  ByteBufferMessageSet) are NOT appended if one of the message sizes
  exceeds the limit.
 
  I hope I'm missing something.
 
 
 
  On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon 
  alexis.mi...@airbedandbreakfast.com wrote:
 
   Hi Jun,
  
    thanks for your answer.
    Unfortunately the size won't help much; I'd like to see the actual
    message data.
  
   By the way what are the things to consider when deciding on
   `message.max.bytes` value?
  
  
  
  
  
  
   On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao jun...@gmail.com wrote:
  
   The message size check is currently only done on the broker. If you
  enable
   trace level logging in RequestChannel, you will see the produce
 request,
   which includes the size of each partition.
  
   Thanks,
  
   Jun
  
  
   On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon 
   alexis.mi...@airbedandbreakfast.com wrote:
  
 Hello,

 my brokers are reporting that some received messages exceed the
 `message.max.bytes` value.
 I'd like to know which producers are at fault, but it is pretty much
 impossible:
 - the brokers don't log the content of the rejected messages
 - the log messages do not contain the IP of the producers
 - on the consumer side, no exception is thrown (afaik it is because ack=0
 is used). The only kind of notification is to close the connection.

 [1] Do you have any suggestions to track down the guilty producers or find
 out the message content?

 Even though it makes total sense to have the limit defined and applied on
 the brokers, I was thinking that this check could also be applied by the
 producers. Some google results suggest that `message.max.bytes` might be
 used by the producers but I can't find any trace of that behavior in the
 code.

 The closest thing I have is
 https://github.com/apache/kafka/blob/0.8.1/core

message size limit

2014-08-27 Thread Alexis Midon
Hello,

my brokers are reporting that some received messages exceed the
`message.max.bytes` value.
I'd like to know which producers are at fault, but it is pretty much
impossible:
- the brokers don't log the content of the rejected messages
- the log messages do not contain the IP of the producers
- on the consumer side, no exception is thrown (afaik it is because ack=0
is used). The only kind of notification is to close the connection.

[1] Do you have any suggestions to track down the guilty producers or find
out the message content?

Even though it makes total sense to have the limit defined and applied on
the brokers, I was thinking that this check could also be applied by the
producers. Some google results suggest that `message.max.bytes` might be
used by the producers but I can't find any trace of that behavior in the
code.

The closest thing I have is
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
but it simply logs the message size and content and the log level is trace.
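A producer-side guard of the kind being asked about could look roughly like this (as far as this thread establishes, no such check exists in the 0.8 producer; `producer_side_check` and its parameters are hypothetical names for illustration):

```python
# Hypothetical producer-side size check: partition outgoing messages into
# those that fit under the broker's message.max.bytes and those that don't,
# so the sender finds out about oversized messages before the broker
# silently rejects the batch.
def producer_side_check(messages, message_max_bytes):
    accepted, oversized = [], []
    for msg in messages:
        (accepted if len(msg) <= message_max_bytes else oversized).append(msg)
    return accepted, oversized
```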

[2] could you please confirm if such a producer-side check exists?


thanks!

Alexis


Re: message size limit

2014-08-27 Thread Alexis Midon
Hi Jun,

thanks for your answer.
Unfortunately the size won't help much; I'd like to see the actual message
data.

By the way what are the things to consider when deciding on
`message.max.bytes` value?






On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao jun...@gmail.com wrote:

 The message size check is currently only done on the broker. If you enable
 trace level logging in RequestChannel, you will see the produce request,
 which includes the size of each partition.

 Thanks,

 Jun


 On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com wrote:

  Hello,
 
  my brokers are reporting that some received messages exceed the
  `message.max.bytes` value.
  I'd like to know which producers are at fault, but it is pretty much
  impossible:
  - the brokers don't log the content of the rejected messages
  - the log messages do not contain the IP of the producers
  - on the consumer side, no exception is thrown (afaik it is because ack=0
  is used). The only kind of notification is to close the connection.
 
  [1] Do you have any suggestions to track down the guilty producers or
  find out the message content?
 
  Even though it makes total sense to have the limit defined and applied on
  the brokers, I was thinking that this check could also be applied by the
  producers. Some google results suggest that `message.max.bytes` might be
  used by the producers but I can't find any trace of that behavior in the
  code.
 
  The closest thing I have is
 
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
  but it simply logs the message size and content and the log level is
 trace.
 
  [2] could you please confirm if such a producer-side check exists?
 
 
  thanks!
 
  Alexis
 



Re: message size limit

2014-08-27 Thread Alexis Midon
am I misreading this loop:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269

it seems like all messages from `validMessages` (which is a
ByteBufferMessageSet) are NOT appended if one of the message sizes exceeds
the limit.

I hope I'm missing something.
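A Python paraphrase of the linked loop's behavior, as read from the 0.8.1 source (an assumption based on reading the linked code, not a verified trace through the broker):

```python
# Every message in the set is size-checked before anything is written, so a
# single oversized message raises and NOTHING from the batch is appended.
def append(message_set, message_max_bytes):
    for msg in message_set:
        if len(msg) > message_max_bytes:
            raise ValueError("MessageSizeTooLargeException")
    # only reached when every message passed the check
    return list(message_set)  # stand-in for the actual segment write
```

This is what makes the "whole batch is rejected" behavior Jun confirms below fall out of the code: the check runs over the full set before any append happens.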



On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon 
alexis.mi...@airbedandbreakfast.com wrote:

 Hi Jun,

 thanks for your answer.
 Unfortunately the size won't help much; I'd like to see the actual message
 data.

 By the way what are the things to consider when deciding on
 `message.max.bytes` value?






 On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao jun...@gmail.com wrote:

 The message size check is currently only done on the broker. If you enable
 trace level logging in RequestChannel, you will see the produce request,
 which includes the size of each partition.

 Thanks,

 Jun


 On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon 
 alexis.mi...@airbedandbreakfast.com wrote:

  Hello,
 
  my brokers are reporting that some received messages exceed the
  `message.max.bytes` value.
  I'd like to know which producers are at fault, but it is pretty much
  impossible:
  - the brokers don't log the content of the rejected messages
  - the log messages do not contain the IP of the producers
  - on the consumer side, no exception is thrown (afaik it is because ack=0
  is used). The only kind of notification is to close the connection.
 
  [1] Do you have any suggestions to track down the guilty producers or
  find out the message content?
 
  Even though it makes total sense to have the limit defined and applied
 on
  the brokers, I was thinking that this check could also be applied by the
  producers. Some google results suggest that `message.max.bytes` might be
  used by the producers but I can't find any trace of that behavior in the
  code.
 
  The closest thing I have is
 
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
  but it simply logs the message size and content and the log level is
 trace.
 
  [2] could you please confirm if such a producer-side check exists?
 
 
  thanks!
 
  Alexis