Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Ted Yu
Sent out PR #5543, which fixes the reported bug, with
StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegion
modified to add calls to the filter methods.

FYI

On Mon, Aug 20, 2018 at 5:26 PM Ted Yu  wrote:

> Thanks for pointing me to that PR.
>
> I applied the PR locally but still got:
>
> org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
> > testShouldCountClicksPerRegion FAILED
> java.lang.StackOverflowError
>
> I can go over that PR to see what can be referenced for solving this bug.
>
> FYI
>
> On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang  wrote:
>
>> Is this related to the fix https://github.com/apache/kafka/pull/5502 that
>> is currently being worked on?
>>
>>
>> Guozhang
>>
>> On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax 
>> wrote:
>>
>> > Thanks for reporting and for creating the ticket!
>> >
>> > -Matthias
>> >
>> > On 8/20/18 5:17 PM, Ted Yu wrote:
>> > > I was able to reproduce what you saw with modification
>> > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>> > > I have logged KAFKA-7316 and am looking for a fix.
>> > >
>> > > FYI
>> > >
>> > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel 
>> > wrote:
>> > >
>> > >> Isn’t that a bug then? Or can I fix my code somehow?
>> > >>
>> > >>
>> > >>
>> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
>> > >>
>> > >> I think what happened in your use case was that the following
>> implicit
>> > >> from ImplicitConversions.scala kept wrapping the resultant KTable
>> from
>> > >> filter():
>> > >>
>> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>> > >>
>> > >> leading to stack overflow.
>> > >>
>> > >> Cheers
>> > >>
>> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <
>> dru...@arrcus.com>
>> > >> wrote:
>> > >>
>> > >>> Hi,
>> > >>>
>> > >>> I’m using the org.kafka.streams.scala that was released with version
>> > >>> 2.0.0. I’m getting a StackOverflowError as follows:
>> > >>>
>> > >>> java.lang.StackOverflowError
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> .
>> > >>> .
>> > >>> .
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>>
>> > >>> The Scala version I’m using is 2.11.11 and the code leading to the
>> > error
>> > >>> is as follows (particularly the .filter).
>> > >>>
>> > >>> val builder = new StreamsBuilder
>> > >>>
>> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>> > >>>
>> > >>> val customers = args.config.keys
>> > >>>
>> > >>> val predicates = customers.map { customerId =>
>> > >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
>> > customerId
>> > >>> }.toSeq
>> > >>>
>> > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>> > >>>
>> > >>> val y = Printed.toSysOut[Windowed[Key], Long]
>> > >>>
>> > >>> customerIdToStream.foreach { case (customerId, customerStream) =>
>> > >>> val customerConfig = args.config(customerId)
>> > >>> customerStream
>> > >>> .flatMap { case (_, message) =>
>> > >>> message.objects.map {
>> > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>> > >>> }
>> > >>> }
>> > >>> .groupByKey
>> > >>>
>> > >>>
>> > >> .windowedBy(TimeWindows.of(customerConfig.windowSize).
>> > advanceBy(customerConfig.sliderSize))
>> > >>> .count()
>> > >>> .filter { case (_, count) => count >=
>> > >>> customerConfig.frequencyThreshold }
>> > >>> .toStream
>> > >>> .print(y)
>> > >>> }
>> > >>>
>> > >>> Is this a bug with the new scala module related to:
>> > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>> > >>> Or am I doing something wrong?
>> > >>>
>> > >>> Thanks,
>> > >>> Druhin
>> > >>>
>> > >>
>> > >
>> >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>


Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Ted Yu
Hi,
I am aware that more than one method from KTable.scala has this issue.

Once I find a solution, I will apply the fix to the methods you listed.

Cheers

On Mon, Aug 20, 2018 at 5:23 PM Druhin Sagar Goel  wrote:

> Thanks a lot Ted!
>
> FYI - The issue is not limited to the
> org.apache.kafka.streams.scala.KTable.filter. It also happens with
> org.apache.kafka.streams.scala.KTable.filterNot,
> org.apache.kafka.streams.scala.KStream.foreach and
> org.apache.kafka.streams.scala.KStream.peek.
>
> - Druhin
>
>
> On August 20, 2018 at 5:19:36 PM, Matthias J. Sax (matth...@confluent.io
> ) wrote:
>
> Thanks for reporting and for creating the ticket!
>
> -Matthias
>
> On 8/20/18 5:17 PM, Ted Yu wrote:
> > I was able to reproduce what you saw with modification
> > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> > I have logged KAFKA-7316 and am looking for a fix.
> >
> > FYI
> >
> > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel 
> wrote:
> >
> >> Isn’t that a bug then? Or can I fix my code somehow?
> >>
> >>
> >>
> >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
> >>
> >> I think what happened in your use case was that the following implicit
> >> from ImplicitConversions.scala kept wrapping the resultant KTable from
> >> filter():
> >>
> >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
> >>
> >> leading to stack overflow.
> >>
> >> Cheers
> >>
> >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel 
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I’m using the org.kafka.streams.scala that was released with version
> >>> 2.0.0. I’m getting a StackOverflowError as follows:
> >>>
> >>> java.lang.StackOverflowError
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> .
> >>> .
> >>> .
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>>
> >>> The Scala version I’m using is 2.11.11 and the code leading to the
> error
> >>> is as follows (particularly the .filter).
> >>>
> >>> val builder = new StreamsBuilder
> >>>
> >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >>>
> >>> val customers = args.config.keys
> >>>
> >>> val predicates = customers.map { customerId =>
> >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
> customerId
> >>> }.toSeq
> >>>
> >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> >>>
> >>> val y = Printed.toSysOut[Windowed[Key], Long]
> >>>
> >>> customerIdToStream.foreach { case (customerId, customerStream) =>
> >>> val customerConfig = args.config(customerId)
> >>> customerStream
> >>> .flatMap { case (_, message) =>
> >>> message.objects.map {
> >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> >>> }
> >>> }
> >>> .groupByKey
> >>>
> >>>
> >>
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> >>> .count()
> >>> .filter { case (_, count) => count >=
> >>> customerConfig.frequencyThreshold }
> >>> .toStream
> >>> .print(y)
> >>> }
> >>>
> >>> Is this a bug with the new scala module related to:
> >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> >>> Or am I doing something wrong?
> >>>
> >>> Thanks,
> >>> Druhin
> >>>
> >>
> >
>
>


Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Ted Yu
Thanks for pointing me to that PR.

I applied the PR locally but still got:

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
> testShouldCountClicksPerRegion FAILED
java.lang.StackOverflowError

I can go over that PR to see whether anything in it can be reused to solve this bug.

FYI

On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang  wrote:

> Is this related to the fix https://github.com/apache/kafka/pull/5502 that
> is currently being worked on?
>
>
> Guozhang
>
> On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax 
> wrote:
>
> > Thanks for reporting and for creating the ticket!
> >
> > -Matthias
> >
> > On 8/20/18 5:17 PM, Ted Yu wrote:
> > > I was able to reproduce what you saw with modification
> > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> > > I have logged KAFKA-7316 and am looking for a fix.
> > >
> > > FYI
> > >
> > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel 
> > wrote:
> > >
> > >> Isn’t that a bug then? Or can I fix my code somehow?
> > >>
> > >>
> > >>
> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
> > >>
> > >> I think what happened in your use case was that the following implicit
> > >> from ImplicitConversions.scala kept wrapping the resultant KTable from
> > >> filter():
> > >>
> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
> > >>
> > >> leading to stack overflow.
> > >>
> > >> Cheers
> > >>
> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel  >
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I’m using the org.kafka.streams.scala that was released with version
> > >>> 2.0.0. I’m getting a StackOverflowError as follows:
> > >>>
> > >>> java.lang.StackOverflowError
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> .
> > >>> .
> > >>> .
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>>
> > >>> The Scala version I’m using is 2.11.11 and the code leading to the
> > error
> > >>> is as follows (particularly the .filter).
> > >>>
> > >>> val builder = new StreamsBuilder
> > >>>
> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> > >>>
> > >>> val customers = args.config.keys
> > >>>
> > >>> val predicates = customers.map { customerId =>
> > >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
> > customerId
> > >>> }.toSeq
> > >>>
> > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> > >>>
> > >>> val y = Printed.toSysOut[Windowed[Key], Long]
> > >>>
> > >>> customerIdToStream.foreach { case (customerId, customerStream) =>
> > >>> val customerConfig = args.config(customerId)
> > >>> customerStream
> > >>> .flatMap { case (_, message) =>
> > >>> message.objects.map {
> > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> > >>> }
> > >>> }
> > >>> .groupByKey
> > >>>
> > >>>
> > >> .windowedBy(TimeWindows.of(customerConfig.windowSize).
> > advanceBy(customerConfig.sliderSize))
> > >>> .count()
> > >>> .filter { case (_, count) => count >=
> > >>> customerConfig.frequencyThreshold }
> > >>> .toStream
> > >>> .print(y)
> > >>> }
> > >>>
> > >>> Is this a bug with the new scala module related to:
> > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> > >>> Or am I doing something wrong?
> > >>>
> > >>> Thanks,
> > >>> Druhin
> > >>>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>


Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Druhin Sagar Goel
Thanks a lot Ted!

FYI - The issue is not limited to the 
org.apache.kafka.streams.scala.KTable.filter. It also happens with 
org.apache.kafka.streams.scala.KTable.filterNot, 
org.apache.kafka.streams.scala.KStream.foreach and 
org.apache.kafka.streams.scala.KStream.peek.

- Druhin


On August 20, 2018 at 5:19:36 PM, Matthias J. Sax 
(matth...@confluent.io) wrote:

Thanks for reporting and for creating the ticket!

-Matthias

On 8/20/18 5:17 PM, Ted Yu wrote:
> I was able to reproduce what you saw with modification
> to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> I have logged KAFKA-7316 and am looking for a fix.
>
> FYI
>
> On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel  wrote:
>
>> Isn’t that a bug then? Or can I fix my code somehow?
>>
>>
>>
>> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
>>
>> I think what happened in your use case was that the following implicit
>> from ImplicitConversions.scala kept wrapping the resultant KTable from
>> filter():
>>
>> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>>
>> leading to stack overflow.
>>
>> Cheers
>>
>> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel 
>> wrote:
>>
>>> Hi,
>>>
>>> I’m using the org.kafka.streams.scala that was released with version
>>> 2.0.0. I’m getting a StackOverflowError as follows:
>>>
>>> java.lang.StackOverflowError
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> .
>>> .
>>> .
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>>
>>> The Scala version I’m using is 2.11.11 and the code leading to the error
>>> is as follows (particularly the .filter).
>>>
>>> val builder = new StreamsBuilder
>>>
>>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>>>
>>> val customers = args.config.keys
>>>
>>> val predicates = customers.map { customerId =>
>>> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
>>> }.toSeq
>>>
>>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>>>
>>> val y = Printed.toSysOut[Windowed[Key], Long]
>>>
>>> customerIdToStream.foreach { case (customerId, customerStream) =>
>>> val customerConfig = args.config(customerId)
>>> customerStream
>>> .flatMap { case (_, message) =>
>>> message.objects.map {
>>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>>> }
>>> }
>>> .groupByKey
>>>
>>>
>> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>>> .count()
>>> .filter { case (_, count) => count >=
>>> customerConfig.frequencyThreshold }
>>> .toStream
>>> .print(y)
>>> }
>>>
>>> Is this a bug with the new scala module related to:
>>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>>> Or am I doing something wrong?
>>>
>>> Thanks,
>>> Druhin
>>>
>>
>



Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Matthias J. Sax
Thanks for reporting and for creating the ticket!

-Matthias

On 8/20/18 5:17 PM, Ted Yu wrote:
> I was able to reproduce what you saw with modification
> to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> I have logged KAFKA-7316 and am looking for a fix.
> 
> FYI
> 
> On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel  wrote:
> 
>> Isn’t that a bug then? Or can I fix my code somehow?
>>
>>
>>
>> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
>>
>> I think what happened in your use case was that the following implicit
>> from ImplicitConversions.scala kept wrapping the resultant KTable from
>> filter():
>>
>> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>>
>> leading to stack overflow.
>>
>> Cheers
>>
>> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel 
>> wrote:
>>
>>> Hi,
>>>
>>> I’m using the org.kafka.streams.scala that was released with version
>>> 2.0.0. I’m getting a StackOverflowError as follows:
>>>
>>> java.lang.StackOverflowError
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> .
>>> .
>>> .
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>>
>>> The Scala version I’m using is 2.11.11 and the code leading to the error
>>> is as follows (particularly the .filter).
>>>
>>> val builder = new StreamsBuilder
>>>
>>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>>>
>>> val customers = args.config.keys
>>>
>>> val predicates = customers.map { customerId =>
>>> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
>>> }.toSeq
>>>
>>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>>>
>>> val y = Printed.toSysOut[Windowed[Key], Long]
>>>
>>> customerIdToStream.foreach { case (customerId, customerStream) =>
>>> val customerConfig = args.config(customerId)
>>> customerStream
>>> .flatMap { case (_, message) =>
>>> message.objects.map {
>>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>>> }
>>> }
>>> .groupByKey
>>>
>>>
>> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>>> .count()
>>> .filter { case (_, count) => count >=
>>> customerConfig.frequencyThreshold }
>>> .toStream
>>> .print(y)
>>> }
>>>
>>> Is this a bug with the new scala module related to:
>>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>>> Or am I doing something wrong?
>>>
>>> Thanks,
>>> Druhin
>>>
>>
> 





Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Ted Yu
I was able to reproduce what you saw with a modification
to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala.
I have logged KAFKA-7316 and am looking for a fix.

FYI

On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel  wrote:

> Isn’t that a bug then? Or can I fix my code somehow?
>
>
>
> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
>
> I think what happened in your use case was that the following implicit
> from ImplicitConversions.scala kept wrapping the resultant KTable from
> filter():
>
> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>
> leading to stack overflow.
>
> Cheers
>
> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel 
> wrote:
>
> > Hi,
> >
> > I’m using the org.kafka.streams.scala that was released with version
> > 2.0.0. I’m getting a StackOverflowError as follows:
> >
> > java.lang.StackOverflowError
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > .
> > .
> > .
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >
> > The Scala version I’m using is 2.11.11 and the code leading to the error
> > is as follows (particularly the .filter).
> >
> > val builder = new StreamsBuilder
> >
> > val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >
> > val customers = args.config.keys
> >
> > val predicates = customers.map { customerId =>
> > (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> > }.toSeq
> >
> > val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> >
> > val y = Printed.toSysOut[Windowed[Key], Long]
> >
> > customerIdToStream.foreach { case (customerId, customerStream) =>
> > val customerConfig = args.config(customerId)
> > customerStream
> > .flatMap { case (_, message) =>
> > message.objects.map {
> > case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> > }
> > }
> > .groupByKey
> >
> >
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> > .count()
> > .filter { case (_, count) => count >=
> > customerConfig.frequencyThreshold }
> > .toStream
> > .print(y)
> > }
> >
> > Is this a bug with the new scala module related to:
> > https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> > Or am I doing something wrong?
> >
> > Thanks,
> > Druhin
> >
>


Re: [ANNOUNCE] New Kafka PMC member: Dong Lin

2018-08-20 Thread Mayuresh Gharat
Congrats Dong !!!

Thanks,

Mayuresh

On Mon, Aug 20, 2018 at 1:36 PM Gwen Shapira  wrote:

> Congrats Dong Lin! Well deserved!
>
> On Mon, Aug 20, 2018, 3:55 AM Ismael Juma  wrote:
>
> > Hi everyone,
> >
> > Dong Lin became a committer in March 2018. Since then, he has remained
> > active in the community and contributed a number of patches, reviewed
> > several pull requests and participated in numerous KIP discussions. I am
> > happy to announce that Dong is now a member of the
> > Apache Kafka PMC.
> >
> > Congratulation Dong! Looking forward to your future contributions.
> >
> > Ismael, on behalf of the Apache Kafka PMC
> >
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Druhin Sagar Goel
Isn’t that a bug then? Or can I fix my code somehow?



On August 20, 2018 at 1:30:42 PM, Ted Yu 
(yuzhih...@gmail.com) wrote:

I think what happened in your use case was that the following implicit
from ImplicitConversions.scala kept wrapping the resultant KTable from
filter():

implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =

leading to stack overflow.

Cheers

On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel 
wrote:

> Hi,
>
> I’m using the org.kafka.streams.scala that was released with version
> 2.0.0. I’m getting a StackOverflowError as follows:
>
> java.lang.StackOverflowError
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> .
> .
> .
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>
> The Scala version I’m using is 2.11.11 and the code leading to the error
> is as follows (particularly the .filter).
>
> val builder = new StreamsBuilder
>
> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>
> val customers = args.config.keys
>
> val predicates = customers.map { customerId =>
> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> }.toSeq
>
> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>
> val y = Printed.toSysOut[Windowed[Key], Long]
>
> customerIdToStream.foreach { case (customerId, customerStream) =>
> val customerConfig = args.config(customerId)
> customerStream
> .flatMap { case (_, message) =>
> message.objects.map {
> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> }
> }
> .groupByKey
>
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> .count()
> .filter { case (_, count) => count >=
> customerConfig.frequencyThreshold }
> .toStream
> .print(y)
> }
>
> Is this a bug with the new scala module related to:
> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> Or am I doing something wrong?
>
> Thanks,
> Druhin
>


Re: [ANNOUNCE] New Kafka PMC member: Dong Lin

2018-08-20 Thread Gwen Shapira
Congrats Dong Lin! Well deserved!

On Mon, Aug 20, 2018, 3:55 AM Ismael Juma  wrote:

> Hi everyone,
>
> Dong Lin became a committer in March 2018. Since then, he has remained
> active in the community and contributed a number of patches, reviewed
> several pull requests and participated in numerous KIP discussions. I am
> happy to announce that Dong is now a member of the
> Apache Kafka PMC.
>
> Congratulation Dong! Looking forward to your future contributions.
>
> Ismael, on behalf of the Apache Kafka PMC
>


Re: Issue in Kafka 2.0.0 ?

2018-08-20 Thread Ted Yu
I think what happened in your use case was that the following implicit
from ImplicitConversions.scala kept wrapping the resultant KTable from
filter():

  implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =

leading to stack overflow.
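
To make that concrete, here is a minimal, self-contained sketch (not the
actual Kafka Streams source; the classes below are just stand-ins) of how an
implicit conversion like wrapKTable can send a delegating call straight back
into the Scala wrapper instead of down to the Java layer:

import scala.language.implicitConversions

object ImplicitRecursionDemo extends App {
  // Stand-in for the underlying Java class; note it exposes no filter method.
  class KTableJ[K, V]

  class KTable[K, V](val inner: KTableJ[K, V]) {
    // Intended: delegate to the Java object and wrap the result. Since
    // KTableJ has no filter, the compiler applies wrapKTable to inner, so
    // inner.filter(p) becomes wrapKTable(inner).filter(p): a call right back
    // into this method, recursing until java.lang.StackOverflowError.
    def filter(p: (K, V) => Boolean): KTable[K, V] = inner.filter(p)
  }

  implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
    new KTable(inner)

  val table = new KTable[String, Long](new KTableJ[String, Long])
  table.filter((_, count) => count > 0) // throws java.lang.StackOverflowError
}

The stack trace Druhin reported, with every frame at KTable.scala:49, is
consistent with the call never leaving the Scala layer in the same way.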

Cheers

On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel 
wrote:

> Hi,
>
> I’m using the org.kafka.streams.scala that was released with version
> 2.0.0. I’m getting a StackOverflowError as follows:
>
> java.lang.StackOverflowError
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>.
>.
>.
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>
> The Scala version I’m using is 2.11.11 and the code leading to the error
> is as follows (particularly the .filter).
>
> val builder = new StreamsBuilder
>
> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>
> val customers = args.config.keys
>
> val predicates = customers.map { customerId =>
>   (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> }.toSeq
>
> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>
> val y = Printed.toSysOut[Windowed[Key], Long]
>
> customerIdToStream.foreach { case (customerId, customerStream) =>
>   val customerConfig = args.config(customerId)
>   customerStream
> .flatMap { case (_, message) =>
>   message.objects.map {
> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>   }
> }
> .groupByKey
>
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> .count()
> .filter { case (_, count) => count >=
> customerConfig.frequencyThreshold }
> .toStream
> .print(y)
> }
>
> Is this a bug with the new scala module related to:
> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> Or am I doing something wrong?
>
> Thanks,
> Druhin
>


Issue in Kafka 2.0.0 ?

2018-08-20 Thread Druhin Sagar Goel
Hi,

I’m using the org.kafka.streams.scala that was released with version 2.0.0. I’m 
getting a StackOverflowError as follows:

java.lang.StackOverflowError
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
   .
   .
   .
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)

The Scala version I’m using is 2.11.11 and the code leading to the error is as 
follows (particularly the .filter).

val builder = new StreamsBuilder

val stream = builder.stream[Array[Byte], CaseClassA](args.topic)

val customers = args.config.keys

val predicates = customers.map { customerId =>
  (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
}.toSeq

val customerIdToStream = customers.zip(stream(predicates: _*)).toMap

val y = Printed.toSysOut[Windowed[Key], Long]

customerIdToStream.foreach { case (customerId, customerStream) =>
  val customerConfig = args.config(customerId)
  customerStream
.flatMap { case (_, message) =>
  message.objects.map {
case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
  }
}
.groupByKey

.windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
.count()
.filter { case (_, count) => count >= customerConfig.frequencyThreshold }
.toStream
.print(y)
}

Is this a bug with the new scala module related to: 
https://github.com/lightbend/kafka-streams-scala/issues/63 ?
Or am I doing something wrong?

Thanks,
Druhin


Re: Kafka SASL (Kerberos) - UNKNOWN_TOPIC_OR_PARTITION

2018-08-20 Thread Matt L
Thanks for the pointer Manikumar!

It looks like it was my inter-broker communication: it was set to SSL, so the
inter-broker users were coming in as ANONYMOUS. Once I changed this to
SASL_SSL, I was able to publish and consume.
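
For reference, the broker-side settings involved look roughly like this (a
sketch, not the exact config from this cluster):

  # inter-broker traffic now authenticates over SASL instead of arriving as ANONYMOUS over SSL
  security.inter.broker.protocol=SASL_SSL
  # authorizer in use, and the behaviour when a resource has no ACLs at all
  authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  allow.everyone.if.no.acl.found=true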

One remaining question I have is around
allow.everyone.if.no.acl.found=true. Despite setting this, I still see
"User:ANONYMOUS is Denied Operation= Describe" in the authorizer log. Is there
something else that needs to be set to enable this, or is "Describe" not
covered by this flag?

Thanks,
Matt

On Mon, Aug 20, 2018 at 5:03 AM, Manikumar 
wrote:

> is auto topic creation enabled on server? Any deny logs in
> kafka-authorizer.log?
> What is the inter-broker protocol configured? If it is SSL, SSL user should
> have ClusterAction permission.
>
> On Mon, Aug 20, 2018 at 3:33 PM Matt L  wrote:
>
> > Hello,
> >
> > Having trouble when publishing and consuming from a topic with
> > SASL_PLAINTEXT.
> >
> > Both ZK and Kafka start successfully, in logs I see SASL_PLAINTEXT on
> 9093
> > as being available.
> >
> > kafka.log:[2018-08-20 03:31:08,202] INFO Registered broker 1 at path
> > /brokers/ids/1 with addresses:
> >
> > EndPoint(kafkabroker1,9092,ListenerName(SSL),SSL),
> EndPoint(kafkabroker1,9093,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT)
> > (kafka.utils.ZkUtils:70)
> >
> >
> > When i try to publish, e.g.
> >   bin/kafka-console-producer --broker-list kafkabroker1:9093 \
> >   --topic testtopic1 --producer.config /tmp/sasl-producer.properties
> >
> > I get:
> >
> > [2018-08-20 08:37:35,075] WARN Error while fetching metadata with
> > correlation id 3 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
> > (org.apache.kafka.clients.NetworkClient)
> > [2018-08-20 08:37:35,176] WARN Error while fetching metadata with
> > correlation id 4 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
> > (org.apache.kafka.clients.NetworkClient)
> > [2018-08-20 08:37:35,277] WARN Error while fetching metadata with
> > correlation id 5 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
> > (org.apache.kafka.clients.NetworkClient)
> >
> >
> > What I've verified:
> > 1) Client can resolve advertisted.listeners on all brokers. (prior to
> > enabling SASL, PLAINTEXT and SSL work with my set advertisted.listerners)
> > 2) In my sasl-producer.properties, im authenticating with user Kafka.
> Kafka
> > has been set as super user and in kafka-authorizer.log, I see "
> >
> > [2018-08-20 08:27:19,971] DEBUG principal = User:kafka is a super user,
> > allowing operation without checking acls. (kafka.authorizer.logger)
> > [2018-08-20 08:27:19,971] DEBUG Principal = User:kafka is Allowed
> Operation
> > = Describe from host = 10.10.52.1 on resource = Topic:testtopic1
> > (kafka.authorizer.logger)
> > [2018-08-20 08:27:20,072] DEBUG operation = Read on resource = Topic:
> > testtopic1 from host = 10.10.52.1 is Allow based on acl = User:kafka has
> > Allow permission for operations: All from hosts: *
> > (kafka.authorizer.logger)
> >
> > and from the kafka.log's in DEBUG:
> > [2018-08-20 09:35:48,364] DEBUG principal = User:kafka is a super user,
> > allowing operation without checking acls. (kafka.authorizer.logger:159)
> > [2018-08-20 09:35:48,364] DEBUG Principal = User:kafka is Allowed
> Operation
> > = Describe from host = 10.89.64.7 on resource = Topic:kerbtest1
> > (kafka.authorizer.logger:251)
> > [2018-08-20 09:35:48,364] DEBUG Completed
> >
> > request:{api_key=3,api_version=4,correlation_id=186,
> client_id=console-producer}
> > -- {topics=[kerbtest1],allow_auto_topic_creation=true} from connection
> > 10.10.52.1:9093-10.10.52.1
> > :42752;totalTime:0.461000,requestQueueTime:0.033000,
> localTime:0.346000,remoteTime:0.00,throttleTime:0.
> 033000,responseQueueTime:0.03,sendTime:0.066000,securityProtocol:SASL_
> PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT
> > (kafka.request.logger:193)
> >
> >
> > I'm assuming everything is okay from an ACL standpoint but when the
> client
> > cannot get the topic metadata from the returned advertisted listeners?
> > Any ideas on what I could be missing? Could this be something with ZK
> > setup/any authentication I am missing there?  I had even tried "
> > skipACL=yes"
> > but that did not change anything.
> >
> > Thanks!
> >
>


Missing Public Key

2018-08-20 Thread Fagan, Kyle D
The code signing key for Kafka version 1.1.1 [0] does not have a corresponding 
public key in the project’s KEYS [1] file. Therefore GPG verification produces 
the following error:



Is anyone able to update the KEYS file?

[0]  https://www.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz.asc
[1]  https://www.apache.org/dist/kafka/KEYS
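
(For reference, the verification steps were roughly:

  wget https://www.apache.org/dist/kafka/KEYS
  gpg --import KEYS
  gpg --verify kafka_2.11-1.1.1.tgz.asc kafka_2.11-1.1.1.tgz

and the --verify step is what fails, since the signing key for this release is
not present in KEYS.)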


Re: NetworkException exception while send/publishing records(Producer)

2018-08-20 Thread Pulkit Manchanda
Thanks Shantanu for your response.
The size is a business requirement, which might also increase later, and I am
using max.request.size of 1 GB.
I will try compressing the data and see how it performs.

Also, sharing the producer blocks the other threads since the data is big, and
it also leads to resource leakage.

Pulkit
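
(For reference, a minimal sketch of the shared-producer pattern Shantanu
describes below; the bootstrap address, topic and payload handling are
placeholders, not our actual code:)

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("compression.type", "gzip")        // shrink large payloads on the wire
props.put("max.request.size", "1073741824")  // 1 GB, matching the setting mentioned above

// One producer instance shared by every sending thread. KafkaProducer is
// thread safe; send() hands the record to a background sender thread and
// returns a Future, so a caller only waits for the broker if it calls get()
// on that Future (or if the internal buffer is full).
val producer = new KafkaProducer[String, Array[Byte]](props)

def sendRecord(topic: String, key: String, payload: Array[Byte]) =
  producer.send(new ProducerRecord[String, Array[Byte]](topic, key, payload))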

On Mon, Aug 20, 2018 at 1:23 AM, Shantanu Deshmukh 
wrote:

> Firstly, record size of 150mb is too big. I am quite sure your timeout
> exceptions are due to such a large record. There is a setting in producer
> and broker config which allows you to specify max message size in bytes.
> But still records each of size 150mb might lead to problems with increasing
> volume. You need to look at how you can reduce your message size.
>
> Kafka producer is thread safe and according to documentation you will get
> best performance if you share producer with multiple threads. Don't
> initiate a new kafka producer for each of your thread.
>
> On Fri, Aug 17, 2018 at 9:26 PM Pulkit Manchanda 
> wrote:
>
> > Hi All,
> >
> > I am sending the multiple records to the same topic.
> > I have the two approaches
> > 1)Sharing the producer with all the threads
> > 2) creating a new producer for every thread.
> >
> > I am sending the records of size ~150Mb on multiple request.
> > I am running the cluster and app on my local machine with 3 brokers and
> > max.request .size 1Gb.
> >
> > While sending the records using the following code with approach 2)
> > creating a new producer I am getting the network exception
> > and when I use the approach 1) sharing the producer. I get the same
> network
> > exception and sometimes Timeout too.
> > I looked onto google and StackOverflow but didn't find any solution to
> the
> > Network Exception.
> >
> > val metadata = producer.send(record).get()
> >
> >
> > java.util.concurrent.ExecutionException:
> > org.apache.kafka.common.errors.NetworkException: The server disconnected
> > before a response was received.
> > at
> >
> > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.
> valueOrError(FutureRecordMetadata.java:94)
> > at
> >
> > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(
> FutureRecordMetadata.java:64)
> > at
> >
> > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(
> FutureRecordMetadata.java:29)
> > at service.KafkaService.sendRecordToKafka(KafkaService.scala:65)
> >
> >
> > Any help will be appreciated.
> >
> > Thanks
> > Pulkit
> >
>


Re: Kafka SASL (Kerberos) - UNKNOWN_TOPIC_OR_PARTITION

2018-08-20 Thread Manikumar
Is auto topic creation enabled on the server? Are there any deny logs in
kafka-authorizer.log?
Which inter-broker protocol is configured? If it is SSL, the SSL user should
have ClusterAction permission.
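
If the inter-broker listener is SSL, the ACL can be added along these lines
(the principal and ZooKeeper address below are placeholders for your broker's
SSL certificate DN and your own ZK connect string):

  bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
    --add --allow-principal "User:CN=kafkabroker1" \
    --operation ClusterAction --cluster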

On Mon, Aug 20, 2018 at 3:33 PM Matt L  wrote:

> Hello,
>
> Having trouble when publishing and consuming from a topic with
> SASL_PLAINTEXT.
>
> Both ZK and Kafka start successfully, in logs I see SASL_PLAINTEXT on 9093
> as being available.
>
> kafka.log:[2018-08-20 03:31:08,202] INFO Registered broker 1 at path
> /brokers/ids/1 with addresses:
>
> EndPoint(kafkabroker1,9092,ListenerName(SSL),SSL),EndPoint(kafkabroker1,9093,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT)
> (kafka.utils.ZkUtils:70)
>
>
> When i try to publish, e.g.
>   bin/kafka-console-producer --broker-list kafkabroker1:9093 \
>   --topic testtopic1 --producer.config /tmp/sasl-producer.properties
>
> I get:
>
> [2018-08-20 08:37:35,075] WARN Error while fetching metadata with
> correlation id 3 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
> (org.apache.kafka.clients.NetworkClient)
> [2018-08-20 08:37:35,176] WARN Error while fetching metadata with
> correlation id 4 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
> (org.apache.kafka.clients.NetworkClient)
> [2018-08-20 08:37:35,277] WARN Error while fetching metadata with
> correlation id 5 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
> (org.apache.kafka.clients.NetworkClient)
>
>
> What I've verified:
> 1) Client can resolve advertisted.listeners on all brokers. (prior to
> enabling SASL, PLAINTEXT and SSL work with my set advertisted.listerners)
> 2) In my sasl-producer.properties, im authenticating with user Kafka. Kafka
> has been set as super user and in kafka-authorizer.log, I see "
>
> [2018-08-20 08:27:19,971] DEBUG principal = User:kafka is a super user,
> allowing operation without checking acls. (kafka.authorizer.logger)
> [2018-08-20 08:27:19,971] DEBUG Principal = User:kafka is Allowed Operation
> = Describe from host = 10.10.52.1 on resource = Topic:testtopic1
> (kafka.authorizer.logger)
> [2018-08-20 08:27:20,072] DEBUG operation = Read on resource = Topic:
> testtopic1 from host = 10.10.52.1 is Allow based on acl = User:kafka has
> Allow permission for operations: All from hosts: *
> (kafka.authorizer.logger)
>
> and from the kafka.log's in DEBUG:
> [2018-08-20 09:35:48,364] DEBUG principal = User:kafka is a super user,
> allowing operation without checking acls. (kafka.authorizer.logger:159)
> [2018-08-20 09:35:48,364] DEBUG Principal = User:kafka is Allowed Operation
> = Describe from host = 10.89.64.7 on resource = Topic:kerbtest1
> (kafka.authorizer.logger:251)
> [2018-08-20 09:35:48,364] DEBUG Completed
>
> request:{api_key=3,api_version=4,correlation_id=186,client_id=console-producer}
> -- {topics=[kerbtest1],allow_auto_topic_creation=true} from connection
> 10.10.52.1:9093-10.10.52.1
> :42752;totalTime:0.461000,requestQueueTime:0.033000,localTime:0.346000,remoteTime:0.00,throttleTime:0.033000,responseQueueTime:0.03,sendTime:0.066000,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT
> (kafka.request.logger:193)
>
>
> I'm assuming everything is okay from an ACL standpoint but when the client
> cannot get the topic metadata from the returned advertisted listeners?
> Any ideas on what I could be missing? Could this be something with ZK
> setup/any authentication I am missing there?  I had even tried "
> skipACL=yes"
> but that did not change anything.
>
> Thanks!
>


[ANNOUNCE] New Kafka PMC member: Dong Lin

2018-08-20 Thread Ismael Juma
Hi everyone,

Dong Lin became a committer in March 2018. Since then, he has remained
active in the community and contributed a number of patches, reviewed
several pull requests and participated in numerous KIP discussions. I am
happy to announce that Dong is now a member of the
Apache Kafka PMC.

Congratulations, Dong! Looking forward to your future contributions.

Ismael, on behalf of the Apache Kafka PMC


Kafka SASL (Kerberos) - UNKNOWN_TOPIC_OR_PARTITION

2018-08-20 Thread Matt L
Hello,

I'm having trouble publishing to and consuming from a topic with
SASL_PLAINTEXT.

Both ZooKeeper and Kafka start successfully; in the logs I see SASL_PLAINTEXT
on 9093 listed as available.

kafka.log:[2018-08-20 03:31:08,202] INFO Registered broker 1 at path
/brokers/ids/1 with addresses:
EndPoint(kafkabroker1,9092,ListenerName(SSL),SSL),EndPoint(kafkabroker1,9093,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT)
(kafka.utils.ZkUtils:70)


When I try to publish, e.g.
  bin/kafka-console-producer --broker-list kafkabroker1:9093 \
  --topic testtopic1 --producer.config /tmp/sasl-producer.properties

I get:

[2018-08-20 08:37:35,075] WARN Error while fetching metadata with
correlation id 3 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2018-08-20 08:37:35,176] WARN Error while fetching metadata with
correlation id 4 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2018-08-20 08:37:35,277] WARN Error while fetching metadata with
correlation id 5 : {testtopic1=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)


What I've verified:
1) The client can resolve advertised.listeners on all brokers. (Prior to
enabling SASL, PLAINTEXT and SSL worked with my advertised.listeners.)
2) In my sasl-producer.properties, I'm authenticating as user kafka, which has
been set as a super user, and in kafka-authorizer.log I see:

[2018-08-20 08:27:19,971] DEBUG principal = User:kafka is a super user,
allowing operation without checking acls. (kafka.authorizer.logger)
[2018-08-20 08:27:19,971] DEBUG Principal = User:kafka is Allowed Operation
= Describe from host = 10.10.52.1 on resource = Topic:testtopic1
(kafka.authorizer.logger)
[2018-08-20 08:27:20,072] DEBUG operation = Read on resource = Topic:
testtopic1 from host = 10.10.52.1 is Allow based on acl = User:kafka has
Allow permission for operations: All from hosts: * (kafka.authorizer.logger)

and from the kafka.log's in DEBUG:
[2018-08-20 09:35:48,364] DEBUG principal = User:kafka is a super user,
allowing operation without checking acls. (kafka.authorizer.logger:159)
[2018-08-20 09:35:48,364] DEBUG Principal = User:kafka is Allowed Operation
= Describe from host = 10.89.64.7 on resource = Topic:kerbtest1
(kafka.authorizer.logger:251)
[2018-08-20 09:35:48,364] DEBUG Completed
request:{api_key=3,api_version=4,correlation_id=186,client_id=console-producer}
-- {topics=[kerbtest1],allow_auto_topic_creation=true} from connection
10.10.52.1:9093-10.10.52.1:42752;totalTime:0.461000,requestQueueTime:0.033000,localTime:0.346000,remoteTime:0.00,throttleTime:0.033000,responseQueueTime:0.03,sendTime:0.066000,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT
(kafka.request.logger:193)


I'm assuming everything is okay from an ACL standpoint, so why can the client
not get the topic metadata from the returned advertised listeners?
Any ideas on what I could be missing? Could this be something with the ZK
setup or some authentication I am missing there? I had even tried "skipACL=yes"
but that did not change anything.

Thanks!


Re: Kafka issue

2018-08-20 Thread Nan Xu
Maybe I should highlight that I only publish one key, so only one broker is
going to handle it, and only one stream instance handles it too. What's the
typical throughput/latency I should expect in this case, assuming the
processing logic is very simple: just take the data (an integer) and sum it?
I am expecting more like 100,000 messages/s and less than 10 ms latency for a
single powerful broker.
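
For reference, the kind of topology I mean is roughly this (a sketch with
placeholder topic names, using the 2.0.0 Scala DSL):

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder
// every record carries the same key, so a single partition / task does all the work
builder.stream[String, Int]("input-topic")
  .groupByKey        // records are already keyed, so no repartitioning expected
  .reduce(_ + _)     // running sum per key
  .toStream
  .to("sum-output")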

Nan

On Mon, Aug 20, 2018 at 12:45 AM Nan Xu  wrote:

> I did several tests: one with 10 brokers (remote servers),
> and one with 3 brokers (local Docker).
>
> Both exhibit the same behavior. I was thinking the same, but at least from
> the Kafka logs I don't see a rebalance happening, I am sure my CPU is only
> about half used, and all brokers are still running.
>
> Nan
>
>
>
> On Mon, Aug 20, 2018 at 12:18 AM Shantanu Deshmukh 
> wrote:
>
>> How many brokers are there in your cluster? This error usually comes when
>> one of the brokers who is leader for a partition dies and you are trying
>> to
>> access it.
>>
>> On Fri, Aug 17, 2018 at 9:23 PM Harish K  wrote:
>>
>> > Hi,
>> >I have installed Kafka and created topic but while data ingestion i
>> get
>> > some errors as follows.Any help would be really appreciated
>> >
>> >
>> > [2018-08-17 06:12:49,838] WARN Error while fetching metadata with
>> > correlation id 24 :
>> > {wikipedia=LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient)
>> >
>> > *server log:*
>> >
>> > [2018-08-17 06:06:00,719] INFO Creating /controller (is it secure?
>> false)
>> > (kafka.utils.ZKCheckedEphemeral)
>> > [2018-08-17 06:06:00,720] INFO Result of znode creation is: OK
>> > (kafka.utils.ZKCheckedEphemeral)
>> > [2018-08-17 06:06:00,720] INFO 0 successfully elected as leader
>> > (kafka.server.ZookeeperLeaderElector)
>> > [2018-08-17 06:06:00,736] ERROR Error while electing or becoming leader
>> on
>> > broker 0 (kafka.server.ZookeeperLeaderElector)
>> > kafka.common.KafkaException: Can't parse json string: null
>> > at kafka.utils.Json$.liftedTree1$1(Json.scala:40)
>> > at kafka.utils.Json$.parseFull(Json.scala:36)
>> > at
>> >
>> >
>> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:660)
>> > at
>> >
>> >
>> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:656)
>> > at
>> >
>> >
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> > at kafka.utils.ZkUtils.getReplicaAssignmentForTopics(ZkUtils.scala:656)
>> > at
>> >
>> >
>> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
>> > at
>> >
>> >
>> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
>> > at
>> >
>> >
>> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:160)
>> > at
>> >
>> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:85)
>> > at
>> >
>> >
>> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:154)
>> > at
>> >
>> >
>> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:154)
>> > at
>> >
>> >
>> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:154)
>> > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>> > at
>> >
>> >
>> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:153)
>> > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:825)
>> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:72)
>> > Caused by: java.lang.NullPointerException
>> > at
>> >
>> >
>> scala.util.parsing.combinator.lexical.Scanners$Scanner.(Scanners.scala:44)
>> > at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:51)
>> > at scala.util.parsing.json.JSON$.parseFull(JSON.scala:65)
>> > at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
>> > ... 17 more
>> >
>> >
>> > *Controller Log:*
>> >
>> > [2018-08-17 06:05:54,644] INFO [Controller 0]: Controller starting up
>> > (kafka.controller.KafkaController)
>> > [2018-08-17 06:05:54,659] INFO [Controller 0]: Broker 0 starting become
>> > controller state transition (kafka.controller.KafkaController)
>> > [2018-08-17 06:05:54,661] INFO [Controller 0]: Initialized controller
>> epoch
>> > to 2294948 and zk version 2294947 (kafka.controller.KafkaController)
>> > [2018-08-17 06:05:54,664] INFO [Controller 0]: Controller 0 incremented
>> > epoch to 2294949 (kafka.controller.KafkaController)
>> > [2018-08-17 06:05:54,665] DEBUG [Controller 0]: Registering
>> > IsrChangeNotificationListener (kafka.controller.KafkaController)
>> > [2018-08-17 06:05:54,705] INFO [Controller 0]: Controller startup
>> complete
>> > (kafka.controller.KafkaController)
>> > [2018-08-17 06:05:54,715] DEBUG [Controller 0]: Controller resigning,
>> > broker id 0