Re: Checking the consumer lag when using manual partition assignment with the KafkaConsumer

2016-11-06 Thread Robert Metzger
Hi Matthias,

the bin/kafka-consumer-groups.sh utility is exactly what the user in
http://grokbase.com/t/kafka/users/163rrq9ne8/new-consumer-group-not-showing-up
has used.
It seems that the broker is not returning consumer groups that don't
participate in the consumer group balancing mechanism.
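For context, here is a minimal sketch (not Flink's code; broker, topic and
group names are placeholders) of the kind of consumer this is about: it uses
assign() instead of subscribe(), so it commits offsets under a group.id but
never joins the group coordination protocol, which appears to be why the
tooling does not list the group:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManuallyAssignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        props.put("group.id", "my-group");                // offsets are committed under this group
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() = manual partition assignment; the consumer never joins
            // the group's rebalance protocol, it only uses the group.id for
            // offset commits.
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // process record ...
                }
                consumer.commitSync(); // committed offsets are stored broker-side
            }
        }
    }
}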


On Fri, Nov 4, 2016 at 6:07 PM, Matthias J. Sax 
wrote:

> Robert,
>
> you can use bin/kafka-consumer-groups.sh instead.
>
> > bin/kafka-consumer-offset-checker.sh [2016-11-04 10:05:07,852] WARN
> > WARNING: ConsumerOffsetChecker is deprecated and will be dropped in
> > releases following 0.9.0. Use ConsumerGroupCommand instead.
> > (kafka.tools.ConsumerOffsetChecker$)
>
>
> -Matthias
>
> On 11/3/16 4:07 AM, Robert Metzger wrote:
> > Hi,
> >
> > some Flink users recently noticed that they cannot check the
> > consumer lag when using Flink's Kafka consumer [1]. According to
> > this discussion on the Kafka user list [2], the
> > kafka-consumer-groups.sh utility doesn't work with KafkaConsumers
> > that use manual partition assignment.
> >
> > Is there a way to get the consumer lag for a consumer that
> > regularly commits its offsets to the broker?
> >
> > Regards, Robert
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-5001 [2]
> > http://grokbase.com/t/kafka/users/163rrq9ne8/new-consumer-
> group-not-showing-up
> >


Checking the consumer lag when using manual partition assignment with the KafkaConsumer

2016-11-03 Thread Robert Metzger
Hi,

some Flink users recently noticed that they cannot check the consumer lag
when using Flink's Kafka consumer [1]. According to this discussion on the
Kafka user list [2], the kafka-consumer-groups.sh utility doesn't work with
KafkaConsumers that use manual partition assignment.

Is there a way to get the consumer lag for a consumer that regularly
commits its offsets to the broker?
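For reference, here is a rough, untested sketch of one way to read the lag
from the outside: run a separate monitoring consumer with the same group.id
and compare the committed offsets to the log-end offsets. Note that
committed() and endOffsets() are methods of the newer Java consumer
(endOffsets() only exists from 0.10.1 on), and all names below are
placeholders:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");    // placeholder
        props.put("group.id", "flink-consumer-group");      // group the job commits under (placeholder)
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        List<TopicPartition> partitions = Arrays.asList(
                new TopicPartition("my-topic", 0),
                new TopicPartition("my-topic", 1));

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Log-end offsets of the partitions (0.10.1+ API).
            Map<TopicPartition, Long> ends = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                // Last offset committed by the group, or null if nothing was committed yet.
                OffsetAndMetadata committed = consumer.committed(tp);
                long lag = committed == null ? -1 : ends.get(tp) - committed.offset();
                System.out.println(tp + " lag=" + lag); // -1 means "no committed offset found"
            }
        }
    }
}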

Regards,
Robert


[1] https://issues.apache.org/jira/browse/FLINK-5001
[2]
http://grokbase.com/t/kafka/users/163rrq9ne8/new-consumer-group-not-showing-up


Re: Blocked in KafkaConsumer.commitOffsets

2016-06-15 Thread Robert Metzger
Hi,

I've already looked at this issue on the Flink list and recommended that
Hironori post here. It seems that the consumer is not returning from the
poll() call, which is why the commitOffsets() method cannot enter the
synchronized block.
The KafkaConsumer is logging the following statements:

2016-06-10 20:29:53,677 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.
2016-06-10 20:29:53,678 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.
2016-06-10 20:29:53,679 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.

2016-06-10 20:56:53,982 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.


My guess is that the poll() call is not returning within the given timeout
while it tries to reconnect to the brokers.
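To make the suspected interaction concrete, here is a stripped-down sketch of
the locking pattern visible in the thread dump below (an illustration, not
the actual Flink code): the fetching thread holds the consumer's monitor
while it polls, and the checkpoint thread has to acquire the same monitor to
commit, so a poll() that does not return keeps the commit blocked. Topic and
partition names are placeholders:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SharedConsumerLockSketch {
    private final KafkaConsumer<byte[], byte[]> consumer;

    public SharedConsumerLockSketch(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
        // placeholder assignment
        consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
    }

    // Fetch loop; runs in its own thread and holds the consumer monitor while polling.
    public void runFetchLoop() {
        while (true) {
            synchronized (consumer) {
                // If the brokers/coordinator are unreachable, this call may not
                // come back for a long time -- and the lock is held all the while.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                // hand records to the processing pipeline (omitted)
            }
        }
    }

    // Called from the checkpoint thread; blocks until the fetch loop releases the lock.
    public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
        synchronized (consumer) {
            consumer.commitSync(offsets);
        }
    }
}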


On Wed, Jun 15, 2016 at 2:41 PM, Hironori Ogibayashi 
wrote:

> Hello,
>
> I am running stream processing job with Kafka and Flink.
> Flink reads records from Kafka.
>
> My software versions are:
> - Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
> - Kafka client library: 0.9.0.1
> - Flink: 1.0.3
>
> Now I have problem that Flink job is sometimes blocked and consumer lag
> is increasing.
> I got thread dump during the situation.
>
> This is the blocked thread. Looks like blocked in
> KafkaConsumer.commitOffsets.
>
> 
> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
> prio=10 tid=0x7f2b14010800 nid=0x1b89a waiting for monitor entry
> [0x7f2b3ddfc000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
> - waiting to lock <0x000659111b58> (a
> org.apache.kafka.clients.consumer.KafkaConsumer)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
> - locked <0x000659111cc8> (a java.lang.Object)
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> ---
>
> And lock 0x000659111b58 is held by the following thread.
>
> ---
> "Thread-9" daemon prio=10 tid=0x7f2b2440d000 nid=0x1b838 runnable
> [0x7f2b3dbfa000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
> - locked <0x000659457dc8> (a sun.nio.ch.Util$2)
> - locked <0x000659457db8> (a
> java.util.Collections$UnmodifiableSet)
> - locked <0x000659457108> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
> at
> org.apache.kafka.common.network.Selector.select(Selector.java:425)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
> - locked <0x000659111b58> (a
> org.apache.kafka.clients.consumer.KafkaConsumer)
> ---
>
> I am wondering why Flink's kafka consumer is blocked and any advice
> would be appreciated.
>
> Thanks,
> Hironori Ogibayashi
>


Re: NotLeaderForPartitionException: This server is not the leader for that topic-partition.

2016-02-08 Thread Robert Metzger
Sorry for reviving this old mailing list discussion.

I'm facing a similar issue while running a load test with many small topics
(100 topics, 4 partitions each).
There is also a Flink user who's facing the issue:
https://issues.apache.org/jira/browse/FLINK-3066

Are you also writing into many topics at the same time?


On Sat, Nov 14, 2015 at 4:07 PM, Sean Morris (semorris) 
wrote:

> I have suddenly starting having issues where when I am producing data I
> occasionally get "NotLeaderForPartitionException: This server is not the
> leader for that topic-partition". I am using Kafka 2.10-0.8.2.1  with the
> new Producer class and no retries. If I add retries to the Producer
> properties I am ending up with duplicates on the consumer side.
>
> Any idea what can be the cause of the NotLeaderForPartitionException?
>
> Thanks.
>
>


Re: Broker Exception: Attempt to read with a maximum offset less than start offset

2016-01-27 Thread Robert Metzger
Yes, I've asked the user to test with the 0.9.0.0 release (I saw Gwen's
comment in KAFKA-725).

I have a potentially related question: Is it an issue that both Flink and
Gearpump* are not committing their offsets through the SimpleConsumer API?
Flink is directly committing the offsets into ZK (and maintaining them
internally).
I'm asking because I see some log statements like this:
WARN Partition [WordCount,22] on broker 0: No checkpointed highwatermark is
found for partition [WordCount,22] (kafka.cluster.Partition)

Reading from the Kafka server's code, it doesn't seem that the
commitOffsets() request updates any internal state (at least for version=0).

*I didn't find any call to commitOffsets() in their code.


On Wed, Jan 27, 2016 at 2:32 PM, Ismael Juma  wrote:

> Hi Manu and Robert,
>
> It would help to know if this still happens in trunk or the 0.9.0 branch.
>
> Ismael
> On 27 Jan 2016 13:05, "Robert Metzger"  wrote:
>
> > Hi Manu,
> >
> > > in the streaming-benchmark, are you seeing the issue only when reading with
> > > Gearpump, or is it triggered by a different processing framework as well?
> >
> > > I'm asking because there is a Flink user who is also using Kafka 0.8.2.1
> > > and is reporting a very similar issue on SO:
> >
> >
> http://stackoverflow.com/questions/34982483/flink-streaming-job-switched-to-failed-status/34987963
> > .
> > His issue is also only present under load.
> >
> >
> >
> >
> > On Thu, Jan 21, 2016 at 2:28 AM, Manu Zhang 
> > wrote:
> >
> > > Hi,
> > >
> > > Any suggestions for this issue or do I need to provide more
> information ?
> > > Any links I can refer to would be also very helpful.
> > >
> > > Thanks,
> > > Manu Zhang
> > >
> > >
> > > On Tue, Jan 19, 2016 at 8:41 PM, Manu Zhang 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > Is KAFKA-725 Broker Exception: Attempt to read with a maximum offset
> > less
> > > > than start offset <https://issues.apache.org/jira/browse/KAFKA-725>
> > > still
> > > > valid ? We are seeing a similar issue when we are carrying out the
> > > yahoo's
> > > > streaming-benchmarks <https://github.com/yahoo/streaming-benchmarks>
> > on
> > > a
> > > > 4-node cluster. Our issue id is
> > > > https://github.com/gearpump/gearpump/issues/1872.
> > > >
> > > > We are using Kafka scala-2.10-0.8.2.1. 4 brokers are installed on 4
> > nodes
> > > > with Zookeeper on 3 of them. On each node, 4 producers produce data
> to
> > a
> > > > Kafka topic with 4 partitions and 1 replica. Each producer has a
> > > throughput
> > > > of 17K messages/s. 4 consumers are distributed (not necessarily
> evenly)
> > > > across the cluster and consume from Kafka as fast as possible.
> > > >
> > > > I tried logging the produced offsets (with callback in send) and
> found
> > > > that the "start offset" already existed when the consumer failed with
> > the
> > > > fetch exception.
> > > >
> > > > This happened only when producers are producing at high throughput.
> > > >
> > > > Any ideas would be much appreciated.
> > > >
> > > > Thanks,
> > > > Manu Zhang
> > > >
> > >
> >
>


Re: Broker Exception: Attempt to read with a maximum offset less than start offset

2016-01-27 Thread Robert Metzger
Hi Manu,

in the streaming-benchmark, are you seeing the issue only when reading with
Gearpump, or is it triggered by a different processing framework as well?

I'm asking because there is a Flink user who is also using Kafka 0.8.2.1 and
is reporting a very similar issue on SO:
http://stackoverflow.com/questions/34982483/flink-streaming-job-switched-to-failed-status/34987963
His issue is also only present under load.




On Thu, Jan 21, 2016 at 2:28 AM, Manu Zhang  wrote:

> Hi,
>
> Any suggestions for this issue or do I need to provide more information ?
> Any links I can refer to would be also very helpful.
>
> Thanks,
> Manu Zhang
>
>
> On Tue, Jan 19, 2016 at 8:41 PM, Manu Zhang 
> wrote:
>
> > Hi all,
> >
> > Is KAFKA-725 Broker Exception: Attempt to read with a maximum offset less
> > than start offset 
> still
> > valid ? We are seeing a similar issue when we are carrying out the
> yahoo's
> > streaming-benchmarks  on
> a
> > 4-node cluster. Our issue id is
> > https://github.com/gearpump/gearpump/issues/1872.
> >
> > We are using Kafka scala-2.10-0.8.2.1. 4 brokers are installed on 4 nodes
> > with Zookeeper on 3 of them. On each node, 4 producers produce data to a
> > Kafka topic with 4 partitions and 1 replica. Each producer has a
> throughput
> > of 17K messages/s. 4 consumers are distributed (not necessarily evenly)
> > across the cluster and consume from Kafka as fast as possible.
> >
> > I tried logging the produced offsets (with callback in send) and found
> > that the "start offset" already existed when the consumer failed with the
> > fetch exception.
> >
> > This happened only when producers are producing at high throughput.
> >
> > Any ideas would be much appreciated.
> >
> > Thanks,
> > Manu Zhang
> >
>


Re: SimpleConsumer.getOffsetsBefore() in 0.9 KafkaConsumer

2016-01-25 Thread Robert Metzger
Thank you for your replies, Gwen and Jason.

Let's continue the discussion in the JIRA.
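For the archive, here is a rough sketch of the position()/seekToEnd()/seek()
workaround Gwen describes below. It is written against the 0.9 consumer API
(where seekToEnd() still takes varargs) and has not been tested; broker,
topic and group names are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LatestOffsetSketch {
    // Returns the current log-end offset of the partition and restores the
    // consumer's original position afterwards.
    static long latestOffset(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
        long current = consumer.position(tp);  // 1. remember where we are
        consumer.seekToEnd(tp);                // 2. jump to the end ...
        long end = consumer.position(tp);      //    ... and read the end offset
        consumer.seek(tp, current);            // 3. go back to where we really want to be
        return end;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("group.id", "offset-probe");            // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            long end = latestOffset(consumer, tp);
            long lag = end - consumer.position(tp);
            System.out.println("log end offset: " + end + ", lag: " + lag);
        }
    }
}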


On Fri, Jan 22, 2016 at 8:37 PM, Jason Gustafson  wrote:

> The offset API is one of the known gaps in the new consumer. There is a
> JIRA (KAFKA-1332), but we might need a KIP to make that change now that the
> API is released. For now, Gwen's suggestion is the only way to do it.
>
> -Jason
>
> On Thu, Jan 21, 2016 at 8:22 PM, Gwen Shapira  wrote:
>
> > Hi Robert!
> >
> > Jason is the expert, and I hope he'll respond soon.
> >
> > Meanwhile: I think that you can do what you are trying to do by:
> > 1. call position() to get the current position you are consuming
> > 2. call seekToEnd() and then position(), which will give you the last
> > position at the point in which you called seekToEnd()
> > 3. Use another seek() call to go to the offset you really want to get to.
> >
> > I don't think there are current plans to add getOffsetsBefore, but maybe
> we
> > need it for the use-case you specified.
> > I think the developer mailing list (or a JIRA) will be a better place for
> > an API discussion.
> >
> > Gwen
> >
> > On Wed, Jan 20, 2016 at 3:09 AM, Robert Metzger 
> > wrote:
> >
> > > Hi,
> > >
> > > I'm currently looking into implementing a load shedding strategy in
> > > Flink's Kafka consumer.
> > >
> > > Therefore, I would like to allow users to request the latest offset of
> > the
> > > subscribed TopicPartitions, so that they can
> > > a) determine the lag
> > > b) maybe set the next fetch offset to the latest offset (or anything in
> > > between)
> > >
> > > With the SimpleConsumer, there is the getOffsetsBefore() method, which
> > > allows to request the largest offset.
> > > For the new KafkaConsumer, I only found the seekToEnd() and
> > > seekToBeginning() methods, but they don't allow to get the actual end
> or
> > > beginning.
> > >
> > >
> > > Are there plans to add such an API to the KafkaConsumer again?
> > > Can you suggest any workarounds?
> > >
> > > Regards,
> > > Robert
> > >
> >
>


SimpleConsumer.getOffsetsBefore() in 0.9 KafkaConsumer

2016-01-20 Thread Robert Metzger
Hi,

I'm currently looking into implementing a load shedding strategy in
Flink's Kafka consumer.

Therefore, I would like to allow users to request the latest offset of the
subscribed TopicPartitions, so that they can
a) determine the lag
b) maybe set the next fetch offset to the latest offset (or anything in
between)

With the SimpleConsumer, there is the getOffsetsBefore() method, which
allows requesting the largest offset.
For the new KafkaConsumer, I only found the seekToEnd() and
seekToBeginning() methods, but they don't allow getting the actual end or
beginning offset.


Are there plans to add such an API to the KafkaConsumer again?
Can you suggest any workarounds?

Regards,
Robert


Re: Minimal KafkaConsumer in Scala fails compilation with `could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]`

2016-01-04 Thread Robert Metzger
Hi Peter,

The problem is that you have both the DataSet and the DataStream package imports.
Remove the DataSet API import (import org.apache.flink.api.scala._)
to make the example work.

On Sun, Dec 20, 2015 at 3:20 PM, Peter Vandenabeele 
wrote:

> Hi,
>
> I am trying to write a minimal Kafka consumer in Scala and got
> this far:
>
> ➜  scala git:(kafka_exp_001) ✗ cat KafkaConsumer.scala
> package io.allthingsdata.kafkaConsumer
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
> import org.apache.flink.api.common.typeinfo._
> //import org.apache.flink.streaming.api.windowing.time.Time
>
> object KafkaConsumer {
>   def main(args: Array[String]) {
>
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val valueDeserializer = new SimpleStringSchema()
> val props = new java.util.Properties()
>
> // create a Kafka Consumer
> val kafkaConsumer: FlinkKafkaConsumer082[String] =
>   new FlinkKafkaConsumer082(
> "Topic1",
> valueDeserializer,
> props
>   )
>
> // get input data
> val messageStream: DataStream[String] = env.addSource(kafkaConsumer) //
> supply typeInfo ?
>
> // do something with it
> val messages = messageStream.
>   map ( s => "Kafka and Flink say: $s" )
>
> // execute and print result
> messages.print()
>   }
> }
>
> /*  based on this Java example code
> ParameterTool parameterTool = ParameterTool.fromArgs(args);
>
> DataStream messageStream = env
>   .addSource(new FlinkKafkaConsumer082<>(
> parameterTool.getRequired("topic"),
> new SimpleStringSchema(),
> parameterTool.getProperties()));
>
> Once a DataStream is created, you can transform it as you like. For
> example, let us pad every word with a fixed prefix, and print to stdout:
>
> messageStream
>   .rebalance()
>   .map ( s -> “Kafka and Flink says: ” + s)
>   .print();
> */
>
>
> When trying to compile in sbt I get these error messages:
>
> ```
> > compile
> [info] Compiling 1 Scala source to
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
> [error]
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28:
> could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error] val messageStream: DataStream[String] =
> env.addSource(kafkaConsumer) // supply typeInfo ?
> [error]  ^
> [error]
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32:
> could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error]   map ( s => "Kafka and Flink say: $s" )
> [error]   ^
> [error] two errors found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 0 s, completed Dec 19, 2015 5:11:56 PM
> ```
>
> When inspecting DataStreamSource addSource, I read:
>
> /**
>  * Ads a data source with a custom type information thus opening a
>  * {@link DataStream}. Only in very special cases does the user need to
>  * support type information. Otherwise use
>  * {@link
> #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
>  *
>
>
> I did try to supply a `BasicTypeInfo.STRING_TYPE_INFO` as typeInfo
> argument, but that does not solve it.
>
> When trying:
>
> `val messageStream: DataStream[String] = env.addSource(kafkaConsumer,
> BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?`
>
> I get:
>
> > compile
> [info] Compiling 1 Scala source to
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
> [error]
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28:
> overloaded method value addSource with alternatives:
> [error]   [T](function:
>
> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T]
> => Unit)(implicit evidence$17: scala.reflect.ClassTag[T], implicit
> evidence$18:
>
> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
> 
> [error]   [T](function:
> org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit
> evidence$15: scala.reflect.ClassTag[T], implicit evidence$16:
>
> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
> [error]  cannot be applied to
> (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String],
> org.apache.flink.api.common.typeinfo.BasicTypeInfo[String])
> [error] val messageStream: DataStream[String] =
> env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply
> ty

Re: Consumer that consumes only local partition?

2015-08-04 Thread Robert Metzger
Sorry for the very late reply ...

The performance issue was not caused by network latency. I had a job like
this:
FlinkKafkaConsumer --> someSimpleOperation --> FlinkKafkaProducer.

I thought that our FlinkKafkaConsumer was slow, but actually our
FlinkKafkaProducer was using Kafka's old producer API. Switching to
Kafka's new producer API greatly improved our write performance to
Kafka. The slow producer was backpressuring the job, which in turn slowed
down the KafkaConsumer.

Since we are already talking about performance, let me ask the
following question:
I am using Kafka and Flink on an HDP 2.2 cluster (with 40 machines). What
would you consider good read/write performance for 8-byte messages on the
following setup?
- 40 brokers,
- topic with 120 partitions
- 120 reading threads (on 30 machines)
- 120 writing threads (on 30 machines)

I'm getting a write throughput of ~75k elements/core/second and a read
throughput of ~50k el/c/s.
When I stop the writers, the read throughput goes up to ~130k el/c/s.
I would expect a higher throughput than (8 * 75,000) / 1024 = 585.9 KB/sec per
partition... or are the messages too small, so that the per-message overhead
dominates?

Which system out there would you recommend for getting reference
performance numbers? Samza, Spark, Storm?


On Wed, Jul 15, 2015 at 7:20 PM, Gwen Shapira  wrote:

> This is not something you can use the consumer API to simply do easily
> (consumers don't have locality notion).
> I can imagine using Kafka's low-level API calls to get a list of
> partitions and the lead replica, figuring out which are local and
> using those - but that sounds painful.
>
> Are you 100% sure the performance issue is due to network latency? If
> not, you may want to start optimizing somewhere more productive :)
> Kafka brokers and clients both have Metrics that may help you track
> where the performance issues are coming from.
>
> Gwen
>
> On Wed, Jul 15, 2015 at 9:24 AM, Robert Metzger 
> wrote:
> > Hi Shef,
> >
> > did you resolve this issue?
> > I'm facing some performance issues and I was wondering whether reading
> > locally would resolve them.
> >
> > On Mon, Jun 22, 2015 at 11:43 PM, Shef  wrote:
> >
> >> Noob question here. I want to have a single consumer for each partition
> >> that consumes only the messages that have been written locally. In other
> >> words, I want the consumer to access the local disk and not pull
> anything
> >> across the network. Possible?
> >>
> >> How can I discover which partitions are local?
> >>
> >>
> >>
>


Re: Consumer that consumes only local partition?

2015-07-15 Thread Robert Metzger
Hi Shef,

did you resolve this issue?
I'm facing some performance issues and I was wondering whether reading
locally would resolve them.
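Regarding the original question quoted below ("How can I discover which
partitions are local?"), here is a rough sketch of one way to find the
partitions whose leader is the local broker. It uses partitionsFor() from the
new (0.9+) consumer, which was not released yet at the time of this thread,
so treat it as an illustration rather than a drop-in answer; names are
placeholders, and the comparison only works if the advertised broker
hostnames match what the OS reports:

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class LocalPartitionsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        String localHost = InetAddress.getLocalHost().getCanonicalHostName();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> local = new ArrayList<>();
            for (PartitionInfo pi : consumer.partitionsFor("my-topic")) { // placeholder topic
                // pi.leader() is the broker currently leading this partition;
                // comparing its hostname to ours is a crude locality check.
                // Note that leaders can move, so this is only a snapshot.
                if (pi.leader() != null && localHost.equals(pi.leader().host())) {
                    local.add(new TopicPartition(pi.topic(), pi.partition()));
                }
            }
            consumer.assign(local); // consume only the locally-led partitions
            System.out.println("Locally led partitions: " + local);
        }
    }
}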

On Mon, Jun 22, 2015 at 11:43 PM, Shef  wrote:

> Noob question here. I want to have a single consumer for each partition
> that consumes only the messages that have been written locally. In other
> words, I want the consumer to access the local disk and not pull anything
> across the network. Possible?
>
> How can I discover which partitions are local?
>
>
>


Manually controlling the start offset in the high level API

2015-04-22 Thread Robert Metzger
Hi,

I'm a committer at the Apache Flink project.
I'm working on adding support for exactly-once semantics to Flink's stream
processing component.
Therefore, we want to keep track of the offsets read by the KafkaSource
and restart consumption from the last known offset (tracked within
Flink).

Over the past few weeks, we tried implementing our own low-level Kafka
consumer which allows setting our own offsets on recovery. Our own
consumer is somewhat working, but our users need basically the same
features as the high-level consumer.
For example, they want to set various custom configuration values (like the
max fetch size), and they would like to use the ./kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker tool to get the "lag" of the
consumer.

I'm not sure it is the best approach for us to re-implement a well-engineered
consumer just because we cannot pass a start offset at the beginning.

I found the link to the upcoming Consumer API (in 0.9) that allows us to
manually manage the offsets (
http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
).
Will the new Consumer API be compatible with older Kafka installations, or
will we need to implement our own Consumer for pre-0.9 users in any case?
I know it's always hard to predict release dates, but can you give me a
rough estimate of when you'll release 0.9?

Is there a way to "hack" something to make our users happy in the
meantime?
For the "hack", I had two ideas:
a) copy/extend your consumer code so that we can pass the offset
b) set the offset manually in ZooKeeper before starting the consumer (a
rough sketch of this follows below).
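For idea (b), here is a rough, untested sketch of what writing a start offset
into ZooKeeper could look like, using the znode layout of the 0.8 high-level
consumer (/consumers/<group>/offsets/<topic>/<partition>). All names and the
offset value are placeholders, and whether the stored value should be the
last processed offset or the next offset to read needs to be checked against
the consumer's convention:

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkOffsetHackSketch {
    public static void main(String[] args) throws Exception {
        String group = "my-group", topic = "my-topic";   // placeholders
        int partition = 0;
        long startOffset = 12345L;                        // offset tracked by Flink (placeholder)

        // Connect and wait until the ZooKeeper session is established.
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Znode layout used by the 0.8 high-level consumer for offset storage.
        String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
        byte[] data = Long.toString(startOffset).getBytes(StandardCharsets.UTF_8);

        Stat stat = zk.exists(path, false);
        if (stat == null) {
            // Assumes the parent znodes already exist (they do once the group
            // has committed at least once); otherwise they must be created first.
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, data, stat.getVersion());
        }
        zk.close();
    }
}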

I also have a quick conceptual question: committing offsets (either
manually through commitOffsets() or via the autocommit feature) is only about
maintaining the offset in the broker or in ZooKeeper for that consumer.
Kafka uses this offset to recover in case of a consumer restart.
The offset committing is independent of log compaction/message
retention.
Are these assumptions correct?


Best regards,
Robert