RE: Re: Re: Kafka 3.2.1 performance issue with JDK 11

2023-05-22 Thread Vic Xu
Hi Greg

I know JDK 8 would certainly solve the problem, but I am wondering whether 
JDK 8 has been deprecated since Kafka 3. Can I still use JDK 8 with Kafka 3.2.1? Thank you.


On 2023/05/22 20:55:01 Greg Harris wrote:
> Vic,
> 
> While you can certainly try that, I don't know if that will solve the problem.
> The reason why JDK11 appears relevant in this context is that a class
> was removed between JDK8 and JDK11. I don't know if a replacement
> stack inspector with better performance was added to JDK17 and used
> within log4j2.
> If you were to try to solve this with a JDK version change, a
> downgrade to 8 may solve the problem, since the log4j library would
> use a different stack inspector.
> 
> Greg
> 
> On Sun, May 21, 2023 at 11:30 PM Vic Xu  wrote:
> >
> > Hi Greg,
> >
> > I found another possible solution that is upgrade JDK from 11 to 17. Do you 
> > recommend this solution?
> >
> > On 2023/05/21 17:58:42 Greg Harris wrote:
> > > Vic,
> > >
> > > I found an open JIRA issue that previously reported this problem:
> > > https://issues.apache.org/jira/browse/KAFKA-10877 .
> > > I believe one workaround is to use log4j 1.x, such as reload4j. Kafka
> > > still relies on log4j 1.x until the planned upgrade to log4j 2.x in
> > > kafka 4.0 https://issues.apache.org/jira/browse/KAFKA-9366 .
> > > I will look into reviving or replacing the performance patch for 3.x.
> > >
> > > Hope this helps,
> > > Greg Harris
> > >
> > > On Sun, May 21, 2023 at 6:31 AM Vic Xu  wrote:
> > > >
> > > > Hello all,  I have a Kafka cluster deployed with version 3.2.1 , JDK 11 
> > > > and log4j 2.18.0. I built my own Kafka image. One of my Kafka brokers 
> > > > is experiencing CPU issues, and based on the jstack information, it 
> > > > seems that log4j is causing the problem due to its usage of 
> > > > StackWalker. How to solve this issue?
> > > >
> > > > Here is jstack information:
> > > > [jstack output snipped; the full trace appears in the original message below]


Re: Kafka 3.2.1 performance issue with JDK 11

2023-05-22 Thread Vic Xu
Hi Greg,

I found another possible solution: upgrading the JDK from 11 to 17. Do you
recommend this approach?

On Mon, May 22, 2023 at 1:59 AM, Greg Harris wrote:

> Vic,
>
> I found an open JIRA issue that previously reported this problem:
> https://issues.apache.org/jira/browse/KAFKA-10877 .
> I believe one workaround is to use log4j 1.x, such as reload4j. Kafka
> still relies on log4j 1.x until the planned upgrade to log4j 2.x in
> kafka 4.0 https://issues.apache.org/jira/browse/KAFKA-9366 .
> I will look into reviving or replacing the performance patch for 3.x.
>
> Hope this helps,
> Greg Harris
>
> On Sun, May 21, 2023 at 6:31 AM Vic Xu 
> wrote:
> >
> > Hello all,  I have a Kafka cluster deployed with version 3.2.1 , JDK 11
> and log4j 2.18.0. I built my own Kafka image. One of my Kafka brokers is
> experiencing CPU issues, and based on the jstack information, it seems that
> log4j is causing the problem due to its usage of StackWalker. How to solve
> this issue?
> >
> > Here is jstack information:
> > [jstack output snipped; the full trace appears in the original message below]

Kafka 3.2.1 performance issue with JDK 11

2023-05-21 Thread Vic Xu
Hello all, I have a Kafka cluster deployed with version 3.2.1, JDK 11, and
log4j 2.18.0. I built my own Kafka image. One of my Kafka brokers is
experiencing high CPU usage, and based on the jstack output it seems that
log4j is causing the problem through its use of StackWalker. How can I solve
this issue?

Here is the jstack output:
"data-plane-kafka-request-handler-6" #59 daemon prio=5 os_prio=0 
cpu=86381259.23ms elapsed=1948787.21s tid=0x7f8939c04800 nid=0x190 runnable 
 [0x7f883f6f5000]
   java.lang.Thread.State: RUNNABLE
at 
java.lang.StackStreamFactory$AbstractStackWalker.fetchStackFrames(java.base@11.0.9/Native
 Method)
at 
java.lang.StackStreamFactory$AbstractStackWalker.fetchStackFrames(java.base@11.0.9/Unknown
 Source)
at 
java.lang.StackStreamFactory$AbstractStackWalker.getNextBatch(java.base@11.0.9/Unknown
 Source)
at 
java.lang.StackStreamFactory$AbstractStackWalker.peekFrame(java.base@11.0.9/Unknown
 Source)
at 
java.lang.StackStreamFactory$AbstractStackWalker.hasNext(java.base@11.0.9/Unknown
 Source)
at 
java.lang.StackStreamFactory$StackFrameTraverser.tryAdvance(java.base@11.0.9/Unknown
 Source)
at 
java.util.stream.ReferencePipeline.forEachWithCancel(java.base@11.0.9/Unknown 
Source)
at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(java.base@11.0.9/Unknown 
Source)
at java.util.stream.AbstractPipeline.copyInto(java.base@11.0.9/Unknown 
Source)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@11.0.9/Unknown 
Source)
at 
java.util.stream.FindOps$FindOp.evaluateSequential(java.base@11.0.9/Unknown 
Source)
at java.util.stream.AbstractPipeline.evaluate(java.base@11.0.9/Unknown 
Source)
at 
java.util.stream.ReferencePipeline.findFirst(java.base@11.0.9/Unknown Source)
at 
org.apache.logging.log4j.util.StackLocator.lambda$getCallerClass$2(StackLocator.java:57)
at 
org.apache.logging.log4j.util.StackLocator$$Lambda$117/0x0008001a6c40.apply(Unknown
 Source)
at 
java.lang.StackStreamFactory$StackFrameTraverser.consumeFrames(java.base@11.0.9/Unknown
 Source)
at 
java.lang.StackStreamFactory$AbstractStackWalker.doStackWalk(java.base@11.0.9/Unknown
 Source)
at 
java.lang.StackStreamFactory$AbstractStackWalker.callStackWalk(java.base@11.0.9/Native
 Method)
at 
java.lang.StackStreamFactory$AbstractStackWalker.beginStackWalk(java.base@11.0.9/Unknown
 Source)
at 
java.lang.StackStreamFactory$AbstractStackWalker.walk(java.base@11.0.9/Unknown 
Source)
at java.lang.StackWalker.walk(java.base@11.0.9/Unknown Source)
at 
org.apache.logging.log4j.util.StackLocator.getCallerClass(StackLocator.java:51)
at 
org.apache.logging.log4j.util.StackLocatorUtil.getCallerClass(StackLocatorUtil.java:104)
at 
org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:50)
at 
org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:47)
at 
org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:33)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:363)
at kafka.utils.Logging.logger(Logging.scala:43)
at kafka.utils.Logging.logger$(Logging.scala:43)
at 
kafka.server.SessionlessFetchContext.logger$lzycompute(FetchSession.scala:364)
- locked <0x0007fa037e58> (a kafka.server.SessionlessFetchContext)
at kafka.server.SessionlessFetchContext.logger(FetchSession.scala:364)
at kafka.utils.Logging.debug(Logging.scala:62)
at kafka.utils.Logging.debug$(Logging.scala:62)
at kafka.server.SessionlessFetchContext.debug(FetchSession.scala:364)
at 
kafka.server.SessionlessFetchContext.updateAndGenerateResponseData(FetchSession.scala:377)
at kafka.server.KafkaApis.processResponseCallback$1(KafkaApis.scala:932)
at 
kafka.server.KafkaApis.$anonfun$handleFetchRequest$33(KafkaApis.scala:965)
at 
kafka.server.KafkaApis.$anonfun$handleFetchRequest$33$adapted(KafkaApis.scala:965)
at kafka.server.KafkaApis$$Lambda$1241/0x0008007e4040.apply(Unknown 
Source)
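The hot frames above are log4j2's caller-class lookup, which performs a fresh StackWalker traversal every time a logger is resolved (here, on each lazily initialized SessionlessFetchContext). A minimal stdlib-only sketch of the same JDK mechanism — the object and method names below are illustrative, not Kafka's or log4j's:

```scala
object CallerLookup {
  // Same JDK 9+ API that log4j2's StackLocator uses: each call performs a
  // native stack walk (the fetchStackFrames frames in the trace above).
  private val walker =
    java.lang.StackWalker.getInstance(
      java.lang.StackWalker.Option.RETAIN_CLASS_REFERENCE)

  // Returns the class of whoever called this method.
  def callerName(): String = walker.getCallerClass.getName

  // Calling through a helper shows the walk resolving the helper's class.
  def viaHelper(): String = callerName()
}
```

Because the walk is repeated on every lookup, resolving a logger per request instead of caching it in a field multiplies this cost across all request-handler threads.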

Re: [ANNOUNCE] New Kafka PMC Member: Randall Hauch

2021-04-16 Thread lobo xu
Congrats Randall 


Re: kafka-producer-network-thread hangs while fetching metadata.

2021-04-14 Thread lobo xu
Hi Nikhita,
   It looks like there is a problem with the connection address
"mycoolhost:mycoolport". Make sure the address resolves properly to an IP and port.
> On Apr 15, 2021, at 08:51, nikhita kataria wrote:
> 
> java.io.IOException: Can't resolve address: mycoolhost:mycoolport
>at



Re: kafka latency for large message

2019-03-19 Thread Nan Xu
That's very good information from the slides, thanks. Our design uses Kafka
for two purposes: one is as a cache, for which we use a KTable; the second is
as a message delivery mechanism to send data to other systems. Because we care
very much about latency, a KTable over a compacted topic suits us well, and
having to find another system for caching would involve a big change. The
approach described in the slides, which breaks a message into smaller chunks
and then reassembles them, seems viable.

Do you know why Kafka's latency is not linear in message size compared to
small messages? For a 2 MB message I see average latency under 10 ms, so I was
expecting a 30 MB message to have latency under 10 * 20 = 200 ms.

On Mon, Mar 18, 2019 at 3:29 PM Bruce Markey  wrote:

> Hi Nan,
>
> Would you consider other approaches that may actually be a more efficient
> solution for you? There is a slide deck Handle Large Messages In Apache
> Kafka
> <
> https://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297
> >.
> For messages this large, one of the approaches suggested is Reference Based
> Messaging where you write your large files to an external data store then
> produce a small Apache Kafka message with a reference for where to find the
> file. This would allow your consumer applications to find the file as
> needed rather than storing all that data in the event log.
>
> --  bjm
>
> On Thu, Mar 14, 2019 at 1:53 PM Xu, Nan  wrote:
>
> > [original message snipped; quoted in full below]
> >
>
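The chunk-and-reassemble approach mentioned in this thread (from the "Handle Large Messages In Apache Kafka" slides) can be sketched with plain collections; the helper names and chunk size here are illustrative, not from any Kafka API:

```scala
object Chunking {
  // Split a large payload into bounded chunks that each fit under the
  // producer's max.request.size limit.
  def split(payload: Array[Byte], chunkSize: Int): Seq[Array[Byte]] = {
    require(chunkSize > 0, "chunk size must be positive")
    payload.grouped(chunkSize).toSeq
  }

  // Reassemble on the consumer side by concatenating chunks in order.
  def join(chunks: Seq[Array[Byte]]): Array[Byte] =
    chunks.flatten.toArray
}
```

In a real deployment each chunk would also carry a message id and sequence number (in the record key or headers) so the consumer can detect missing or reordered chunks before reassembly.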


Re: kafka latency for large message

2019-03-18 Thread Nan Xu
Can anyone give a suggestion, or an explanation of why Kafka shows such a
large latency for large payloads?

Thanks,
Nan

On Thu, Mar 14, 2019 at 3:53 PM Xu, Nan  wrote:

> [original message snipped; quoted in full below]
>


kafka latency for large message

2019-03-14 Thread Xu, Nan
Hi,

We are using Kafka to send messages, and less than 1% of the messages are very
big, close to 30 MB. We understand Kafka is not ideal for sending big
messages, but because the large-message rate is very low we want Kafka to
handle them anyway, while still getting a reasonable latency.

To test, I set up a topic named test on a single-broker local Kafka, with
only 1 partition and 1 replica, using the following command:

./kafka-producer-perf-test.sh  --topic test --num-records 200  --throughput 
1 --record-size 3000 --producer.config ../config/producer.properties

Producer.config

#Max 40M message
max.request.size=4000
buffer.memory=4000

#2M buffer
send.buffer.bytes=200

6 records sent, 1.1 records/sec (31.00 MB/sec), 973.0 ms avg latency, 1386.0 
max latency.
6 records sent, 1.0 records/sec (28.91 MB/sec), 787.2 ms avg latency, 1313.0 
max latency.
5 records sent, 1.0 records/sec (27.92 MB/sec), 582.8 ms avg latency, 643.0 max 
latency.
6 records sent, 1.1 records/sec (30.16 MB/sec), 685.3 ms avg latency, 1171.0 
max latency.
5 records sent, 1.0 records/sec (27.92 MB/sec), 629.4 ms avg latency, 729.0 max 
latency.
5 records sent, 1.0 records/sec (27.61 MB/sec), 635.6 ms avg latency, 673.0 max 
latency.
6 records sent, 1.1 records/sec (30.09 MB/sec), 736.2 ms avg latency, 1255.0 
max latency.
5 records sent, 1.0 records/sec (27.62 MB/sec), 626.8 ms avg latency, 685.0 max 
latency.
5 records sent, 1.0 records/sec (28.38 MB/sec), 608.8 ms avg latency, 685.0 max 
latency.


On the broker, I changed the following:

socket.send.buffer.bytes=2024000
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=2224000

and all others are default.

I am a little surprised to see about 1 s max latency and about 0.5 s average.
My understanding is that Kafka memory-maps the log file and lets the OS flush
it, and all writes are sequential, so the flush should not be affected that
much by message size. Batching and the network will take longer, but those are
in memory and on the local machine, and my SSD should do far better than 0.5
seconds. Where is the time being consumed? Any suggestions?

Thanks,
Nan







--
This message, and any attachments, is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/emaildisclaimer.   If you are not the intended 
recipient, please delete this message.
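A back-of-the-envelope check on the numbers above (a sketch, not a diagnosis): the perf test sustains roughly 28-31 MB/sec, so a single 30 MB record takes on the order of a second just to move through the pipeline at that rate, which lines up with the observed max latencies.

```scala
object LatencyEstimate {
  // Rough lower bound: time for one record to pass through at the
  // observed end-to-end throughput.
  def transferMillis(recordBytes: Long, throughputMBperSec: Double): Double =
    recordBytes / (throughputMBperSec * 1e6) * 1000.0
}
```

For a 30 MB record at 30 MB/sec this gives about 1000 ms, so the latency observed is close to the throughput ceiling of the run rather than an anomaly in the flush path.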


kafka stream. streams.allMetadata() get nothing back.

2019-02-27 Thread Nan Xu
hi,

   I am trying the following program and want to see the metadata for
test_store, but nothing comes back: the iterator from streams.allMetadata()
has size 0. I can see data in the store, though. I need the metadata so that
when I have multiple instances running I can find the right store.
Is there any setting I missed?

Thanks,
Nan

   val storeName = "test_store"
  val streamProperties = new Properties()
  streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
"streams-store-test5")
  streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")

  val streamBuilder = new StreamsBuilder()

  val inputStream = streamBuilder.stream[String, String]("test1")

  val kvStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
serialization.Serdes.String(),
serialization.Serdes.String(),
  ).withCachingDisabled()

  streamBuilder.addStateStore(kvStore)

  val transformer: TransformerSupplier[String, String, KeyValue[String,
String]]
  = () => new SimpleTransformer(storeName)

  inputStream.transform(transformer, storeName)

  val topology = streamBuilder.build()
  val streams = new KafkaStreams(topology, streamProperties)
  Runtime.getRuntime.addShutdownHook(new Thread { () => streams.close() })

  import scala.concurrent.ExecutionContext.Implicits.global

  streams.start()

  Thread.sleep(5000)
  Future {
val metaIter = streams.allMetadata().iterator()
while(metaIter.hasNext){
  println("===meta: " + metaIter.next())
}
}


what's in the rocksdb in the tmp dir?

2019-02-20 Thread Nan Xu
Just a general question about RocksDB in Kafka Streams: I see there is a
folder at /tmp/kafka-stream/ which is used by RocksDB in the Kafka Streams
app. When a stream app gets restarted, can the store data be loaded directly
from this folder? I ask because I see very heavy network traffic reading from
the broker, as if it is trying to rebuild the store. If so, what is the
purpose of this RocksDB folder?

Thanks,
Nan
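One detail relevant to the question above: Kafka Streams restores a store from its changelog topic whenever the local state directory is missing or stale, and the default state.dir lives under /tmp, which many systems wipe on reboot. A hedged config sketch (the path is illustrative):

```properties
# Kafka Streams property: move state off /tmp so the local RocksDB files
# survive restarts and full restoration from the changelog can be avoided
state.dir=/var/lib/kafka-streams
```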


Re: Can I query ktable/stream/store with SQL like statment.

2019-02-13 Thread Nan Xu
A range query would be enough for me. Is there an example? Which API can I
call for this?

Thanks,
Nan

On Tue, Feb 12, 2019 at 6:17 PM Matthias J. Sax 
wrote:

> You could do a range query from "abc" to "abd" for example (in this
> case, you would need to make sure to check the result form the iterator
> and drop "abd" though).
>
> Note that range queries are executed on the raw bytes. Thus, you need
> to understand how the serializers you use work. If in doubt, you may want to
> use conservative ranges and apply a filter on the iterator to ignore
> false positives.
>
> Also, this only works for prefix queries, i.e., if you query with a known
> prefix of the key.
>
> Hope this helps.
>
> -Matthias
>
> On 2/12/19 8:25 AM, Nan Xu wrote:
> > Hi,
> >
> > Just wondering if there is a way to do a sql like "select key,
> > value.fieild1 from ktable where key like abc%"
> > The purpose of this to select some value from a ktable without a
> fully
> > defined key.  Store.all then filter on them would be very inefficient if
> > store is big.
> >
> > Thanks,
> > Nan
> >
>
>
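Matthias's range-query suggestion above can be illustrated with a sorted map standing in for the state store (the TreeMap here is only a stand-in for a real store's range call; names are illustrative):

```scala
import scala.collection.immutable.TreeMap

object PrefixScan {
  // For prefix "abc", scan ["abc", "abd"): bump the last character to get
  // the upper bound, as in the "abc" to "abd" example above.
  def upperBound(prefix: String): String =
    prefix.init + (prefix.last + 1).toChar

  def scan(store: TreeMap[String, String], prefix: String): Map[String, String] =
    store.range(prefix, upperBound(prefix)) // range end is exclusive here
}
```

TreeMap.range has an exclusive upper bound, so no filtering is needed in this sketch; the Kafka Streams store range(from, to) is inclusive on both ends, which is why Matthias's note about dropping "abd" from the iterator still applies there.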


Can I query ktable/stream/store with SQL like statment.

2019-02-12 Thread Nan Xu
Hi,

Just wondering if there is a way to do a SQL-like query such as "select key,
value.field1 from ktable where key like 'abc%'".
The purpose of this is to select some values from a KTable without a fully
defined key. Store.all() followed by a filter would be very inefficient if the
store is big.

Thanks,
Nan


Re: kstream transform forward to different topics

2019-02-07 Thread Nan Xu
That will be really helpful; thanks for the heads-up.

On Thu, Feb 7, 2019 at 7:36 PM Guozhang Wang  wrote:

> Hi Nan,
>
> Glad it helps with your case. Just another note that in the next release
> when KIP-307 is in place [1], you can actually combine the DSL with PAPI by
> naming the last operator that creates your transformed KStream, and then
> manually add the sink nodes like:
>
> stream2 = stream1.transform(Named.as("myName"));
>
> topology = builder.build();
>
> // continue adding to the built topology
> topology.addSink(... "myName");
>
> -
>
> Or you can also rely on flatTransform [2] to reduce "transform.flatMap" to
> a single operator.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> [2]
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues
>
>
> Guozhang
>
> On Thu, Feb 7, 2019 at 12:59 PM Nan Xu  wrote:
>
> > awesome, this solution is great, thanks a lot.
> >
> > Nan
> >
> > On Thu, Feb 7, 2019 at 2:28 PM Bill Bejeck  wrote:
> >
> > > Hi Nan,
> > >
> > > l see what you are saying about reproducing a join in the PAPI.
> > >
> > > I have another thought.
> > >
> > >    1. Have your Transform return a List [r1, r2, r3]
> > >    2. Then, after your transform operation, use a flatMapValues operator,
> > >       as this will forward KV pairs of (k, r1), (k, r2), and (k, r3).
> > >
> > > From there you have two choices.
> > >
> > >  1. If you are using Kafka Streams v2.0+, you can create an instance of
> > > TopicNameExtractor. The TopicNameExtractor can return the appropriate
> > > topic name based on the instance type of the value.
> > > Then it would look something like:
> > > inputStream.transform(transformer).flatMapValues(...).to(MyTopicChooser,
> > > Produced(...));
> > >
> > > 2. If you are using a version of Kafka Streams prior to v 2.0 then
> first
> > > create 3 org.apache.kafka.streams.kstream.Predicate instances.
> > >
> > >- Predicate p1 = (k,v) -> v instanceof r1;
> > >- Predicate p2 = (k,v) -> v instanceof r2;
> > >- Predicate p3 = (k,v) -> v instanceof r3;
> > >
> > >  You will still use the flatMapValues operator, but now you'd follow it
> > > with the branch operator and have the resulting stream instances in the
> > > array forward to the appropriate topic
> > >
> > >  val allStreams =
> > > inputStream.transform(transformer).flatMapValues(...).branch(p1,
> > > p2, p3);
> > >  val allStreams(0).to("topic1"..);
> > >  val allStreams(1).to("topic2"..);
> > >  val allStreams(2).to("topic3"..);
> > >
> > > HTH,
> > > Bill
> > >
> > >
> > >
> > > On Thu, Feb 7, 2019 at 11:51 AM Nan Xu  wrote:
> > >
> > > > hmm, but my DSL logic at the beginning involves some joins between
> > > > different streams, so I feel it would be quite complex to write
> > > > everything in the PAPI.
> > > > what if I do this. in the transform, I return all 3 classes as a
> tuple.
> > > > then to map 3 times on the same stream like this
> > > > transformer {
> > > > return (class1Instance, class2Instance, class3Instance)
> > > > }
> > > > val kstream = inputStream.transform(transformer)
> > > > kstream.map((r1,r2,r3) => r1).to("topic1")
> > > > kstream.map((r1,r2,r3) => r2).to("topic2")
> > > > kstream.map((r1,r2,r3) => r3).to("topic3")
> > > > but don't know if it is the recommended way.
> > > >
> > > > Thanks,
> > > > Nan
> > > >
> > > > On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck 
> wrote:
> > > >
> > > > > Hi Nan,
> > > > >
> > > > > I wanted to follow up some more.
> > > > >
> > > > > Since you need your Transformer forward to 3 output topics or more
> > > > > generally any time you want a processor to forward to multiple
> child
> > > > nodes
> > > > > or specific nodes in the topology, you can best achieve this kind
> of
> > > > > control and flexibility using the PAPI.
> > > > >
> > > >
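Both of Bill's options above ultimately route a record by the runtime type of its value. That dispatch rule is plain Java and can be sketched (and unit-tested) independently of Kafka. The class names R1/R2/R3 and the topic names below are hypothetical stand-ins for the r1/r2/r3 records in the discussion; topicFor mirrors what a TopicNameExtractor implementation (or the three branch predicates) would decide.

```java
// Hypothetical record types standing in for the thread's r1/r2/r3 classes.
class R1 {}
class R2 {}
class R3 {}

class TopicRouter {
    // The routing rule a TopicNameExtractor would apply:
    // choose the output topic from the value's runtime type.
    static String topicFor(Object value) {
        if (value instanceof R1) return "topic1";
        if (value instanceof R2) return "topic2";
        if (value instanceof R3) return "topic3";
        throw new IllegalArgumentException(
                "unexpected record type: " + value.getClass());
    }

    public static void main(String[] args) {
        System.out.println(topicFor(new R1())); // topic1
        System.out.println(topicFor(new R3())); // topic3
    }
}
```

In the real topology this logic would live inside the `TopicNameExtractor` passed to `to(...)`, or be split across the three `Predicate` instances passed to `branch(...)`.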

Re: kstream transform forward to different topics

2019-02-07 Thread Nan Xu
awesome, this solution is great, thanks a lot.

Nan

On Thu, Feb 7, 2019 at 2:28 PM Bill Bejeck  wrote:

> Hi Nan,
>
> I see what you are saying about reproducing a join in the PAPI.
>
> I have another thought.
>
>1. Have your Transform return a List [r1, r2, r3]
>2. Then after your transform operation use a  flatMapValues operator as
>this will forward KV pairs of (k, r1), (k, r2), and (k, r3).
>
> From there you have two choices.
>
>  1. If you are using Kafka Streams v 2.0+, you can create an instance of
> TopicNameExtractor.
> The TopicNameExtractor can return the appropriate topic name based on the
> instance type of the value.
> Then your code would look something like
> inputStream.transform(transformer).flatMapValues(...).to(MyTopicChooser,
> Produced(...));
>
> 2. If you are using a version of Kafka Streams prior to v 2.0 then first
> create 3 org.apache.kafka.streams.kstream.Predicate instances.
>
>- Predicate p1 = (k,v) -> v instanceof r1;
>- Predicate p2 = (k,v) -> v instanceof r2;
>- Predicate p3 = (k,v) -> v instanceof r3;
>
>  You will still use the flatMapValues operator, but now you'd follow it
> with the branch operator and have the resulting stream instances in the
> array forward to the appropriate topic
>
>  val allStreams =
> inputStream.transform(transformer).flatMapValues(...).branch(p1,
> p2, p3);
>  val allStreams(0).to("topic1"..);
>  val allStreams(1).to("topic2"..);
>  val allStreams(2).to("topic3"..);
>
> HTH,
> Bill
>
>
>
> On Thu, Feb 7, 2019 at 11:51 AM Nan Xu  wrote:
>
> > hmm, but my DSL logic at the beginning involves some joins between different
> > streams, so I feel it would be quite complex to write everything in the PAPI.
> > what if I do this. in the transform, I return all 3 classes as a tuple.
> > then to map 3 times on the same stream like this
> > transformer {
> > return (class1Instance, class2Instance, class3Instance)
> > }
> > val kstream = inputStream.transform(transformer)
> > kstream.map((r1,r2,r3) => r1).to("topic1")
> > kstream.map((r1,r2,r3) => r2).to("topic2")
> > kstream.map((r1,r2,r3) => r3).to("topic3")
> > but don't know if it is the recommended way.
> >
> > Thanks,
> > Nan
> >
> > On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck  wrote:
> >
> > > Hi Nan,
> > >
> > > I wanted to follow up some more.
> > >
> > > Since you need your Transformer forward to 3 output topics or more
> > > generally any time you want a processor to forward to multiple child
> > nodes
> > > or specific nodes in the topology, you can best achieve this kind of
> > > control and flexibility using the PAPI.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck  wrote:
> > >
> > > > Hi Nan,
> > > >
> > > > What I'm suggesting is do the entire topology in the PAPI, sorry if I
> > > > didn't make this clear from before.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu  wrote:
> > > >
> > > >> thanks, just to make sure I understand this correctly.
> > > >>
> > > >> I have some processing logic using DSL, after those processing, I
> > have a
> > > >> kstream, from this kstream, I need to do a transform and put result
> to
> > > >> different topics. To use processor api, I need to put this kstream
> to
> > a
> > > >> topic, then use topology.addSource("source-node", "input-topic");
> > > >> something like
> > > >>
> > > >>  val streamBuilder = new StreamsBuilder()
> > > >>  val inputStream = streamBuilder.stream[String,
> > StoreInput](begin_topic)
> > > >>
> > > >> //some DSL processing
> > > >> val resultKStream = inputStream.map().filter()..
> > > >>
> > > >>
> > > >> resultKStream .to("inter_topic")
> > > >>
> > > >> final Topology topology = new Topology();
> > > >> topology.addSource("source-node", " inter_topic");
> > > >> topology.addProcessor("transformer", () -> new MyTransfomer(),
> > > >> "source-node");
> > > >>
> > > >> so if I have to put my intermediate result to inter
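Bill's steps 1–2 above (have the transform return a List, then let flatMapValues fan it out) can be pictured as a pure function before wiring it into a topology. The sketch below is an illustrative in-memory stand-in, not Kafka Streams API code: flatten models what flatMapValues does with the returned list.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class FlattenSketch {
    // Models flatMapValues: one input (key, [r1, r2, r3]) becomes
    // three output pairs (key, r1), (key, r2), (key, r3).
    static List<Map.Entry<String, Object>> flatten(String key, List<?> values) {
        List<Map.Entry<String, Object>> out = new ArrayList<>();
        for (Object v : values) {
            out.add(new AbstractMap.SimpleEntry<>(key, v));
        }
        return out;
    }

    public static void main(String[] args) {
        // One key with three result records, as in the thread.
        for (Map.Entry<String, Object> e :
                flatten("k", Arrays.asList("r1", "r2", "r3"))) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```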

Re: kstream transform forward to different topics

2019-02-07 Thread Nan Xu
hmm, but my DSL logic at the beginning involves some joins between different
streams, so I feel it would be quite complex to write everything in the PAPI.
what if I do this. in the transform, I return all 3 classes as a tuple.
then to map 3 times on the same stream like this
transformer {
return (class1Instance, class2Instance, class3Instance)
}
val kstream = inputStream.transform(transformer)
kstream.map((r1,r2,r3) => r1).to("topic1")
kstream.map((r1,r2,r3) => r2).to("topic2")
kstream.map((r1,r2,r3) => r3).to("topic3")
but don't know if it is the recommended way.

Thanks,
Nan

On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck  wrote:

> Hi Nan,
>
> I wanted to follow up some more.
>
> Since you need your Transformer forward to 3 output topics or more
> generally any time you want a processor to forward to multiple child nodes
> or specific nodes in the topology, you can best achieve this kind of
> control and flexibility using the PAPI.
>
> Thanks,
> Bill
>
> On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck  wrote:
>
> > Hi Nan,
> >
> > What I'm suggesting is do the entire topology in the PAPI, sorry if I
> > didn't make this clear from before.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu  wrote:
> >
> >> thanks, just to make sure I understand this correctly.
> >>
> >> I have some processing logic using DSL, after those processing, I have a
> >> kstream, from this kstream, I need to do a transform and put result to
> >> different topics. To use processor api, I need to put this kstream to a
> >> topic, then use topology.addSource("source-node", "input-topic");
> >> something like
> >>
> >>  val streamBuilder = new StreamsBuilder()
> >>  val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)
> >>
> >> //some DSL processing
> >> val resultKStream = inputStream.map().filter()..
> >>
> >>
> >> resultKStream .to("inter_topic")
> >>
> >> final Topology topology = new Topology();
> >> topology.addSource("source-node", " inter_topic");
> >> topology.addProcessor("transformer", () -> new MyTransfomer(),
> >> "source-node");
> >>
> > >> so if I have to put my intermediate result to inter_topic, is there any
> > >> performance implication? I am not sure, but it sounds to me like that
> > >> will cause one more hop from the client (stream app) to the Kafka
> > >> brokers. The beginning DSL processing happens on the client side, then
> > >> the result has to go back to the broker, then be read back to the
> > >> client to use the Processor API.
> >>
> >> Thanks,
> >> Nan
> >>
> >>
> >>
> >>
> >> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck  wrote:
> >>
> >> > Hi Nan,
> >> >
> >> > To forward to the 3 different topics it will probably be easier to do
> >> > this in the Processor API.  Based on what you stated in your question,
> >> > the topology will look something like this:
> >> >
> >> > final Topology topology = new Topology();
> >> > topology.addSource("source-node", "input-topic");
> >> > topology.addProcessor("transformer", () -> new MyTransfomer(),
> >> > "source-node");
> >> > topology.addSink("sink-node-1", "output-topic-1", "transformer");
> >> > topology.addSink("sink-node-2", "output-topic-2", "transformer");
> >> > topology.addSink("sink-node-3", "output-topic-3", "transformer");
> >> >
> >> > As you can see, the "transformer" is the parent node of all 3 sink
> >> nodes.
> >> > Then in your Transformer, you can forward the key-value pairs by using
> >> one
> >> > of two approaches.
> >> >
> >> > Sending to all child nodes with this call:
> >> >
> >> > context().forward(key, value, To.all()).
> >> >
> >> > Or by listing each child node individually like so
> >> >
> >> > context().forward(key, value, To.child("sink-node-1"));
> >> > context().forward(key, value, To.child("sink-node-2"));
> >> > context().forward(key, value, To.child("sink-node-3"));
> >> >
> >> > HTH,
> >> >
> >> > Bill
> >> >
> >> >
> >> >
> >> >
> >> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu  wrote:
> >> >
> >> > > when I do the transform, for a single input record, I need to output 3
> >> > > different records, and those 3 records are in different classes.  I
> >> > > want to send each type of record to a separate topic; my understanding
> >> > > is I should use
> >> > >
> >> > > context.forward inside the transformer  like
> >> > >
> >> > > Transformer{..
> >> > > context.forward(key, record1, To.child("topic1"))
> >> > > context.forward(key, value1, To.child("topic2"))
> >> > > }
> >> > > but how do I define those processors? I can create them in the
> >> > > topology, but who should be their parent? what's the name of the parent?
> >> > >
> >> > > stream.transform(transformer) doesn't give me a way to set the
> >> > > processor name.
> >> > >
> >> > > Thanks,
> >> > > Nan
> >> > >
> >> >
> >>
> >
>


Re: kstream transform forward to different topics

2019-02-07 Thread Nan Xu
thanks, just to make sure I understand this correctly.

I have some processing logic using DSL, after those processing, I have a
kstream, from this kstream, I need to do a transform and put result to
different topics. To use processor api, I need to put this kstream to a
topic, then use topology.addSource("source-node", "input-topic");
something like

 val streamBuilder = new StreamsBuilder()
 val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)

//some DSL processing
val resultKStream = inputStream.map().filter()..


resultKStream .to("inter_topic")

final Topology topology = new Topology();
topology.addSource("source-node", " inter_topic");
topology.addProcessor("transformer", () -> new MyTransfomer(),
"source-node");

so if I have to put my intermediate result to inter_topic, is there any
performance implication? I am not sure, but it sounds to me like that will
cause one more hop from the client (stream app) to the Kafka brokers. The
beginning DSL processing happens on the client side, then the result has to go
back to the broker, then be read back to the client to use the Processor API.

Thanks,
Nan




On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck  wrote:

> Hi Nan,
>
> To forward to the 3 different topics it will probably be easier to do this
> in the Processor API.  Based on what you stated in your question, the
> topology will look something like this:
>
> final Topology topology = new Topology();
> topology.addSource("source-node", "input-topic");
> topology.addProcessor("transformer", () -> new MyTransfomer(),
> "source-node");
> topology.addSink("sink-node-1", "output-topic-1", "transformer");
> topology.addSink("sink-node-2", "output-topic-2", "transformer");
> topology.addSink("sink-node-3", "output-topic-3", "transformer");
>
> As you can see, the "transformer" is the parent node of all 3 sink nodes.
> Then in your Transformer, you can forward the key-value pairs by using one
> of two approaches.
>
> Sending to all child nodes with this call:
>
> context().forward(key, value, To.all()).
>
> Or by listing each child node individually like so
>
> context().forward(key, value, To.child("sink-node-1"));
> context().forward(key, value, To.child("sink-node-2"));
> context().forward(key, value, To.child("sink-node-3"));
>
> HTH,
>
> Bill
>
>
>
>
> On Thu, Feb 7, 2019 at 12:13 AM Nan Xu  wrote:
>
> > when I do the transform, for a single input record, I need to output 3
> > different records, and those 3 records are in different classes.  I want to
> > send each type of record to a separate topic; my understanding is I
> > should use
> >
> > context.forward inside the transformer  like
> >
> > Transformer{..
> > context.forward(key, record1, To.child("topic1"))
> > context.forward(key, value1, To.child("topic2"))
> > }
> > but how do I define those processors? I can create them in the topology,
> > but who should be their parent? what's the name of the parent?
> >
> > stream.transform(transformer) doesn't give me a way to set the processor name.
> >
> > Thanks,
> > Nan
> >
>
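The To.child(...) forwarding in Bill's topology can be pictured as a dispatch table from sink-node name to a downstream collector. The sketch below is an in-memory stand-in for illustration only: the sink names and record values are placeholders, and forward mimics what context().forward(key, value, To.child(name)) does inside a real Processor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ForwardSketch {
    // "Sink nodes": each name maps to the list of records routed to it.
    final Map<String, List<String>> sinks = new HashMap<>();

    ForwardSketch(String... sinkNames) {
        for (String name : sinkNames) {
            sinks.put(name, new ArrayList<>());
        }
    }

    // Mimics context().forward(key, value, To.child(child)).
    void forward(String key, String value, String child) {
        sinks.get(child).add(key + "=" + value);
    }

    public static void main(String[] args) {
        ForwardSketch ctx =
                new ForwardSketch("sink-node-1", "sink-node-2", "sink-node-3");
        // One input record fans out to three children, as in the thread.
        ctx.forward("k", "record1", "sink-node-1");
        ctx.forward("k", "record2", "sink-node-2");
        ctx.forward("k", "record3", "sink-node-3");
        System.out.println(ctx.sinks);
    }
}
```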


kstream transform forward to different topics

2019-02-06 Thread Nan Xu
when I do the transform, for a single input record, I need to output 3
different records, and those 3 records are in different classes.  I want to
send each type of record to a separate topic; my understanding is I
should use

context.forward inside the transformer  like

Transformer{..
context.forward(key, record1, To.child("topic1"))
context.forward(key, value1, To.child("topic2"))
}
but how do I define those processors? I can create them in the topology, but
who should be their parent? what's the name of the parent?

stream.transform(transformer) doesn't give me a way to set the processor name.

Thanks,
Nan


kafka stream depends on it's own derived table

2019-01-28 Thread Nan Xu
hi,
I was writing a simple stream app; all it does is have a producer send a
sequence of paths and values, for example
path /0 ,  value 1
path /0/1,  value 2
path /0/1/2, value 3
and kafka stream take those input and produce a ktable store.

There is a rule: if the parent path does not exist, the child cannot be
inserted. So if /0/1 is not there, inserting /0/1/2 should be filtered out.

I use the following program to process it.  Paths are sent as /0, /0/1,
/0/1/2, ..., /0/1/.../9.

Because the filter depends on the KTable store, which is built after the
filter in the stream, when the filter checks whether a path's parent exists,
the parent may already have passed the filter but not yet reached the store;
from the filter's point of view the parent does not exist. This is really a
problem of async processing: the parent is not fully done (written to the
store) before the next element starts processing (the filter).

Another problem is that the parent key and child key are different, so the
arrival sequence of paths can differ from the sequence the producer sent them
in, which also causes children to be filtered out. The producer sends /0,
/0/1, /0/1/2..., but the broker receives them as /0, /0/1/2, /0/1, ...; then
all the following paths are filtered out, because /0/1/2 never gets a chance
to be created.

any thoughts to solve this?

Thanks,
Nan


val streamProperties = new Properties()
  streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
"my-first-streams-application1")
  streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")
  streamProperties.put(StreamsConfig.CLIENT_ID_CONFIG,
"important-test-client")
  streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
classOf[StringSerde].getName)
  streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
classOf[IntegerSerde].getName)

  val streamBuilder = new StreamsBuilder()
  val topic = "input"

  val inputStream = streamBuilder.stream[String, Integer](topic)

  val materialized = Materialized.as[String, Integer, KeyValueStore[Bytes,
Array[Byte]]](topic + "_store")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())

  val reducer = new Reducer[Integer](){
override def apply(value1: Integer, value2: Integer): Integer = {
  value2
}
  }

  //value is not important, only care key.
  val ktable = inputStream.filter(filter).groupByKey().reduce(reducer,
materialized)

  // make sure parent exist.
  def filter(key: String, value: Integer): Boolean = {
println("===current store===, checking key " + key + " value: " + value)
store.all().forEachRemaining(x => println(x.key))
val parent = key.trim().substring(0, key.lastIndexOf("/"))
if(parent == "") {
  true
} else {
  if (store.get(parent) == null) {
println("not found parent" + parent)
false
  } else {
true
  }
}
  }

  val topology = streamBuilder.build()
  val streams = new KafkaStreams(topology, streamProperties)
  streams.start()

  Thread.sleep(6000)
  val storeName = ktable.queryableStoreName()
  val store = streams.store(storeName,
QueryableStoreTypes.keyValueStore[String, Integer])


val senderProperties = new Properties
  senderProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")
  senderProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer")
  senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
classOf[StringSerializer].getName)
  senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
classOf[IntegerSerializer].getName)
  val producer = new KafkaProducer[String, Integer](senderProperties)


  for(j <- 1 until 10) {
val path = for(i <- 0 until j) yield {
  "/" + i
}
producer.send(new ProducerRecord(topic, path.mkString(""), j))
  }

  Thread.sleep(3000)
  println("show final store state")
  store.all().forEachRemaining(x => println(x.key))

Thread.sleep(1000)


output:
===current store===, checking key /0 value: 1
===current store===, checking key /0/1 value: 2
/0
===current store===, checking key /0/1/2/3 value: 4
/0/1
/0
not found parent/0/1/2
===current store===, checking key /0/1/2 value: 3
/0/1
/0
===current store===, checking key /0/1/2/3/4/5 value: 6
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4
===current store===, checking key /0/1/2/3/4 value: 5
/0/1
/0
/0/1/2
not found parent/0/1/2/3
===current store===, checking key /0/1/2/3/4/5/6 value: 7
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5
===current store===, checking key /0/1/2/3/4/5/6/7/8 value: 9
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6/7
===current store===, checking key /0/1/2/3/4/5/6/7 value: 8
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6
show final store state
/0/1
/0
/0/1/2
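The first problem above (a parent that has already passed the filter but has not yet reached the store) disappears if the existence check and the insert happen synchronously in one step, e.g. inside a single Transformer with its own attached state store, instead of a filter that reads a store populated downstream. Below is a minimal in-memory sketch of that check-and-insert rule; it is an illustration, not Kafka code, and note that the second problem (out-of-order arrival across different keys) is separate and is not fixed by this.

```java
import java.util.HashSet;
import java.util.Set;

class PathStore {
    final Set<String> store = new HashSet<>();

    // Check the parent and insert in one synchronous step, as a single
    // Transformer with an attached state store would.
    boolean tryInsert(String path) {
        String parent = path.substring(0, path.lastIndexOf('/'));
        if (parent.isEmpty() || store.contains(parent)) {
            store.add(path);
            return true;
        }
        return false; // parent missing: filter out, per the thread's rule
    }

    public static void main(String[] args) {
        PathStore s = new PathStore();
        // In-order arrival: every child finds its parent already inserted.
        System.out.println(s.tryInsert("/0"));     // true
        System.out.println(s.tryInsert("/0/1"));   // true
        System.out.println(s.tryInsert("/0/1/2")); // true
        // Out-of-order arrival still fails -- that needs buffering/retry.
        System.out.println(s.tryInsert("/0/1/2/3/4")); // false
    }
}
```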


manually trigger log compaction

2018-09-26 Thread Xu, Nan
Hi, 

   I am wondering, is there a way to manually trigger log compaction for a
certain topic?

Thanks,
Nan
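There is no admin command that forces compaction directly; a common workaround (worth verifying against your broker version) is to temporarily lower the topic's compaction thresholds so the log cleaner picks the topic up on its next pass, then restore them. A sketch using the ZooKeeper-based kafka-configs tool of this era, with placeholder topic name and ZooKeeper address:

```
# Make the topic eligible for compaction almost immediately:
# roll segments quickly and clean even slightly-dirty logs.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config min.cleanable.dirty.ratio=0.01,segment.ms=60000

# ...wait for the log cleaner to run, then remove the overrides:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --delete-config min.cleanable.dirty.ratio,segment.ms
```

Note that the active segment is never compacted, which is why lowering segment.ms is part of the trick.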



Re: kafka stream latency

2018-08-28 Thread Nan Xu
understand, thanks for all the help

On Mon, Aug 27, 2018 at 2:16 PM Guozhang Wang  wrote:

> Hello Nan,
>
> What you described seems to be a broker-side latency spike, not a client
> (either producer, or consumer, or streams)-side latency spike. There's a
> number of possible reasons for such spikes: disk flushing (though async, it
> can also cause the processing thread to halt), GC, page faults (in case the
> thread needs to access a cold page), etc. It is hard to tell which one is
> the actual root cause.
>
> For example, you can take a look at this slides (starting at 14), for a
> concrete example of such an investigation:
>
> https://www.slideshare.net/kawamuray/multitenancy-kafka-clusters-for-everyone-at-line
>
> My point is that it is not really easy via email discussion and by looking
> at your experiment code to tell exactly what is the root cause: the
> community can share with your some past experience and a few quick hinters,
> but most likely the issue varies case by case and hence can only be fully
> understandable by yourself.
>
>
> Guozhang
>
> On Sat, Aug 25, 2018 at 6:58 PM, Nan Xu  wrote:
>
> > maybe easier to use github.
> >
> >  https://github.com/angelfox123/kperf
> >
> >
> > On Sat, Aug 25, 2018 at 8:43 PM Nan Xu  wrote:
> >
> > > so I did upgrade to 2.0.0 and am still seeing the same result. Below is
> > > the program I am using.  I am running everything on a single server
> > > (CentOS 7, 24 cores, 32G RAM, 1 broker, 1 zookeeper, single hard drive).
> > > I understand the single hard drive is less than ideal, but I still don't
> > > expect latency over 3 seconds.
> > >
> > >
> > > case 1.
> > > I create 1 partition for input and 1 partition for output, message
> size
> > > 10K
> > > producer give parameter  (3600, 1000, 2)   // 2 message per 1000 micro
> > > second for 3600 seconds, that translate to 2,000 message/s, I still see
> > > latency, sometime can reach to 3 seconds.
> > >
> > > case 2
> > > 50 partitions for input, and 50 partitions for output. message size 10K
> > > producer give parameter  (3600, 1000, 20)   // 20 message per 1000
> micro
> > > second for 3600 seconds, that translate to 20,000 message/s,latency not
> > > only high, and happen more often.
> > >
> > >
> > > Any suggestion is appreciated. target is per partition handle 1,000 --
> > > 2,000 message/s and all latency lower than 100ms.
> > >
> > > build.gradle==
> > > plugins {
> > > id 'application'
> > > id 'java'
> > > }
> > > group 'com.bofa'
> > > version '1.0-SNAPSHOT'
> > > sourceCompatibility = 1.8
> > > mainClassName="main.StreamApp"
> > >
> > > repositories {
> > > mavenCentral()
> > > }
> > >
> > > dependencies {
> > > compile group: 'org.apache.kafka', name: 'kafka-clients', version:
> > > '2.0.0'
> > > compile group: "org.apache.kafka", name: "kafka-streams", version:
> > > "2.0.0"
> > > compile group: 'io.dropwizard.metrics', name: 'metrics-core',
> > > version:'3.2.6'
> > > testCompile group: 'junit', name: 'junit', version: '4.12'
> > > }
> > >
> > > producer
> > > package main;
> > >
> > > import java.util.Properties;
> > > import java.util.concurrent.atomic.AtomicInteger;
> > >
> > > import Util.BusyTimer;
> > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > import org.apache.kafka.clients.producer.ProducerConfig;
> > > import org.apache.kafka.clients.producer.ProducerRecord;
> > > import org.apache.kafka.common.serialization.Serde;
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.common.serialization.StringSerializer;
> > >
> > > public class SimpleProducer {
> > > public static void main(String[] args) {
> > > final int time =Integer.valueOf(args[0]);
> > > final long interval = Integer.valueOf(args[1]);
> > > final int batch =Integer.valueOf(args[2]);
> > > Properties props = new Properties();
> > > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > "localhost:9092");
> > > props.put(ProducerConfig.CLIENT_ID_CONFIG,
> > > "kafka-perf-test-producer");
> > > props.put(ProducerConfig.KEY_SERI

Re: kafka stream latency

2018-08-25 Thread Nan Xu
maybe easier to use github.

 https://github.com/angelfox123/kperf


On Sat, Aug 25, 2018 at 8:43 PM Nan Xu  wrote:

> so I did upgrade to 2.0.0 and am still seeing the same result. Below is the
> program I am using.  I am running everything on a single server (CentOS 7,
> 24 cores, 32G RAM, 1 broker, 1 zookeeper, single hard drive). I understand
> the single hard drive is less than ideal, but I still don't expect latency
> over 3 seconds.
>
>
> case 1.
> I create 1 partition for input and 1 partition for output, message size
> 10K
> producer give parameter  (3600, 1000, 2)   // 2 message per 1000 micro
> second for 3600 seconds, that translate to 2,000 message/s, I still see
> latency, sometime can reach to 3 seconds.
>
> case 2
> 50 partitions for input, and 50 partitions for output. message size 10K
> producer give parameter  (3600, 1000, 20)   // 20 message per 1000 micro
> second for 3600 seconds, that translate to 20,000 message/s,latency not
> only high, and happen more often.
>
>
> Any suggestion is appreciated. target is per partition handle 1,000 --
> 2,000 message/s and all latency lower than 100ms.
>
> build.gradle==
> plugins {
> id 'application'
> id 'java'
> }
> group 'com.bofa'
> version '1.0-SNAPSHOT'
> sourceCompatibility = 1.8
> mainClassName="main.StreamApp"
>
> repositories {
> mavenCentral()
> }
>
> dependencies {
> compile group: 'org.apache.kafka', name: 'kafka-clients', version:
> '2.0.0'
> compile group: "org.apache.kafka", name: "kafka-streams", version:
> "2.0.0"
> compile group: 'io.dropwizard.metrics', name: 'metrics-core',
> version:'3.2.6'
> testCompile group: 'junit', name: 'junit', version: '4.12'
> }
>
> producer
> package main;
>
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import Util.BusyTimer;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class SimpleProducer {
> public static void main(String[] args) {
> final int time =Integer.valueOf(args[0]);
> final long interval = Integer.valueOf(args[1]);
> final int batch =Integer.valueOf(args[2]);
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> props.put(ProducerConfig.CLIENT_ID_CONFIG,
> "kafka-perf-test-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
>
> KafkaProducer<String, String> kafkaProducer = new
> KafkaProducer<>(props);
>
> StringBuffer buffer = new StringBuffer();
> for(int i=0; i<10240; i++) {
> buffer.append('a');
> }
> String value = buffer.toString();
>
> final long speed = 100/interval;
> Runnable task = new Runnable() {
> int sendNum=0;
> @Override
> public void run() {
>
> for(int i=0; i<batch; i++) {
> ProducerRecord<String, String> record = new
> ProducerRecord<>("input",  System.nanoTime() + "-" + value);
> kafkaProducer.send(record);
> sendNum++;
> }
>
> if(sendNum % (speed * batch) == 0){
> System.out.println(System.currentTimeMillis() + " : "
> + sendNum);
> }
> }
> };
>
> BusyTimer timer = new BusyTimer(interval,time, task);
> timer.spaceMessageWithInterval();
> }
> }
>
>
> kafka stream=
> package main;
>
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.Consumed;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.Produced;
>
> public class StreamApp {
> public static void main(String[] args) {
> final Properties streamsConfiguration = new Pro

Re: kafka stream latency

2018-08-25 Thread Nan Xu
final KStream<String, String> inputStream = builder.stream(
"input",
Consumed.with(
new Serdes.StringSerde(),
new Serdes.StringSerde()
)
);

inputStream.to(
"output",
Produced.with(new Serdes.StringSerde(), new
Serdes.StringSerde())
);

KafkaStreams streams = new KafkaStreams(builder.build(),
streamsConfiguration);
streams.start();
}
}

=consumer
package main;

import java.util.Collections;
import java.util.Properties;

import com.codahale.metrics.Reservoir;
import com.codahale.metrics.UniformReservoir;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
public static void main(String[] args) {
int expectedSpeed = Integer.valueOf(args[0]);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"kafka-perf-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("output"));

consumer.poll(0);
int recNum=0;

Reservoir totalRes = new UniformReservoir();

while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for(ConsumerRecord<String, String> record : records){
long sendTime = Long.valueOf(record.value().split("-")[0]);
long takeTime = System.nanoTime() - sendTime;
if(recNum> 2) {
totalRes.update(takeTime);
}
recNum++;

if(recNum % expectedSpeed == 0){
System.out.println("==="+ recNum +
"");
System.out.println("  mean: " +
totalRes.getSnapshot().getMean()/100);
System.out.println("  75%: " +
totalRes.getSnapshot().get75thPercentile()/100);
System.out.println("  99%: " +
totalRes.getSnapshot().get99thPercentile()/100);
System.out.println("  99.9%: " +
totalRes.getSnapshot().get999thPercentile()/100);
System.out.println("  Max: " +
totalRes.getSnapshot().getMax()/100);

System.out.println("");
totalRes = new UniformReservoir();
}
};
}
}
}

==busy timer=
//idea is space the message at a fixed time.(as thread.sleep, but sleep is
less accurate)
package Util;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class BusyTimer {
long interval;
long duration;
ArrayList<Long> pubTime;
ExecutorService ex = Executors.newSingleThreadExecutor();
Runnable task;


public BusyTimer(long microInterval, long exDurationInSeconds, Runnable
task){
pubTime = new ArrayList<>((int)(exDurationInSeconds * 1000 *
1000 / microInterval + 1));

this.interval = microInterval * 1000;
this.duration = exDurationInSeconds * 1000L * 1000 * 1000; // seconds -> nanoseconds, compared against System.nanoTime()
this.task = task;

}

private void busywaitUntil(long nano){
while(System.nanoTime() < nano){

}
}

public void spaceMessageWithInterval(){
int i =0 ;
long baseTime = System.nanoTime();
long doneTime = baseTime + duration;
while(true) {
task.run();
pubTime.add(System.nanoTime());
long targetTime = System.nanoTime() + interval;
if(System.nanoTime() > doneTime ){
break;
}
busywaitUntil(targetTime);
}
}
}
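One measurement caveat with the consumer above: UniformReservoir keeps a uniform sample over the whole history, which tends to smooth out rare spikes. Computing a nearest-rank percentile over the raw latencies of each window makes spikes easier to see. A small self-contained sketch (not part of the original program, no dropwizard dependency):

```java
import java.util.Arrays;

class Percentiles {
    // Nearest-rank percentile: p in (0, 1]; values need not be pre-sorted.
    static long percentile(long[] values, double p) {
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length); // 1-based rank
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        long[] latencies = new long[100];
        for (int i = 0; i < 100; i++) latencies[i] = i + 1; // 1..100
        System.out.println(percentile(latencies, 0.50)); // 50
        System.out.println(percentile(latencies, 0.99)); // 99
        System.out.println(percentile(latencies, 1.00)); // 100
    }
}
```

In the consumer loop this would replace the reservoir: collect the window's raw latencies into an array, print the percentiles, then clear the array.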



On Fri, Aug 24, 2018 at 3:37 PM Nan Xu  wrote:

> Looks really promising but after upgrade, still show the same result. I
> will post the program soon. Maybe you can see where the problem could be.
>
> Nan
>
> On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang  wrote:
>
>> Hello Nan,
>>
>> Kafka does not tie up the processing thread to do disk flushing. However,
>> since

kafka stream latency

2018-08-24 Thread Nan Xu
Hi Guozhang,

Here is the very simple kafka producer/consumer/stream app, using the
latest version, and I just created 2 topics:
   input and output

all component are just running on localhost.

Thanks,
Nan


RE: kafka stream latency

2018-08-24 Thread Xu, Nan
Hi Guozhang, 

Here is a very simple kafka producer/consumer/stream app, using the
latest version; it just creates 2 topics:
   input and output

All components are running on localhost.

Sorry, GitHub is not allowed from work, nor are attachments with a .java extension.

Thanks,
Nan


Re: kafka stream latency

2018-08-24 Thread Nan Xu
Looks really promising, but after the upgrade it still shows the same result. I
will post the program soon; maybe you can see where the problem could be.

Nan

On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang  wrote:

> Hello Nan,
>
> Kafka does not tie up the processing thread to do disk flushing. However,
> since you are on an older version of Kafka I suspect you're bumping into
> some old issues that have been resolved in later versions. e.g.
>
> https://issues.apache.org/jira/browse/KAFKA-4614
>
> I'd suggest you upgrading to latest version (2.0.0) and try again to see if
> you observe the same pattern.
>
> Guozhang
>
> On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <
> sbpothin...@gmail.com> wrote:
>
> > I will wait for the expert’s opinion:
> >
> > Did the Transparent Huge Pages(THP) disabled on the broker machine? it’s
> a
> > Linux kernel parameter.
> >
> > -Sudhir
> >
> > > On Aug 23, 2018, at 4:46 PM, Nan Xu  wrote:
> > >
> > > I think I found where the problem is, how to solve and why, still not
> > sure.
> > >
> > > it related to disk (maybe flushing?). I did a single machine, single
> > node,
> > > single topic and single partition setup.  producer pub as 2000
> message/s,
> > > 10K size message size. and single key.
> > >
> > > when I save kafka log to the  memory based partition, I don't see a
> > latency
> > > over 100ms. top around 70ms.
> > > when I save to a ssd hard drive. I do see latency spike, sometime over
> > 1s.
> > >
> > > adjust the log.flush.inteval.message / log.flush.intefval.ms has
> impact,
> > > but only to make thing worse... need suggestion.
> > >
> > > I think log flushing is totally async and done by OS in the default
> > > setting. does kafka has to wait when flushing data to disk?
> > >
> > > Thanks,
> > > Nan
> > >
> > >
> > >
> > >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang 
> > wrote:
> > >>
> > >> Given your application code:
> > >>
> > >> 
> > >>
> > >> final KStream localDeltaStream = builder.stream(
> > >>
> > >>localDeltaTopic,
> > >>
> > >>Consumed.with(
> > >>
> > >>new Serdes.StringSerde(),
> > >>
> > >>new NodeMutationSerde<>()
> > >>
> > >>)
> > >>
> > >>);
> > >>
> > >>  KStream localHistStream =
> > localDeltaStream.mapValues(
> > >>
> > >>(mutation) -> NodeState
> > >>
> > >>.newBuilder()
> > >>
> > >>.setMeta(
> > >>
> > >>mutation.getMetaMutation().getMeta()
> > >>
> > >>)
> > >>
> > >>.setValue(
> > >>
> > >>mutation.getValueMutation().getValue()
> > >>
> > >>)
> > >>
> > >>.build()
> > >>
> > >>);
> > >>
> > >>  localHistStream.to(
> > >>
> > >>localHistTopic,
> > >>
> > >>Produced.with(new Serdes.StringSerde(), new
> > NodeStateSerde<>())
> > >>
> > >>);
> > >>
> > >> 
> > >>
> > >> which is pure stateless, committing will not touch on an state
> > directory at
> > >> all. Hence committing only involves committing offsets to Kafka.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu  wrote:
> > >>>
> > >>> I was suspecting that too, but I also noticed the spike is not spaced
> > >>> around 10s. to further prove it. I put kafka data directory in a
> memory
> > >>> based directory.  it still has such latency spikes.  I am going to
> test
> > >> it
> > >>> on a single broker, single partition env.  will report back soon.
> > >>>
> > >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang 
> > >> wrote:
> > >>>
> > >>>> Hello Nan,
> > >>>>
&g

Re: kafka stream latency

2018-08-23 Thread Nan Xu
I think I found where the problem is; how to solve it, and why, I'm still not sure.

It's related to the disk (maybe flushing?). I did a single-machine, single-node,
single-topic, single-partition setup. The producer publishes 2000 messages/s,
10K message size, and a single key.

When I save the kafka log to a memory-based partition, I don't see latency
over 100ms; it tops out around 70ms.
When I save to an SSD, I do see latency spikes, sometimes over 1s.

Adjusting log.flush.interval.messages / log.flush.interval.ms has an impact,
but only makes things worse... need suggestions.

I think log flushing is fully async and done by the OS with default
settings. Does kafka have to wait when flushing data to disk?

Thanks,
Nan



On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang  wrote:

> Given your application code:
>
> 
>
>  final KStream localDeltaStream = builder.stream(
>
> localDeltaTopic,
>
> Consumed.with(
>
> new Serdes.StringSerde(),
>
> new NodeMutationSerde<>()
>
> )
>
> );
>
>   KStream localHistStream = localDeltaStream.mapValues(
>
> (mutation) -> NodeState
>
> .newBuilder()
>
> .setMeta(
>
> mutation.getMetaMutation().getMeta()
>
> )
>
> .setValue(
>
> mutation.getValueMutation().getValue()
>
> )
>
> .build()
>
> );
>
>   localHistStream.to(
>
> localHistTopic,
>
> Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>
> );
>
> 
>
> which is pure stateless, committing will not touch on an state directory at
> all. Hence committing only involves committing offsets to Kafka.
>
>
> Guozhang
>
>
> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu  wrote:
>
> > I was suspecting that too, but I also noticed the spike is not spaced
> > around 10s. to further prove it. I put kafka data directory in a memory
> > based directory.  it still has such latency spikes.  I am going to test
> it
> > on a single broker, single partition env.  will report back soon.
> >
> > On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang 
> wrote:
> >
> > > Hello Nan,
> > >
> > > Thanks for the detailed information you shared. When Kafka Streams is
> > > normally running, no rebalances should be triggered unless some of the
> > > instances (in your case, docker containers) have soft failures.
> > >
> > > I suspect the latency spike is due to the commit intervals: streams
> will
> > > try to commit its offset at a regular paces, which may increase
> latency.
> > It
> > > is controlled by the "commit.interval.ms" config value. I saw that in
> > your
> > > original email you've set it to 10 * 1000 (10 seconds). Is that aligned
> > > with the frequency you observe latency spikes?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu  wrote:
> > >
> > > > did more test and and make the test case simple.
> > > > all the setup now is a single physical machine. running 3 docker
> > > instance.
> > > > a1, a2, a3
> > > >
> > > > kafka + zookeeper running on all of those docker containers.
> > > > producer running on a1, send a single key,  update speed 2000
> > message/s,
> > > > each message is 10K size.
> > > > 3 consumer(different group)  are running. one on each docker.
> > > > all topics are pre-created.
> > > > in startup, I do see some latency greater than 100ms, which is fine.
> > and
> > > > then everything is good. latency is low and consumer don't see
> anything
> > > > over 100ms for a while.
> > > > then I see a few messages have latency over 100ms. then back to
> normal,
> > > > then happen again. do seems like gc problem. but I check the gc
> > > log.  I
> > > > don't think it can cause over 100ms. (both are G1 collector)
> > > >
> > > > after the stream stable running( exclude the startup), the first
> > message
> > > > over 100ms take 179ms  and the gc ( it has a 30ms pause, but should
> not
> > > > cause a 179ms end to end).
> > > >
> > > > FROM APP
> > > >
> > > > 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure)
> > > > 318473

Re: kafka local single partition, what's the limit?

2018-08-22 Thread Nan Xu
the data directory is memory based, no hard drive involved:
mount -t tmpfs -o size=25G tmpfs /mnt/ramdisk  and use this as the data folder.
iostat shows 0 writes too.

On Wed, Aug 22, 2018 at 11:06 PM Ken Chen  wrote:

> You are reaching 10gb * 1000 / 64 = 156 MB / s which probably saturated
> your hard drive bandwidth ? So you can take a look at your iostats
>
> --
> Sent from my iPhone
>
> On Aug 22, 2018, at 8:20 PM, Nan Xu  wrote:
>
> I setup a local single node test. producer and broker are sitting at the
> same VM. broker only has a single node(localhost) and a single partition.
> producer produce message as fast as it could in a single thread. all update
> to a SINGLE key(String). the kafka broker data directory is memory based
> directory(RAM). in this setup, because I am not access network( everything
> is localhost), and not access disk( memory based dir). I should avoid all
> the IO operations, so the pub should be really, really fast, maybe close to
> the memory speed. But I publish 1,000,000 messges (every message is 10K
> string, so total 10G), that takes 64s, which I think it's very slow. And
> from htop, I don't see any cpu thread get pushed to close to 100%. is that
> mean kafka has some internal lock/wait which can not fully utilize the
> hardware? producer and broker setting are out of box default, I am using
> 0.10.1.0.
>
> Thanks,
> Nan
>


kafka local single partition, what's the limit?

2018-08-22 Thread Nan Xu
I set up a local single-node test. The producer and broker sit on the same
VM; the broker has only a single node (localhost) and a single partition.
The producer produces messages as fast as it can in a single thread, all
updates to a SINGLE key (String). The kafka broker data directory is memory
based (RAM). In this setup, because I am not touching the network (everything
is localhost) and not touching disk (memory-based dir), I should avoid all
the IO operations, so publishing should be really, really fast, maybe close
to memory speed. But publishing 1,000,000 messages (every message is a 10K
string, so 10G total) takes 64s, which I think is very slow. And from htop,
I don't see any cpu thread pushed close to 100%. Does that mean kafka has
some internal lock/wait which cannot fully utilize the hardware? Producer
and broker settings are out-of-box defaults; I am using 0.10.1.0.

Thanks,
Nan
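For what it's worth, single-thread producer throughput with default settings is usually gated by batching, acknowledgment, and compression settings rather than raw hardware. A sketch of the knobs involved — the key names are the standard producer config strings, but the values are purely illustrative, not a tuned recommendation:

```java
import java.util.Properties;

public class ProducerTuning {

    // Build throughput-oriented producer properties. Key names are the
    // standard Kafka producer configs; values are example starting points.
    static Properties throughputProps(String bootstrap) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrap);
        p.put("acks", "1");               // don't wait for the full ISR on each send
        p.put("linger.ms", "5");          // allow a small delay so sends batch up
        p.put("batch.size", "262144");    // 256 KB batches, so ~25 of the 10K messages per batch
        p.put("compression.type", "lz4"); // trade a little CPU for much less I/O
        return p;
    }

    public static void main(String[] args) {
        // Pass these Properties to a KafkaProducer; printed here for inspection.
        System.out.println(throughputProps("localhost:9092"));
    }
}
```

With the out-of-box defaults (`batch.size` 16 KB, `linger.ms` 0), each ~10K message fills most of a batch by itself, so the single sender thread spends its time waiting on per-batch round trips rather than pushing data.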


Re: kafka stream latency

2018-08-22 Thread Nan Xu
I was suspecting that too, but I also noticed the spikes are not spaced
around 10s. To further prove it, I put the kafka data directory in a memory
based directory; it still has such latency spikes. I am going to test it
on a single-broker, single-partition env. Will report back soon.
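One quick way to check whether spikes track a periodic event such as the 10s commit interval is to look at the gaps between spike timestamps. A small stdlib-only sketch (the helper name and tolerance are made up for illustration):

```java
import java.util.List;

public class SpikeSpacing {

    // Returns the fraction of inter-spike gaps that fall within `toleranceMs`
    // of `intervalMs`. Near 1.0 suggests the spikes track a periodic event
    // (e.g. commits every commit.interval.ms); near 0.0 suggests they do not.
    static double alignedFraction(List<Long> spikeMillis, long intervalMs, long toleranceMs) {
        if (spikeMillis.size() < 2) {
            return 0.0;
        }
        int aligned = 0;
        for (int i = 1; i < spikeMillis.size(); i++) {
            long gap = spikeMillis.get(i) - spikeMillis.get(i - 1);
            if (Math.abs(gap - intervalMs) <= toleranceMs) {
                aligned++;
            }
        }
        return (double) aligned / (spikeMillis.size() - 1);
    }

    public static void main(String[] args) {
        // Spikes roughly 10s apart line up with a 10s interval.
        List<Long> spikes = List.of(0L, 10_050L, 20_010L, 29_980L);
        System.out.println(alignedFraction(spikes, 10_000L, 200L)); // prints 1.0
    }
}
```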

On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang  wrote:

> Hello Nan,
>
> Thanks for the detailed information you shared. When Kafka Streams is
> normally running, no rebalances should be triggered unless some of the
> instances (in your case, docker containers) have soft failures.
>
> I suspect the latency spike is due to the commit intervals: streams will
> try to commit its offset at a regular paces, which may increase latency. It
> is controlled by the "commit.interval.ms" config value. I saw that in your
> original email you've set it to 10 * 1000 (10 seconds). Is that aligned
> with the frequency you observe latency spikes?
>
>
> Guozhang
>
>
> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu  wrote:
>
> > did more test and and make the test case simple.
> > all the setup now is a single physical machine. running 3 docker
> instance.
> > a1, a2, a3
> >
> > kafka + zookeeper running on all of those docker containers.
> > producer running on a1, send a single key,  update speed 2000 message/s,
> > each message is 10K size.
> > 3 consumer(different group)  are running. one on each docker.
> > all topics are pre-created.
> > in startup, I do see some latency greater than 100ms, which is fine. and
> > then everything is good. latency is low and consumer don't see anything
> > over 100ms for a while.
> > then I see a few messages have latency over 100ms. then back to normal,
> > then happen again. do seems like gc problem. but I check the gc
> log.  I
> > don't think it can cause over 100ms. (both are G1 collector)
> >
> > after the stream stable running( exclude the startup), the first message
> > over 100ms take 179ms  and the gc ( it has a 30ms pause, but should not
> > cause a 179ms end to end).
> >
> > FROM APP
> >
> > 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure)
> > 3184739K->84018K(5947904K), 0.0093730 secs]
> >
> > 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure)
> > 3184690K->84280K(6053888K), 0.0087473 secs]
> >
> > 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure)
> > 3301176K->84342K(6061056K), 0.0127339 secs]
> >
> > 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure)
> > 3301238K->84624K(6143488K), 0.0140844 secs]
> >
> > 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure)
> > 3386000K->89949K(6144000K), 0.0108118 secs]
> >
> >
> >
> > kafka a1
> >
> > 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause)
> > (young), 0.0214200 secs]
> >
> >[Parallel Time: 17.2 ms, GC Workers: 8]
> >
> >   [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max:
> > 7982673.8, Diff: 16.3]
> >
> >   [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5,
> > Sum: 1.5]
> >
> >   [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> >
> >  [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum:
> 37]
> >
> >   [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> >
> >   [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
> > Sum: 0.0]
> >
> >   [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum:
> > 36.5]
> >
> >   [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum:
> 2.9]
> >
> >  [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24,
> Sum:
> > 83]
> >
> >   [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
> Sum:
> > 0.1]
> >
> >   [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2,
> > Sum: 56.5]
> >
> >   [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max:
> 7982674.5,
> > Diff: 0.6]
> >
> >[Code Root Fixup: 0.0 ms]
> >
> >[Code Root Purge: 0.0 ms]
> >
> >[Clear CT: 1.0 ms]
> >
> >[Other: 3.2 ms]
> >
> >   [Choose CSet: 0.0 ms]
> >
> >   [Ref Proc: 1.9 ms]
> >
> >   [Ref Enq: 0.0 ms]
> >
> >   [Redirty Cards: 0.8 ms]
> >
> >   [Humongous Register: 0.1 ms]
> >
> >   [Humongous Reclaim: 0.0 ms]
> >
> >   [Free CSet: 0.2 ms]
&

Re: Kafka issue

2018-08-20 Thread Nan Xu
maybe I should highlight: I only publish 1 key, so only one broker is going
to handle it, and only 1 stream instance handles it too. What's the typical
throughput/latency I should expect in this case, assuming the processing
logic is very, very simple: just get the data (an integer) and sum it? I am
expecting more like 100,000 msg/s and less than 10ms latency for a single
powerful broker.

Nan
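The measurement used earlier in this thread — stamp each message with System.nanoTime() at send time, diff against the clock on the consumer side — can be sketched like this (producer and consumer in a single JVM, so one clock; the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class E2ELatency {

    // End-to-end latencies in nanoseconds, recorded as receiveNanos - sendNanos.
    private final List<Long> latencies = new ArrayList<>();

    void record(long sendNanos, long receiveNanos) {
        latencies.add(receiveNanos - sendNanos);
    }

    // Nearest-rank percentile, reported in milliseconds.
    double percentileMs(double pct) {
        List<Long> sorted = new ArrayList<>(latencies);
        Collections.sort(sorted);
        int idx = (int) Math.ceil(pct / 100.0 * sorted.size()) - 1;
        return sorted.get(Math.max(idx, 0)) / 1_000_000.0;
    }

    public static void main(String[] args) {
        E2ELatency m = new E2ELatency();
        // Synthetic latencies of 1..100 ms for illustration.
        for (int i = 1; i <= 100; i++) {
            m.record(0L, i * 1_000_000L);
        }
        System.out.println("p99: " + m.percentileMs(99) + " ms"); // prints p99: 99.0 ms
    }
}
```

Note this only works when producer and consumer share a clock (same host); across hosts, System.nanoTime() values are not comparable.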


Re: Kafka issue

2018-08-19 Thread Nan Xu
I did several tests: one with 10 brokers (remote servers),
one with 3 brokers (local docker).

Both exhibit the same behavior. I was thinking the same, but at least from
the kafka log I don't see a rebalance happening, and I am sure my cpu is
only about half used and all brokers are still running.

Nan



On Mon, Aug 20, 2018 at 12:18 AM Shantanu Deshmukh 
wrote:

> How many brokers are there in your cluster? This error usually comes when
> one of the brokers who is leader for a partition dies and you are trying to
> access it.
>
> On Fri, Aug 17, 2018 at 9:23 PM Harish K  wrote:
>
> > Hi,
> >I have installed Kafka and created topic but while data ingestion i
> get
> > some errors as follows.Any help would be really appreciated
> >
> >
> > [2018-08-17 06:12:49,838] WARN Error while fetching metadata with
> > correlation id 24 :
> > {wikipedia=LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient)
> >
> > *server log:*
> >
> > [2018-08-17 06:06:00,719] INFO Creating /controller (is it secure? false)
> > (kafka.utils.ZKCheckedEphemeral)
> > [2018-08-17 06:06:00,720] INFO Result of znode creation is: OK
> > (kafka.utils.ZKCheckedEphemeral)
> > [2018-08-17 06:06:00,720] INFO 0 successfully elected as leader
> > (kafka.server.ZookeeperLeaderElector)
> > [2018-08-17 06:06:00,736] ERROR Error while electing or becoming leader
> on
> > broker 0 (kafka.server.ZookeeperLeaderElector)
> > kafka.common.KafkaException: Can't parse json string: null
> > at kafka.utils.Json$.liftedTree1$1(Json.scala:40)
> > at kafka.utils.Json$.parseFull(Json.scala:36)
> > at
> >
> >
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:660)
> > at
> >
> >
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:656)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > at kafka.utils.ZkUtils.getReplicaAssignmentForTopics(ZkUtils.scala:656)
> > at
> >
> >
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> > at
> >
> >
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:160)
> > at
> >
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:85)
> > at
> >
> >
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:154)
> > at
> >
> >
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:154)
> > at
> >
> >
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:154)
> > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> > at
> >
> >
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:153)
> > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:825)
> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:72)
> > Caused by: java.lang.NullPointerException
> > at
> >
> >
> scala.util.parsing.combinator.lexical.Scanners$Scanner.(Scanners.scala:44)
> > at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:51)
> > at scala.util.parsing.json.JSON$.parseFull(JSON.scala:65)
> > at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
> > ... 17 more
> >
> >
> > *Controller Log:*
> >
> > [2018-08-17 06:05:54,644] INFO [Controller 0]: Controller starting up
> > (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,659] INFO [Controller 0]: Broker 0 starting become
> > controller state transition (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,661] INFO [Controller 0]: Initialized controller
> epoch
> > to 2294948 and zk version 2294947 (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,664] INFO [Controller 0]: Controller 0 incremented
> > epoch to 2294949 (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,665] DEBUG [Controller 0]: Registering
> > IsrChangeNotificationListener (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,705] INFO [Controller 0]: Controller startup
> complete
> > (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,715] DEBUG [Controller 0]: Controller resigning,
> > broker id 0 (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,715] DEBUG [Controller 0]: De-registering
> > IsrChangeNotificationListener (kafka.controller.KafkaController)
> > [2018-08-17 06:05:54,717] INFO [Partition state machine on Controller 0]:
> > Stopped partition state machine (kafka.controller.PartitionStateMachine)
> > [2018-08-17 06:05:54,718] INFO [Replica state machine on controller 0]:
> > Stopped replica state machine (kafka.controller.ReplicaStateMachine)
> > [2018-08-17 06:05:54,718] INFO [Controller 0]: Broker 0 resigned as the
> > controller 

Re: kafka stream latency

2018-08-19 Thread Nan Xu
t; broker ->
> streams' consumer client, there are multiple phases that can contribute to
> the 100ms latency, and I cannot tell if stream's consumer phase is the
> major contributor. For example, if the topic was not created before, then
> when the broker first received a produce request it may need to create the
> topic, which involves multiple steps including writes to ZK which could
> take time.
>
> There are some confusions from your description: you mentioned "Kafka
> cluster is already up and running", but I think you are referring to "Kafka
> Streams application instances are already up and running", right? Since
> only the latter has rebalance process, while the Kafak brokers do not
> really have "rebalances" except balancing load by migrating partitions.
>
> Guozhang
>
>
>
> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu  wrote:
>
> > right, so my kafka cluster is already up and running for a while, and I
> can
> > see from the log all broker instance already change from rebalance to
> > running.
> >
> > I did a another test.
> > from producer, right before the message get send to the broker, I put a
> > timestamp in the message. and from the consumer side which is after
> stream
> > processing, I compare this timestamp with current time. I can see some
> > message processing time is above 100ms on some real powerful hardware.
> and
> > from my application gc, all the gc time is below 1ms, kafka gc only
> happen
> > once and below 1ms too.
> >
> > very puzzled. is there any communication to zookeeper, if not get
> response,
> > will cause the broker to pause? I don't think that's the case but at this
> > time don't know what else can be suspected.
> >
> > Nan
> >
> > On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang 
> wrote:
> >
> > > Hello Nan,
> > >
> > > Note that Streams may need some time to rebalance and assign tasks even
> > if
> > > you only starts with one instance.
> > >
> > > I'd suggest you register your state listener in Kafka Streams via
> > > KafkaStreams#setStateListener, and your customized StateListener should
> > > record when the state transits from REBALANCING to RUNNING since only
> > after
> > > that the streams client will start to process the first record.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu  wrote:
> > >
> > > > thanks, which JMX properties indicate  "processing latency spikes"  /
> > > > "throughput"
> > > >
> > > >
> > > > On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <
> matth...@confluent.io
> > >
> > > > wrote:
> > > >
> > > > > I cannot spot any obvious reasons.
> > > > >
> > > > > As you consume from the result topic for verification, we should
> > verify
> > > > > that the latency spikes original on write and not on read: you
> might
> > > > > want to have a look into Kafka Streams JMX metric to see if
> > processing
> > > > > latency spikes or throughput drops.
> > > > >
> > > > > Also watch for GC pauses in the JVM.
> > > > >
> > > > > Hope this helps.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > > > > btw, I am using version 0.10.2.0
> > > > > >
> > > > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu 
> > wrote:
> > > > > >
> > > > > >> I am working on a kafka stream app, and see huge latency
> variance,
> > > > > >> wondering what can cause this?
> > > > > >>
> > > > > >> the processing is very simple and don't have state, linger.ms
> > > already
> > > > > >> change to 5ms. the message size is around 10K byes and published
> > as
> > > > 2000
> > > > > >> messages/s, network is 10G.  using a regular consumer watch the
> > > > > >> localHistTopic  topic and just every 2000 message print out a
> > > counter,
> > > > > it
> > > > > >> usually every second I get a count 2000 as the publish speed,
> but
> > > > > sometime
> > > > > >> I see it stall for 3 or more seconds 

Re: kafka stream latency

2018-08-19 Thread Nan Xu
Right, so my Kafka cluster has already been up and running for a while, and I can
see from the logs that all broker instances have already changed from rebalancing
to running.

I did another test.
In the producer, right before the message gets sent to the broker, I put a
timestamp in the message. On the consumer side, which is after stream
processing, I compare this timestamp with the current time. I can see that some
messages take more than 100 ms to process, on some really powerful hardware.
In my application's GC logs all pauses are below 1 ms, and Kafka's GC only
happened once and was below 1 ms too.

Very puzzling. Is there any communication with ZooKeeper that, if no response
arrives, would cause the broker to pause? I don't think that's the case, but at
this point I don't know what else to suspect.

Nan
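A caveat with measurements like this: at 2000 msg/s, a rare multi-second stall
almost vanishes from an average, so it helps to report high percentiles of the
send-to-receive delta rather than the mean. A minimal, Kafka-free sketch of the
idea (class and helper names are made up for illustration):

```java
import java.util.Arrays;

public class LatencyStats {
    // Nearest-rank percentile over recorded send->receive deltas, in ms.
    static long percentile(long[] deltasMs, double p) {
        long[] sorted = deltasMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank - 1, 0)];
    }

    public static void main(String[] args) {
        // Mostly-fast deltas plus one 3-second stall, as described above.
        long[] deltas = {5, 6, 5, 7, 4, 6, 5, 3000, 5, 6};
        System.out.println("p50 = " + percentile(deltas, 50)); // typical case
        System.out.println("p99 = " + percentile(deltas, 99)); // exposes the stall
    }
}
```

Here the median stays small while the p99 surfaces the stall, which is the
symptom described in this thread.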

On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang  wrote:

> Hello Nan,
>
> Note that Streams may need some time to rebalance and assign tasks even if
> you only start with one instance.
>
> I'd suggest you register a state listener in Kafka Streams via
> KafkaStreams#setStateListener; your customized StateListener should
> record when the state transitions from REBALANCING to RUNNING, since only after
> that will the streams client start to process the first record.
>
>
> Guozhang
>
>
> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu  wrote:
>
> > thanks, which JMX properties indicate  "processing latency spikes"  /
> > "throughput"
> >
> >
> > On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax 
> > wrote:
> >
> > > I cannot spot any obvious reasons.
> > >
> > > As you consume from the result topic for verification, we should verify
> > > that the latency spikes originate on write and not on read: you might
> > > want to have a look at the Kafka Streams JMX metrics to see if processing
> > > latency spikes or throughput drops.
> > >
> > > Also watch for GC pauses in the JVM.
> > >
> > > Hope this helps.
> > >
> > >
> > > -Matthias
> > >
> > > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > > btw, I am using version 0.10.2.0
> > > >
> > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu  wrote:
> > > >
> > > >> I am working on a kafka stream app, and see huge latency variance,
> > > >> wondering what can cause this?
> > > >>
> > > >> the processing is very simple and don't have state, linger.ms
> already
> > > >> change to 5ms. the message size is around 10K byes and published as
> > 2000
> > > >> messages/s, network is 10G.  using a regular consumer watch the
> > > >> localHistTopic  topic and just every 2000 message print out a
> counter,
> > > it
> > > >> usually every second I get a count 2000 as the publish speed, but
> > > sometime
> > > >> I see it stall for 3 or more seconds and then print out a few count.
> > > like
> > > >> cpu is paused during that time or message being cache/batch then
> > > processed.
> > > >> any suggestion?
> > > >>
> > > >>   final Properties streamsConfiguration = new Properties();
> > > >>
> > > >>
>  streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > > >> applicationId);
> > > >>
> > > >> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,
> > > clientId);
> > > >>
> > > >> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_
> > SERVERS_CONFIG,
> > > >> bootstrapServers);
> > > >>
> > > >>
> > > >> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_
> > SERDE_CLASS_CONFIG,
> > > >> Serdes.String()
> > > >>
> > > >> .getClass().getName());
> > > >>
> > > >>
> > >  streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > > >> 10 * 1000);
> > > >>
> > > >> //
> > > >>
> > >
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> > 0);
> > > >>
> > > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > >>
> > > >> + ProducerConfig.BATCH_SIZE_CONFIG,163840);
> > > >>
> > > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > >>
> > > >> + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> >
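Guozhang's suggestion quoted above, registering a state listener and recording
the REBALANCING-to-RUNNING transition, could look roughly like this (a sketch
only: it assumes kafka-streams on the classpath, and `builder` and
`streamsConfiguration` are the ones defined in the code elsewhere in this
thread; it needs a running broker to actually do anything):

```java
// Sketch: record when the Streams client becomes ready, so that
// rebalance/startup time is excluded from latency measurements.
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.setStateListener((newState, oldState) -> {
    // Fires on every transition; REBALANCING -> RUNNING marks readiness.
    if (oldState == KafkaStreams.State.REBALANCING
            && newState == KafkaStreams.State.RUNNING) {
        System.out.println("ready to process at " + System.currentTimeMillis());
    }
});
streams.start();
```

Any latency sample taken before the printed timestamp can then be discarded as
rebalance noise.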

Re: kafka stream latency

2018-08-18 Thread Nan Xu
Thanks. Which JMX metrics indicate "processing latency spikes" /
"throughput"?


On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax 
wrote:

> I cannot spot any obvious reasons.
>
> As you consume from the result topic for verification, we should verify
> that the latency spikes originate on write and not on read: you might
> want to have a look at the Kafka Streams JMX metrics to see if processing
> latency spikes or throughput drops.
>
> Also watch for GC pauses in the JVM.
>
> Hope this helps.
>
>
> -Matthias
>
> On 8/17/18 12:13 PM, Nan Xu wrote:
> > btw, I am using version 0.10.2.0
> >
> > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu  wrote:
> >
> >> I am working on a kafka stream app, and see huge latency variance,
> >> wondering what can cause this?
> >>
> >> the processing is very simple and don't have state, linger.ms already
> >> change to 5ms. the message size is around 10K byes and published as 2000
> >> messages/s, network is 10G.  using a regular consumer watch the
> >> localHistTopic  topic and just every 2000 message print out a counter,
> it
> >> usually every second I get a count 2000 as the publish speed, but
> sometime
> >> I see it stall for 3 or more seconds and then print out a few count.
> like
> >> cpu is paused during that time or message being cache/batch then
> processed.
> >> any suggestion?
> >>
> >>   final Properties streamsConfiguration = new Properties();
> >>
> >> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >> applicationId);
> >>
> >> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,
> clientId);
> >>
> >> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >> bootstrapServers);
> >>
> >>
> >> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >> Serdes.String()
> >>
> >> .getClass().getName());
> >>
> >>
>  streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >> 10 * 1000);
> >>
> >> //
> >>
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>
> >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>
> >> + ProducerConfig.BATCH_SIZE_CONFIG,163840);
> >>
> >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>
> >> + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> >>
> >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>
> >> + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> >>
> >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>
> >> + ProducerConfig.LINGER_MS_CONFIG,"5");
> >>
> >> streamsConfiguration.put(StreamsConfig.consumerPrefix(
> >> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> >>
> >>
> >>  final StreamsBuilder builder = new StreamsBuilder();
> >>
> >>  final KStream localDeltaStream = builder.stream(
> >>
> >> localDeltaTopic,
> >>
> >> Consumed.with(
> >>
> >> new Serdes.StringSerde(),
> >>
> >> new NodeMutationSerde<>()
> >>
> >> )
> >>
> >> );
> >>
> >>   KStream localHistStream =
> localDeltaStream.mapValues(
> >>
> >> (mutation) -> NodeState
> >>
> >> .newBuilder()
> >>
> >> .setMeta(
> >>
> >> mutation.getMetaMutation().getMeta()
> >>
> >> )
> >>
> >> .setValue(
> >>
> >> mutation.getValueMutation().getValue()
> >>
> >> )
> >>
> >> .build()
> >>
> >> );
> >>
> >>   localHistStream.to(
> >>
> >> localHistTopic,
> >>
> >> Produced.with(new Serdes.StringSerde(), new
> NodeStateSerde<>())
> >>
> >> );
> >>
> >>  streams = new KafkaStreams(builder.build(), streamsConfiguration);
> >>
> >> streams.cleanUp();
> >>
> >> streams.start();
> >>
> >>
> >>
> >>
> >>
> >
>
>


Re: kafka stream latency

2018-08-17 Thread Nan Xu
btw, I am using version 0.10.2.0

On Fri, Aug 17, 2018 at 2:04 PM Nan Xu  wrote:

> I am working on a Kafka Streams app and see huge latency variance;
> wondering what can cause this?
>
> The processing is very simple and stateless, and linger.ms has already been
> changed to 5 ms. The message size is around 10K bytes, published at 2000
> messages/s, and the network is 10G. Using a regular consumer I watch the
> localHistTopic topic and print a counter every 2000 messages. Usually I get a
> count of 2000 every second, matching the publish rate, but sometimes it stalls
> for 3 or more seconds and then prints out several counts at once, as if the
> CPU were paused during that time, or messages were cached/batched and then
> processed. Any suggestions?
>
>   final Properties streamsConfiguration = new Properties();
>
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> applicationId);
>
> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> bootstrapServers);
>
>
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.String()
>
> .getClass().getName());
>
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> 10 * 1000);
>
> //
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>
> + ProducerConfig.BATCH_SIZE_CONFIG,163840);
>
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>
> + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>
> + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>
> + ProducerConfig.LINGER_MS_CONFIG,"5");
>
> streamsConfiguration.put(StreamsConfig.consumerPrefix(
> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
>
>
>  final StreamsBuilder builder = new StreamsBuilder();
>
>  final KStream localDeltaStream = builder.stream(
>
> localDeltaTopic,
>
> Consumed.with(
>
> new Serdes.StringSerde(),
>
> new NodeMutationSerde<>()
>
> )
>
> );
>
>   KStream localHistStream = localDeltaStream.mapValues(
>
> (mutation) -> NodeState
>
> .newBuilder()
>
> .setMeta(
>
> mutation.getMetaMutation().getMeta()
>
> )
>
> .setValue(
>
> mutation.getValueMutation().getValue()
>
> )
>
> .build()
>
> );
>
>   localHistStream.to(
>
> localHistTopic,
>
> Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>
> );
>
>  streams = new KafkaStreams(builder.build(), streamsConfiguration);
>
> streams.cleanUp();
>
> streams.start();
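For reference, the prefixed overrides in the configuration above amount to the
following effective client settings (a sketch, assuming Streams' standard
"producer." / "consumer." prefix convention):

```properties
# producer overrides (StreamsConfig.PRODUCER_PREFIX = "producer.")
producer.batch.size=163840
producer.buffer.memory=335544320
producer.max.in.flight.requests.per.connection=30
producer.linger.ms=5
# consumer override (StreamsConfig.consumerPrefix(key) = "consumer." + key)
consumer.max.partition.fetch.bytes=20971520
# Streams-level setting
commit.interval.ms=10000
```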
>
>
>
>
>


Re: Mirrormaker consumption slowness

2017-12-07 Thread Xu, Zhaohui
Thanks Steve for the tips.

Yes, we found many SACKs in the packet sequences of the problematic connections
and observed intermittent network jitter in between. That explains the
behavior seen in our setup.

Regards,
Jeff

On 12/7/17, 7:45 AM, "Steve Miller" <st...@idrathernotsay.com> wrote:

This kind of sounds to me like there’s packet loss somewhere and TCP is 
closing the window to try to limit congestion.  But from the snippets you 
posted, I didn’t see any sacks in the tcpdump output.  If there *are* sacks, 
that’d be a strong indicator of loss somewhere, whether it’s in the network or 
it’s in some host that’s being overwhelmed.

I didn’t have a chance to do the header math to see if TCP’s advertising a 
small window in the lossy case you posted.  But I figured I’d mention this just 
in case it’s useful.

-Steve

> On Dec 6, 2017, at 5:27 PM, tao xiao <xiaotao...@gmail.com> wrote:
> 
> MirrorMaker is placed close to the target, and the send/receive buffer size is
> set to 10MB, which is the result of the bandwidth-delay product. The OS-level
> TCP buffer
> has also been increased to a 16MB max.
> 
>> On Wed, 6 Dec 2017 at 15:19 Jan Filipiak <jan.filip...@trivago.com> 
wrote:
>> 
>> Hi,
>> 
>> two questions. Is your MirrorMaker collocated with the source or the
>> target?
>> what are the send and receive buffer sizes on the connections that do 
span
>> across WAN?
>> 
>> Hope we can get you some help.
>> 
>> Best jan
>> 
>> 
>> 
>>> On 06.12.2017 14:36, Xu, Zhaohui wrote:
>>> Any update on this issue?
>>> 
>>> We also run into similar situation recently. The mirrormaker is
>> leveraged to replicate messages between clusters in different dc. But
>> sometimes a portion of partitions are with high consumer lag and tcpdump
>> also shows similar packet delivery pattern. The behavior is sort of weird
>> and is not self-explaining. Wondering whether it has anything to do with
>> the fact that number of consumers is too large?  In our example, we have
>> around 100 consumer connections per broker.
>>> 
>>> Regards,
>>> Jeff
>>> 
>>> On 12/5/17, 10:14 AM, "tao xiao" <xiaotao...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> any pointer will be highly appreciated
>>> 
>>>> On Thu, 30 Nov 2017 at 14:56 tao xiao <xiaotao...@gmail.com> wrote:
>>>> 
>>>> Hi There,
>>>> 
>>>> 
>>>> 
>>>> We are running into a weird situation when using Mirrormaker to
>> replicate
>>>> messages between Kafka clusters across datacenter and reach you
>> for help in
>>>> case you also encountered this kind of problem before or have
>> some insights
>>>> in this kind of issue.
>>>> 
>>>> 
>>>> 
>>>> Here is the scenario. We have setup a deployment where we run 30
>>>> Mirrormaker instances on 30 different nodes. Each Mirrormaker
>> instance is
>>>> configure with num.streams=1 thus only one consumer runs. The
>> topics to
>>>> replicate is configure with 100 partitions and data is almost
>> evenly
>>>> distributed across all partitions. After running a period of
>> time, weird
>>>> things happened that some of the Mirrormaker instances seems to
>> slow down
>>>> and consume at a relative slow speed from source Kafka cluster.
>> The output
>>>> of tcptrack shows the consume rate of problematic instances
>> dropped to
>>>> ~1MB/s, while the other healthy instances consume at a rate of
>> ~3MB/s. As
>>>> a result, the consumer lag for corresponding partitions are going
>> high.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> After triggering a tcpdump, we noticed the traffic pattern in tcp
>>>> connection of problematic Mirrmaker instances is very different
>> from
>>>> others. Packets flowing in problematic tcp connections are
>> relatively small
>>>> and seq and ack packets are basically coming in one after
>> another. On the
>>>> other hand, the

Re: Mirrormaker consumption slowness

2017-12-06 Thread Xu, Zhaohui
Any update on this issue?

We also ran into a similar situation recently. MirrorMaker is used to replicate
messages between clusters in different data centers, but sometimes a portion of
the partitions show high consumer lag, and tcpdump shows a similar packet
delivery pattern. The behavior is odd and not self-explanatory.
Could it have anything to do with the number of consumers being too large?
In our setup, we have around 100 consumer connections per
broker.

Regards,
Jeff

On 12/5/17, 10:14 AM, "tao xiao"  wrote:

Hi,

any pointer will be highly appreciated

On Thu, 30 Nov 2017 at 14:56 tao xiao  wrote:

> Hi There,
>
>
>
> We are running into a weird situation when using MirrorMaker to replicate
> messages between Kafka clusters across data centers, and are reaching out in
> case you have encountered this kind of problem before or have some insights
> into this kind of issue.
>
>
>
> Here is the scenario. We have set up a deployment where we run 30
> MirrorMaker instances on 30 different nodes. Each MirrorMaker instance is
> configured with num.streams=1, so only one consumer runs. The topics to
> replicate are configured with 100 partitions and data is almost evenly
> distributed across all partitions. After running for a period of time, a weird
> thing happened: some of the MirrorMaker instances seemed to slow down
> and consume at a relatively slow speed from the source Kafka cluster. The
> output of tcptrack shows the consume rate of the problematic instances dropped
> to ~1MB/s, while the other healthy instances consume at a rate of ~3MB/s. As
> a result, the consumer lag for the corresponding partitions is going high.
>
>
>
>
> After triggering a tcpdump, we noticed the traffic pattern in the TCP
> connections of the problematic MirrorMaker instances is very different from
> the others. Packets flowing in the problematic TCP connections are relatively
> small, and seq and ack packets basically come in one after another. On the
> other hand, the packets in the healthy TCP connections come in a
> different pattern: basically several seq packets come with one ack packet.
> The screenshots below show the situation; these two captures were taken on
> the same MirrorMaker node.
>
>
>
> problematic connection.  ps. 10.kfk.kfk.kfk is kafka broker, 10.mm.mm.mm
> is Mirrormaker node
>
> 
https://imgur.com/Z3odjjT
>
>
> healthy connection
>
> 
https://imgur.com/w0A6qHT
>
>
> If we stop the problematic MirrorMaker instance, then when other instances
> take over the lagged partitions, they can consume messages quickly and
> catch up on the lag soon. So the broker in the source Kafka cluster is
> presumably fine. But if MirrorMaker itself causes the issue, how can one TCP
> connection be good while others are problematic, since the connections are
> all
> established in the same manner by the Kafka library?
>
>
>
> Consumer configuration for Mirrormaker instance as below.
>
> auto.offset.reset=earliest
>
>
> 
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
>
> heartbeat.interval.ms=1
>
> session.timeout.ms=12
>
> request.timeout.ms=15
>
> receive.buffer.bytes=1048576
>
> max.partition.fetch.bytes=2097152
>
> fetch.min.bytes=1048576
>
>
>
> Kafka version is 0.10.0.0 and we have Kafka and Mirrormaker run on Ubuntu
> 14.04
>
>
>
> Any response is appreciated.
>
> Regards,
>
> Tao
>




Re:whether compressed data is still compressed in broker log

2016-07-12 Thread xu

When I set compression.type=gzip it works very well, but snappy does not.



At 2016-07-11 22:41:49, "xu" <xupingyong...@163.com> wrote:

Hi All:
   I updated the broker version from 0.8.2 to 0.10.0 and set 
"compression.type=snappy" in server.properties. The version of the producers and 
consumers is still 0.8.2. I expected all new data received by the brokers to be 
stored compressed in the log files, but the result is the opposite.


 My question is the same as this one: 
http://stackoverflow.com/questions/31168085/checking-kafka-data-if-compressed.


I do not understand how compressed data is stored in the log files. Could you 
help me?




 

Re: Rolling upgrade from 0.8.2.1 to 0.9.0.1 failing with replicafetchthread OOM errors

2016-07-11 Thread Hao Xu
Oh, I am talking about another memory leak: the off-heap memory leak we
experienced, which involves direct buffer memory. The call stack is below.
 ReplicaFetcherThread.warn - [ReplicaFetcherThread-4-1463989770], Error in
fetch kafka.server.ReplicaFetcherThread$FetchRequest@7f4c1657. Possible
cause: java.lang.OutOfMemoryError: Direct buffer memory
 ERROR kafka-network-thread-75737866-PLAINTEXT-26 Processor.error -
Processor got uncaught exception.
local3.err java.lang.OutOfMemoryError: - Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)

On Mon, Jul 11, 2016 at 10:54 AM, Tom Crayford  wrote:

> Hi (I'm the author of that ticket):
>
> From my understanding, limiting MaxDirectMemory won't work around this memory
> leak. The leak is inside the JVM's implementation, not in normal direct
> buffers. On one of our brokers with this issue, we're seeing the JVM report
> 1.2GB of heap, and 128MB of offheap memory, yet the actual process memory
> is more like 10GB.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>



-- 
-hao
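For anyone debugging this class of problem, two JVM options are commonly used
to bound and attribute off-heap usage (a sketch with hypothetical values; note
that, as Tom says above, the JVM-internal leak he describes may not be covered
by the direct-memory cap):

```
# cap direct ByteBuffer allocations so growth fails fast instead of silently
-XX:MaxDirectMemorySize=256m
# attribute native allocations (inspect with: jcmd <pid> VM.native_memory summary)
-XX:NativeMemoryTracking=summary
```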


whether compressed data is still compressed in broker log

2016-07-11 Thread xu
Hi All:
   I updated the broker version from 0.8.2 to 0.10.0 and set 
"compression.type=snappy" in server.properties. The version of the producers and 
consumers is still 0.8.2. I expected all new data received by the brokers to be 
stored compressed in the log files, but the result is the opposite.


 My question is the same as this one: 
http://stackoverflow.com/questions/31168085/checking-kafka-data-if-compressed.


I do not understand how compressed data is stored in the log files. Could you 
help me?
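One way to answer this directly is to dump a log segment and look at the
per-message codec field (a sketch: tool and flags as of the 0.8-0.10 era, the
segment path is hypothetical):

```
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/kafka-logs/mytopic-0/00000000000000000000.log \
  --deep-iteration --print-data-log
```

Each message line should include a compresscodec field (e.g.
NoCompressionCodec or SnappyCompressionCodec), which shows whether the batch is
actually stored compressed on disk.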

Re: Consumer Question

2016-06-19 Thread Shaolu Xu
Hi Chris,

If the topic does not exist, a new topic will be created with the name you
give.

Thanks,
Nicole

On Sat, Jun 18, 2016 at 1:55 AM, Chris Barlock  wrote:

> If you have a consumer listening on a topic and that topic is deleted is
> the consumer made aware -- perhaps by some exception -- or does it
> continue listening, blissfully unaware that it will never hear anything
> more?
>
> Thanks,
>
> Chris
>
>
>


Re: How to use HDP kafka?

2016-06-01 Thread Shaolu Xu
Hi All,

I used the latest HDP 2.4 version.
Did you do any configuration before using HDP? I found a possible solution at
http://www.cnblogs.com/i2u9/p/ambari-kafka-multiip.html, but it does not work
for me.

Attached are the errorInfo and HDPConfig snapshots.

Thanks in advance!

Thanks,
Nicole


On Wed, Jun 1, 2016 at 8:44 PM, Igor Kravzov <igork.ine...@gmail.com> wrote:

> Hi,
>
> I am unable to see the images. But I use Kafka with HDP right now without
> any problem.
>
> On Tue, May 31, 2016 at 9:33 PM, Shaolu Xu <sh...@tibco-support.com>
> wrote:
>
> > Hi All,
> >
> >
> > Has anyone used HDP to run Kafka? I used it and ran into a problem. The
> > following is the error info:
> >
> > [image: Inline image 2]
> >
> >
> > The following is my HDP configuration:
> >
> > [image: Inline image 1]
> > Should I set some configuration on HDP.
> >
> >
> > Thanks in advance.
> >
> >
> > Thanks,
> > Nicole
> >
>




Re: messages intermittently dropped in a single partition configuration

2016-04-28 Thread Bo Xu
OK, I think I found the cause of the problem. The default value
of max_in_flight_requests_per_connection is 5 in the Python producer. This
turns out to be too small for my environment and application. When this value is
reached and the producer retries several times and still fails, the message
is dropped. And since send() is asynchronous, there is no way for the
producer to report the failure immediately. One solution is to block for
'synchronous' sends, as described at
http://kafka-python.readthedocs.io/en/1.0.0/usage.html, but this slows down
the producing speed. Another solution, which I ended up with, is to
significantly increase max_in_flight_requests_per_connection, e.g., to 1024.
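The same caveat about asynchronous send() hiding failures applies to the Java
client; the usual pattern there is to pass a callback to send() so that a
message dropped after retries at least gets logged. A sketch, assuming
kafka-clients on the classpath (broker address and topic name are
placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.RETRIES_CONFIG, 10);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "payload"),
                (metadata, exception) -> {
                    // Called asynchronously: a non-null exception means the
                    // send ultimately failed after retries and was dropped.
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
        } // close() flushes outstanding sends
    }
}
```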



On Thu, Apr 28, 2016 at 10:31 AM, Bo Xu <box...@gmail.com> wrote:

> PS: The message dropping occurred intermittently, not all at the end. For
> example, it is the 10th, 15th, 18th messages that are missing. It it were
> all at the end, it would be understandable because I'm not using flush() to
> force transmitting.
>
> Bo
>
>
> On Thu, Apr 28, 2016 at 10:15 AM, Bo Xu <box...@gmail.com> wrote:
>
>> I set up a simple Kafka configuration, with one topic and one partition.
>> I have a Python producer to continuously publish messages to the Kafka
>> server and a Python consumer to receive messages from the server. Each
>> message is about 10K bytes, far smaller than
>> socket.request.max.bytes=104857600. What I found is that the consumer
>> intermittently missed some messages. I checked the server log file and
>> found that these messages are missing there as well. So it looks like these
>> message were never stored by the server. I also made sure that the producer
>> did not receive any error for every message that it published (using
>> send()).
>>
>> Any clues what could have caused the problem?
>>
>> Thanks,
>> Bo
>>
>
>


Re: messages intermittently dropped in a single partition configuration

2016-04-28 Thread Bo Xu
PS: The message dropping occurred intermittently, not all at the end. For
example, it is the 10th, 15th, and 18th messages that are missing. If it were
all at the end, it would be understandable, because I'm not using flush() to
force transmission.

Bo


On Thu, Apr 28, 2016 at 10:15 AM, Bo Xu <box...@gmail.com> wrote:

> I set up a simple Kafka configuration, with one topic and one partition. I
> have a Python producer to continuously publish messages to the Kafka server
> and a Python consumer to receive messages from the server. Each message is
> about 10K bytes, far smaller than socket.request.max.bytes=104857600. What
> I found is that the consumer intermittently missed some messages. I checked
> the server log file and found that these messages are missing there as
> well. So it looks like these message were never stored by the server. I
> also made sure that the producer did not receive any error for every
> message that it published (using send()).
>
> Any clues what could have caused the problem?
>
> Thanks,
> Bo
>


messages intermittently dropped in a single partition configuration

2016-04-28 Thread Bo Xu
I set up a simple Kafka configuration, with one topic and one partition. I
have a Python producer to continuously publish messages to the Kafka server
and a Python consumer to receive messages from the server. Each message is
about 10K bytes, far smaller than socket.request.max.bytes=104857600. What
I found is that the consumer intermittently missed some messages. I checked
the server log file and found that these messages are missing there as
well. So it looks like these messages were never stored by the server. I
also made sure that the producer did not receive any error for every
message that it published (using send()).

Any clues what could have caused the problem?

Thanks,
Bo


electing leader failed and result in 0 latest offset

2016-04-20 Thread Qi Xu
Hi folks,
Recently we ran into an odd issue where some partitions' latest offset
becomes 0. Here's a snapshot from Kafka Manager; as you can see,
partitions 2 and 3 have become zero.

Partition | Latest Offset | Leader | Replicas | In Sync Replicas | Preferred Leader? | Under Replicated?
0         | 25822061      | 3      | (3,4,5)  | (3,5,4)          | true              | false
1         | 25822388      | 4      | (4,5,1)  | (4,1,5)          | true              | false
2         | 0             | 2      | (5,1,2)  | (2)              | false             | true
3         | 0             | 2      | (1,2,3)  | (3,2)            | false             | true

On the Kafka controller node, I saw errors like the one below in the
state-change log. The timing seems to match; I am not sure whether it's related.

[2016-04-14 19:59:21,800] ERROR Controller 3 epoch 74174 initiated state
change for partition [topic,2] from OnlinePartition to OnlinePartition
failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing
leader for partition [topic,2] due to: Preferred replica 1 for partition
[topic,2] is either not alive or not in the isr. Current leader and ISR:
[{"leader":2,"leader_epoch":169,"isr":[2]}].


And when this happens, basically all the partitions with a zero latest
offset fail to get new data. After we restart the controller, everything
goes back to normal.

Have you seen a similar issue before, and do you have any idea about the root
cause? What other information do you suggest collecting to get to it?

Thanks,
Qi
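A couple of broker settings interact with preferred-leader election and may be
worth checking in this situation (a sketch; config names are from the
0.8/0.9-era broker, and defaults may differ by version):

```
# periodically re-elect the preferred leader once it is back in the ISR
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
# whether a non-ISR replica may be elected (trades durability for availability)
unclean.leader.election.enable=true
```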


Re: Fallout from upgrading to kafka 0.9.0.0 from 0.8.2.1

2016-03-23 Thread Qi Xu
More information about the issue:
When the issue happens, the controller is always on a 0.9 Kafka
broker.
In the server.log of the other brokers, we can see this kind of error:
[2016-03-23 22:36:02,814] ERROR [ReplicaFetcherThread-0-5], Error for
partition [topic,208] to broker
5:org.apache.kafka.common.errors.NotLeaderForPartitionException: This
server is not the leader for that topic-partition.
(kafka.server.ReplicaFetcherThread)

And after restarting that controller, everything works again.


On Tue, Mar 22, 2016 at 6:14 PM, Qi Xu <shkir...@gmail.com> wrote:

> Hi folks, Rajiv, Jun,
> I'd like to bring up this thread from Rajiv Kurian, from 3 months ago, again.
> Basically we did the same thing Rajiv did: I upgraded two machines (out
> of 10) from 0.8.2.1 to 0.9, so after the upgrade there were 2 machines
> on 0.9 and 8 machines on 0.8.2.1. Initially it all worked fine, but
> after about 2 hours all the old producers and consumers broke because no
> leader could be found for any partition of any topic. The producer just
> complains "unknown error for topic xxx" when it tries to refresh the
> metadata, and on the server side there are errors complaining about no leader
> for a partition. I'm wondering: is there any known issue with 0.9 and 0.8.2
> co-existing in the same cluster? Thanks a lot.
>
>
> Below is the original thread:
>
> We had to revert to 0.8.3 because three of our topics seemed to have gotten
> corrupted during the upgrade. As soon as we did the upgrade, producers to
> the three topics I mentioned stopped being able to do writes. The clients
> complained (occasionally) about leader not found exceptions. We restarted
> our clients and brokers but that didn't seem to help. Actually even after
> reverting to 0.8.3 these three topics were broken. To fix it we had to stop
> all clients, delete the topics, create them again and then restart the
> clients.
>
> I realize this is not a lot of info. I couldn't wait to get more debug info
> because the cluster was actually being used. Has anyone run into something
> like this? Are there any known issues with old consumers/producers? The
> topics that got busted had clients writing to them using the old Java
> wrapper over the Scala producer.
>
> Here are the steps I took to upgrade.
>
> For each broker:
>
> 1. Stop the broker.
> 2. Restart with the *0.9* broker running with
> inter.broker.protocol.version=*0.8.2*.X
> 3. Wait for under replicated partitions to go down to 0.
> 4. Go to step 1.
> Once all the brokers were running the *0.9* code with
> inter.broker.protocol.version=*0.8.2*.X we restarted them one by one with
> inter.broker.protocol.version=0.9.0.0
>
> When reverting I did the following.
>
> For each broker.
>
> 1. Stop the broker.
> 2. Restart with the *0.9* broker running with
> inter.broker.protocol.version=*0.8.2*.X
> 3. Wait for under replicated partitions to go down to 0.
> 4. Go to step 1.
>
> Once all the brokers were running *0.9* code with
> inter.broker.protocol.version=*0.8.2*.X  I restarted them one by one with
> the
> 0.8.2.3 broker code. This however like I mentioned did not fix the three
> broken topics.
>


Fallout from upgrading to kafka 0.9.0.0 from 0.8.2.1

2016-03-22 Thread Qi Xu
Hi folks, Rajiv, Jun,
I'd like to bring up this thread from Rajiv Kurian again, from 3 months ago.
Basically we did the same thing as Rajiv did. I upgraded two machines (out
of 10) from 0.8.2.1 to 0.9. So after the upgrade, there were 2 machines
on 0.9 and 8 machines on 0.8.2.1. Initially it all worked fine, but after
about 2 hours, all old producers and consumers were broken because no
leader could be found for any partition of any topic. The producer just
complains "unknown error for topic xxx" when it tries to refresh the
metadata, and on the server side there's an error complaining about no
leader for a partition.
I'm wondering, is there any known issue with 0.9 and 0.8.2 co-existing in
the same cluster? Thanks a lot.


Below is the original thread:

We had to revert to 0.8.3 because three of our topics seemed to have gotten
corrupted during the upgrade. As soon as we did the upgrade, producers to
the three topics I mentioned stopped being able to do writes. The clients
complained (occasionally) about leader not found exceptions. We restarted
our clients and brokers but that didn't seem to help. Actually even after
reverting to 0.8.3 these three topics were broken. To fix it we had to stop
all clients, delete the topics, create them again and then restart the
clients.

I realize this is not a lot of info. I couldn't wait to get more debug info
because the cluster was actually being used. Has anyone run into something
like this? Are there any known issues with old consumers/producers? The
topics that got busted had clients writing to them using the old Java
wrapper over the Scala producer.

Here are the steps I took to upgrade.

For each broker:

1. Stop the broker.
2. Restart with the *0.9* broker running with
inter.broker.protocol.version=*0.8.2*.X
3. Wait for under replicated partitions to go down to 0.
4. Go to step 1.
Once all the brokers were running the *0.9* code with
inter.broker.protocol.version=*0.8.2*.X we restarted them one by one with
inter.broker.protocol.version=0.9.0.0

When reverting I did the following.

For each broker.

1. Stop the broker.
2. Restart with the *0.9* broker running with
inter.broker.protocol.version=*0.8.2*.X
3. Wait for under replicated partitions to go down to 0.
4. Go to step 1.

Once all the brokers were running *0.9* code with
inter.broker.protocol.version=*0.8.2*.X  I restarted them one by one with
the
0.8.2.3 broker code. This however like I mentioned did not fix the three
broken topics.
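The two-phase upgrade above boils down to a config edit plus a rolling restart. The snippet below is a sketch that only demonstrates the server.properties change on a throwaway file; the property values mirror the steps above, while the file contents are illustrative:

```shell
# Phase 2 of the rolling upgrade: once every broker runs the 0.9 code with
# inter.broker.protocol.version=0.8.2.X, flip the protocol version and restart
# brokers one at a time. Demonstrated here on a temporary properties file.
cfg=$(mktemp)
printf 'broker.id=1\ninter.broker.protocol.version=0.8.2.X\n' > "$cfg"

# Rewrite the protocol version line in place (via a temp copy).
sed 's/^inter\.broker\.protocol\.version=.*/inter.broker.protocol.version=0.9.0.0/' \
  "$cfg" > "$cfg.new" && mv "$cfg.new" "$cfg"

grep '^inter.broker.protocol.version=' "$cfg"
# Then, per broker: stop, start with the new config, and wait for
# under-replicated partitions to return to 0 before moving to the next one.
```
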


Help: Producer with SSL did not work after upgrading the kafka 0.8.2 to Kafka 0.9

2016-03-19 Thread Qi Xu
Hi folks,
We just finished the upgrade from 0.8.2 to 0.9 following the instructions on
the Kafka web site (which set the protocol version to 0.8.2.x in the 0.9
Kafka server).
After the upgrade, we wanted to try the producer with the SSL endpoint, but
it never worked. Here's the error:

~/kafka_2.11-0.9.0.0$ ./bin/kafka-console-producer.sh --topic testtopic1
--broker-list  --producer.config  ./config/producer.properties
..
[2016-03-17 01:24:46,481] WARN Error while fetching metadata with
correlation id 0 : {testtopic1=UNKNOWN}
(org.apache.kafka.clients.NetworkClient)
[2016-03-17 01:24:46,613] WARN Error while fetching metadata with
correlation id 1 : {testtopic1=UNKNOWN}
(org.apache.kafka.clients.NetworkClient)
[2016-03-17 01:24:46,759] WARN Error while fetching metadata with
correlation id 2 : {testtopic1=UNKNOWN}
(org.apache.kafka.clients.NetworkClient)
[2016-03-17 01:24:46,901] WARN Error while fetching metadata with
correlation id 3 : {testtopic1=UNKNOWN}
(org.apache.kafka.clients.NetworkClient)
[2016-03-17 01:24:47,046] WARN Error while fetching metadata with
correlation id 4 : {testtopic1=UNKNOWN}
(org.apache.kafka.clients.NetworkClient)


In producer.properties, I specified all security information needed, as
below:

security.protocol=SSL
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.keystore.location=/usr/share/kafka/config/server.keystore
ssl.keystore.password=Password
ssl.key.password=Password
ssl.truststore.type=JKS
ssl.truststore.location=/usr/share/kafka/config/server.keystore
ssl.truststore.password=Password

On the server side, I don't see any obvious error.
Any idea or hint is much appreciated. Thanks a lot.


Thanks,
Qi
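One thing worth double-checking in the properties above: ssl.truststore.location points at the same JKS file as the keystore, which only works if that file also contains the CA certificate that signed the broker's certificate. A more conventional client config, sketched below with placeholder paths and passwords (not the poster's real values), separates the two; the handshake itself can be probed with openssl:

```shell
# producer.properties sketch -- paths and passwords are placeholders.
# The truststore should hold the CA that signed the broker certificate;
# a keystore is only needed if the broker requires client authentication.
#   security.protocol=SSL
#   ssl.truststore.type=JKS
#   ssl.truststore.location=/usr/share/kafka/config/client.truststore
#   ssl.truststore.password=TruststorePassword
#
# Verify that the broker's SSL listener completes a handshake at all
# (host and port are placeholders):
#   openssl s_client -connect broker1:9093 </dev/null
```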


Re: How to bind all Kafka tcp port to private net address

2016-02-01 Thread costa xu
Yes, host.name is useless in this case.
Even if I set host.name to the private IP, the broker still binds on 0.0.0.0.

2016-02-01 23:27 GMT+08:00 John Prout <john.pr...@acxiom.com>:

> I have set the host.name option in the server.properties file, but the
> Broker is still binding to all interfaces, and logging that's what it is
> doing.
>
> This is with kafka 0.9.0 running on a Solaris 10 server with 3 Virtual
> interfaces installed, in addition to the Physical interface.
>
> John
>
> -Original Message-
> From: Stephen Powis [mailto:spo...@salesforce.com]
> Sent: Friday, January 29, 2016 10:03 AM
> To: users@kafka.apache.org
> Subject: Re: How to bind all Kafka tcp port to private net address
>
> Pretty sure you want to set this option in your server.properties file:
>
> # Hostname the broker will bind to. If not set, the server will bind to all
> > interfaces
> > #host.name=localhost
> >
>
> On Thu, Jan 28, 2016 at 10:58 PM, costa xu <xxb.sk...@gmail.com> wrote:
>
> > My version is kafka_2.11-0.9.0.0. I find that Kafka listens on
> > multiple TCP ports on a Linux server.
> >
> > [gdata@gdataqosconnd2 kafka_2.11-0.9.0.0]$ netstat -plnt|grep java
> > (Not all processes could be identified, non-owned process info  will
> > not be shown, you would have to be root to see it all.)
> > tcp0  0 10.105.7.243:9092   0.0.0.0:*
> > LISTEN  31011/java
> > tcp0  0 0.0.0.0:51367   0.0.0.0:*
> > LISTEN  31011/java
> > tcp0  0 0.0.0.0:1105    0.0.0.0:*
> > LISTEN  31011/java
> > tcp0  0 0.0.0.0:42592   0.0.0.0:*
> > LISTEN  31011/java
> >
> > 10.105.7.243:9092 is the broker's port. 0.0.0.0:1105 is the JMX port
> > that I set in the start script.
> > But I don't know what the 0.0.0.0:51367 and 0.0.0.0:42592 ports are. And
> > more tricky, the ports change after restarting the kafka process.
> >
> > So I want to know how to bind the Kafka ports to a private interface
> > such as '10.105.7.243'.
> > If I cannot bind them, can I at least set fixed port numbers?
> >
> > My kafka server.properties is:
> > # Licensed to the Apache Software Foundation (ASF) under one or more #
> > contributor license agreements.  See the NOTICE file distributed with
> > # this work for additional information regarding copyright ownership.
> > # The ASF licenses this file to You under the Apache License, Version
> > 2.0 # (the "License"); you may not use this file except in compliance
> > with # the License.  You may obtain a copy of the License at #
> > #http://www.apache.org/licenses/LICENSE-2.0
> > #
> > # Unless required by applicable law or agreed to in writing, software
> > # distributed under the License is distributed on an "AS IS" BASIS, #
> > WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > # See the License for the specific language governing permissions and
> > # limitations under the License.
> > # see kafka.server.KafkaConfig for additional details and defaults
> >
> > # Server Basics
> > #
> >
> > # The id of the broker. This must be set to a unique integer for each
> > broker.
> > broker.id=1
> >
> > # Socket Server Settings
> > #
> >
> > listeners=PLAINTEXT://10.105.7.243:9092
> >
> > # The port the socket server listens on
> > #port=9092
> >
> > # Hostname the broker will bind to. If not set, the server will bind
> > to all interfaces #host.name=localhost
> >
> > # Hostname the broker will advertise to producers and consumers. If
> > not set, it uses the # value for "host.name" if configured.
> > Otherwise, it will use the value returned from #
> > java.net.InetAddress.getCanonicalHostName().
> > #advertised.host.name=
> >
> > # The port to publish to ZooKeeper for clients to use. If this is not
> > set, # it will publish the same port that the broker binds to.
> > #advertised.port=
> >
> > # The number of threads handling network requests
> > num.network.threads=3
> >
> > # The number of threads doing disk I/O
> > num.io.threads=8
> >
> > # The send buffer (SO_SNDBUF) used by the socket server
> > socket.send.buffer.bytes=102400
> >
> > # The receive buffer (SO_RCVBUF) used by the socket server
> > socket.receive.buffer.bytes=102400
> >
> > # The maximum size of a request that the soc

How to bind all Kafka tcp port to private net address

2016-01-28 Thread costa xu
My version is kafka_2.11-0.9.0.0. I find that Kafka listens on multiple TCP
ports on a Linux server.

[gdata@gdataqosconnd2 kafka_2.11-0.9.0.0]$ netstat -plnt|grep java
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp0  0 10.105.7.243:9092   0.0.0.0:*
LISTEN  31011/java
tcp0  0 0.0.0.0:51367   0.0.0.0:*
LISTEN  31011/java
tcp0  0 0.0.0.0:1105    0.0.0.0:*
LISTEN  31011/java
tcp0  0 0.0.0.0:42592   0.0.0.0:*
LISTEN  31011/java

10.105.7.243:9092 is the broker's port. 0.0.0.0:1105 is the JMX port that
I set in the start script.
But I don't know what the 0.0.0.0:51367 and 0.0.0.0:42592 ports are. And
more tricky, the ports change after restarting the kafka process.

So I want to know how to bind the Kafka ports to a private interface just
like '10.105.7.243'.
If I cannot bind them, can I at least set fixed port numbers?
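For reference, ephemeral 0.0.0.0 listeners like these are typically the JMX connector plus the RMI port the JVM picks at random, and they can be pinned with standard JVM JMX options. This is a sketch with placeholder port values, not a confirmed diagnosis of this exact cluster:

```shell
# The broker socket itself is already pinned to the private interface by:
#   listeners=PLAINTEXT://10.105.7.243:9092
#
# To make the JMX/RMI ports predictable (1105/1106 are placeholders),
# export this before starting Kafka, e.g. in the start script:
#   export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
#     -Dcom.sun.management.jmxremote.port=1105 \
#     -Dcom.sun.management.jmxremote.rmi.port=1106 \
#     -Dcom.sun.management.jmxremote.authenticate=false \
#     -Dcom.sun.management.jmxremote.ssl=false \
#     -Djava.rmi.server.hostname=10.105.7.243"
```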

My kafka server.properties is:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Server Basics #

# The id of the broker. This must be set to a unique integer for each
broker.
broker.id=1

# Socket Server Settings
#

listeners=PLAINTEXT://10.105.7.243:9092

# The port the socket server listens on
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all
interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not
set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value
returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept
(protection against OOM)
socket.request.max.bytes=104857600


# Log Basics #

# A comma separated list of directories under which to store log files
log.dirs=/data/gdata/var/kafka-logs

# The default number of log partitions per topic. More partitions allow
greater
# parallelism for consumption, but this will also result in more files
across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
# This value is recommended to be increased for installations with data
dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only
fsync() to sync
# the OS cache lazily. The following configurations control the flush of
data to disk.
# There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using
replication.
#2. Latency: Very large flush intervals may lead to latency spikes when
the flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation,
and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data
after a period of time or
# every N messages (or both). This can be done globally and overridden on a
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a
flush
#log.flush.interval.ms=1000

# Log Retention Policy
#

# The following configurations control the disposal of 

Re: All brokers are running but some partitions' leader is -1

2015-11-25 Thread Qi Xu
Great to know that. Thanks Gwen!

On Wed, Nov 25, 2015 at 12:03 PM, Gwen Shapira <g...@confluent.io> wrote:

> 1. Yes, you can do a rolling upgrade of brokers from 0.8.2 to 0.9.0. The
> important thing is to upgrade the brokers before you upgrade any of the
> clients.
>
> 2. I'm not aware of issues with 0.9.0 and SparkStreaming. However,
> definitely do your own testing to make sure.
>
> On Wed, Nov 25, 2015 at 11:25 AM, Qi Xu <shkir...@gmail.com> wrote:
>
> > Hi Gwen,
> > Yes, we're going to upgrade the 0.9.0 version. Regarding the upgrade, we
> > definitely don't want to have down time of our cluster.
> > So the upgrade will be machine by machine. Will the release 0.9.0 work
> with
> > the Aug's version together in the same Kafka cluster?
> > Also we currently run spark streaming job (with scala 2.10) against the
> > cluster. Any known issues of 0.9.0 are you aware of under this scenario?
> >
> > Thanks,
> > Tony
> >
> >
> > On Mon, Nov 23, 2015 at 5:41 PM, Gwen Shapira <g...@confluent.io> wrote:
> >
> > > We fixed many many bugs since August. Since we are about to release
> 0.9.0
> > > (with SSL!), maybe wait a day and go with a released and tested
> version.
> > >
> > > On Mon, Nov 23, 2015 at 3:01 PM, Qi Xu <shkir...@gmail.com> wrote:
> > >
> > > > Forgot to mention is that the Kafka version we're using is from Aug's
> > > > Trunk branch---which has the SSL support.
> > > >
> > > > Thanks again,
> > > > Qi
> > > >
> > > >
> > > > On Mon, Nov 23, 2015 at 2:29 PM, Qi Xu <shkir...@gmail.com> wrote:
> > > >
> > > >> Loop another guy from our team.
> > > >>
> > > >> On Mon, Nov 23, 2015 at 2:26 PM, Qi Xu <shkir...@gmail.com> wrote:
> > > >>
> > > >>> Hi folks,
> > > >>> We have a 10 node cluster and have several topics. Each topic has
> > about
> > > >>> 256 partitions with 3 replica factor. Now we run into an issue that
> > in
> > > some
> > > >>> topic, a few partition (< 10)'s leader is -1 and all of them has
> only
> > > one
> > > >>> synced partition.
> > > >>>
> > > >>> From the Kafka manager, here's the snapshot:
> > > >>> [image: Inline image 2]
> > > >>>
> > > >>> [image: Inline image 1]
> > > >>>
> > > >>> here's the state log:
> > > >>> [2015-11-23 21:57:58,598] ERROR Controller 1 epoch 435499 initiated
> > > >>> state change for partition [userlogs,84] from OnlinePartition to
> > > >>> OnlinePartition failed (state.change.logger)
> > > >>> kafka.common.StateChangeFailedException: encountered error while
> > > >>> electing leader for partition [userlogs,84] due to: Preferred
> replica
> > > 0 for
> > > >>> partition [userlogs,84] is either not alive or not in the isr.
> > Current
> > > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}].
> > > >>> Caused by: kafka.common.StateChangeFailedException: Preferred
> > replica 0
> > > >>> for partition [userlogs,84] is either not alive or not in the isr.
> > > Current
> > > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}]
> > > >>>
> > > >>> My question is:
> > > >>> 1) how could this happen and how can I fix it or work around it?
> > > >>> 2) Is 256 partitions too big? We have about 200+ cores for spark
> > > >>> streaming job.
> > > >>>
> > > >>> Thanks,
> > > >>> Qi
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> >
>


Re: All brokers are running but some partitions' leader is -1

2015-11-25 Thread Qi Xu
Hi Gwen,
Yes, we're going to upgrade to the 0.9.0 version. Regarding the upgrade, we
definitely don't want to have downtime of our cluster,
so the upgrade will be machine by machine. Will the 0.9.0 release work
together with the August version in the same Kafka cluster?
Also, we currently run a Spark streaming job (with Scala 2.10) against the
cluster. Are there any known issues with 0.9.0 that you are aware of under
this scenario?

Thanks,
Tony


On Mon, Nov 23, 2015 at 5:41 PM, Gwen Shapira <g...@confluent.io> wrote:

> We fixed many many bugs since August. Since we are about to release 0.9.0
> (with SSL!), maybe wait a day and go with a released and tested version.
>
> On Mon, Nov 23, 2015 at 3:01 PM, Qi Xu <shkir...@gmail.com> wrote:
>
> > Forgot to mention is that the Kafka version we're using is from Aug's
> > Trunk branch---which has the SSL support.
> >
> > Thanks again,
> > Qi
> >
> >
> > On Mon, Nov 23, 2015 at 2:29 PM, Qi Xu <shkir...@gmail.com> wrote:
> >
> >> Loop another guy from our team.
> >>
> >> On Mon, Nov 23, 2015 at 2:26 PM, Qi Xu <shkir...@gmail.com> wrote:
> >>
> >>> Hi folks,
> >>> We have a 10 node cluster and have several topics. Each topic has about
> >>> 256 partitions with 3 replica factor. Now we run into an issue that in
> some
> >>> topic, a few partition (< 10)'s leader is -1 and all of them has only
> one
> >>> synced partition.
> >>>
> >>> From the Kafka manager, here's the snapshot:
> >>> [image: Inline image 2]
> >>>
> >>> [image: Inline image 1]
> >>>
> >>> here's the state log:
> >>> [2015-11-23 21:57:58,598] ERROR Controller 1 epoch 435499 initiated
> >>> state change for partition [userlogs,84] from OnlinePartition to
> >>> OnlinePartition failed (state.change.logger)
> >>> kafka.common.StateChangeFailedException: encountered error while
> >>> electing leader for partition [userlogs,84] due to: Preferred replica
> 0 for
> >>> partition [userlogs,84] is either not alive or not in the isr. Current
> >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}].
> >>> Caused by: kafka.common.StateChangeFailedException: Preferred replica 0
> >>> for partition [userlogs,84] is either not alive or not in the isr.
> Current
> >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}]
> >>>
> >>> My question is:
> >>> 1) how could this happen and how can I fix it or work around it?
> >>> 2) Is 256 partitions too big? We have about 200+ cores for spark
> >>> streaming job.
> >>>
> >>> Thanks,
> >>> Qi
> >>>
> >>>
> >>
> >
>


Re: All brokers are running but some partitions' leader is -1

2015-11-24 Thread Qi Xu
Thanks a lot Prabhjot!
The issue is mitigated by running the preferred replica leader election
tool! Before that, I noticed that it simply could not do leader
election---when I created a new topic, that topic was not available for a
long time until the preferred replica leader election finished.

For the 3 steps above:
1. The replicas are evenly distributed.
2. There's some imbalance in terms of load among brokers, but not
significant. But I guess there might have been some brokers that went down
and then came up again---we have an agent that restarts them automatically.
3. Spark runs on another set of machines. The Kafka servers' CPU/memory
usage is well below 50%.
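The mitigation above corresponds to the Preferred Replica Leader Election Tool shipped with 0.8/0.9. A minimal invocation might look like the sketch below; the topic/partition come from the error log earlier in the thread, and the ZooKeeper address is a placeholder:

```shell
# Hypothetical partition list for the Preferred Replica Leader Election Tool.
# (Without --path-to-json-file the tool runs the election for all partitions.)
cat > /tmp/preferred-election.json <<'EOF'
{"partitions": [{"topic": "userlogs", "partition": 84}]}
EOF
grep -c '"topic"' /tmp/preferred-election.json   # prints 1

# Against the live cluster (ZooKeeper address is a placeholder):
# bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181 \
#   --path-to-json-file /tmp/preferred-election.json
```
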



On Mon, Nov 23, 2015 at 11:18 PM, Prabhjot Bharaj <prabhbha...@gmail.com>
wrote:

> Hi,
>
> With the information provided, these are the steps I can think of (based on
> the experience I had with kafka):-
>
> 1. do a describe on the topic. See if the partitions and replicas are
> evenly distributed amongst all. If not, you might want to try the 'Reassign
> Partitions Tool' -
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> 2. is/are some partition(s) getting more data than others leading to an
> imbalance of disk space amongst the nodes in the cluster, to an extent that
> the kafka server process goes down on one or more machines in the cluster ?
> 3. From what I understand, your kafka and spark machines are the same ?? !!
> how much memory usage the replica-0 has when your spark cluster is running
> full throttle ?
>
> Workaround -
> Try running the Preferred Replica Leader Election Tool -
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.PreferredReplicaLeaderElectionTool
> to make some replica (the one that you noticed earlier when the cluster was
> all good) as the leader for this partition
>
> Regards,
> Prabhjot
>
> On Tue, Nov 24, 2015 at 7:11 AM, Gwen Shapira <g...@confluent.io> wrote:
>
> > We fixed many many bugs since August. Since we are about to release 0.9.0
> > (with SSL!), maybe wait a day and go with a released and tested version.
> >
> > On Mon, Nov 23, 2015 at 3:01 PM, Qi Xu <shkir...@gmail.com> wrote:
> >
> > > Forgot to mention is that the Kafka version we're using is from Aug's
> > > Trunk branch---which has the SSL support.
> > >
> > > Thanks again,
> > > Qi
> > >
> > >
> > > On Mon, Nov 23, 2015 at 2:29 PM, Qi Xu <shkir...@gmail.com> wrote:
> > >
> > >> Loop another guy from our team.
> > >>
> > >> On Mon, Nov 23, 2015 at 2:26 PM, Qi Xu <shkir...@gmail.com> wrote:
> > >>
> > >>> Hi folks,
> > >>> We have a 10 node cluster and have several topics. Each topic has
> about
> > >>> 256 partitions with 3 replica factor. Now we run into an issue that
> in
> > some
> > >>> topic, a few partition (< 10)'s leader is -1 and all of them has only
> > one
> > >>> synced partition.
> > >>>
> > >>> From the Kafka manager, here's the snapshot:
> > >>> [image: Inline image 2]
> > >>>
> > >>> [image: Inline image 1]
> > >>>
> > >>> here's the state log:
> > >>> [2015-11-23 21:57:58,598] ERROR Controller 1 epoch 435499 initiated
> > >>> state change for partition [userlogs,84] from OnlinePartition to
> > >>> OnlinePartition failed (state.change.logger)
> > >>> kafka.common.StateChangeFailedException: encountered error while
> > >>> electing leader for partition [userlogs,84] due to: Preferred replica
> > 0 for
> > >>> partition [userlogs,84] is either not alive or not in the isr.
> Current
> > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}].
> > >>> Caused by: kafka.common.StateChangeFailedException: Preferred
> replica 0
> > >>> for partition [userlogs,84] is either not alive or not in the isr.
> > Current
> > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}]
> > >>>
> > >>> My question is:
> > >>> 1) how could this happen and how can I fix it or work around it?
> > >>> 2) Is 256 partitions too big? We have about 200+ cores for spark
> > >>> streaming job.
> > >>>
> > >>> Thanks,
> > >>> Qi
> > >>>
> > >>>
> > >>
> > >
> >
>
>
>
> --
> -
> "There are only 10 types of people in the world: Those who understand
> binary, and those who don't"
>


Re: All brokers are running but some partitions' leader is -1

2015-11-23 Thread Qi Xu
Loop another guy from our team.

On Mon, Nov 23, 2015 at 2:26 PM, Qi Xu <shkir...@gmail.com> wrote:

> Hi folks,
> We have a 10-node cluster with several topics. Each topic has about
> 256 partitions with a replication factor of 3. Now we run into an issue: in
> some topics, a few partitions' (< 10) leader is -1, and each of them has
> only one in-sync replica.
>
> From the Kafka manager, here's the snapshot:
> [image: Inline image 2]
>
> [image: Inline image 1]
>
> here's the state log:
> [2015-11-23 21:57:58,598] ERROR Controller 1 epoch 435499 initiated state
> change for partition [userlogs,84] from OnlinePartition to OnlinePartition
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing
> leader for partition [userlogs,84] due to: Preferred replica 0 for
> partition [userlogs,84] is either not alive or not in the isr. Current
> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}].
> Caused by: kafka.common.StateChangeFailedException: Preferred replica 0
> for partition [userlogs,84] is either not alive or not in the isr. Current
> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}]
>
> My question is:
> 1) how could this happen and how can I fix it or work around it?
> 2) Is 256 partitions too big? We have about 200+ cores for spark streaming
> job.
>
> Thanks,
> Qi
>
>


Re: Kafka cluster cannot start anymore after unexpected shutdown

2015-09-11 Thread Qi Xu
And I tried cleaning up the whole kafka-logs folder and then starting the
Kafka server again.
It then fails with the following errors:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/share/kafka/core/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/share/kafka/tools/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2015-09-12 00:58:32,203] WARN No meta.properties file under dir
/datadrive/kafka-logs/meta.properties
(kafka.server.BrokerMetadataCheckpoint)
Exception in thread "kafka-log-cleaner-thread-0"
java.lang.OutOfMemoryError: Java heap space

Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "kafka-network-thread-3-SSL-33"

Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "kafka-network-thread-3-SSL-51"
log4j:ERROR No output stream or file set for the appender named
[kafkaAppender].

Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "kafka-network-thread-3-SSL-56"
[2015-09-12 00:59:54,546] ERROR Uncaught exception in thread
'kafka-network-thread-3-SSL-33': (org.apache.kafka.common.utils.Utils)
[2015-09-12 01:00:34,261] ERROR Uncaught exception in thread
'kafka-network-thread-3-SSL-51': (org.apache.kafka.common.utils.Utils)
[2015-09-12 01:00:51,108] ERROR Uncaught exception in thread
'kafka-network-thread-3-SSL-56': (org.apache.kafka.common.utils.Utils)
[2015-09-12 01:00:48,867] ERROR Processor got uncaught exception.
(kafka.network.Processor)
[2015-09-12 01:00:48,867] ERROR Processor got uncaught exception.
(kafka.network.Processor)
[2015-09-12 01:00:49,121] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 01:00:57,643] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 01:00:54,028] ERROR Uncaught exception in thread
'kafka-network-thread-3-SSL-40': (org.apache.kafka.common.utils.Utils)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 01:00:53,656] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 01:00:53,656] ERROR Uncaught exception in thread
'kafka-network-thread-3-SSL-58': (org.apache.kafka.common.utils.Utils)
java.lang.OutOfMemoryError: Java heap space

Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "main-SendThread(10.1.130.12:2181)"

Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "kafka-network-thread-3-SSL-46"

Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "kafka-network-thread-3-SSL-61"
[2015-09-12 01:00:53,043] WARN Session 0x24f84cb22a90029 for server
10.1.130.12/10.1.130.12:2181, unexpected error, closing socket connection
and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2015-09-12 01:01:22,419] ERROR from main-SendThread(10.1.130.12:2181)
(org.apache.zookeeper.ClientCnxn)
[2015-09-12 01:01:22,051] ERROR Processor got uncaught exception.
(kafka.network.Processor)
[2015-09-12 01:01:23,489] ERROR Uncaught exception in thread
'kafka-network-thread-3-SSL-46': (org.apache.kafka.common.utils.Utils)
[2015-09-12 01:01:21,313] ERROR Processor got uncaught exception.
(kafka.network.Processor)
[2015-09-12 01:01:24,438] ERROR Uncaught exception in thread
'kafka-network-thread-3-SSL-61': (org.apache.kafka.common.utils.Utils)
[2015-09-12 01:01:18,075] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 01:01:02,539] ERROR Uncaught exception in scheduled task
'kafka-recovery-point-checkpoint' (kafka.utils.KafkaScheduler)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 01:01:34,023] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 01:01:35,676] ERROR Uncaught exception in scheduled task
'kafka-recovery-point-checkpoint' (kafka.utils.KafkaScheduler)


And the broker does not appear in zookeeper.
Any idea?
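Given the repeated java.lang.OutOfMemoryError above, one common first step (a sketch, not a diagnosis of this cluster) is to raise the broker heap via the environment variable that the Kafka start scripts honor; the 4 GB figure is an assumption to be sized against the machine's RAM:

```shell
# Give the broker JVM a larger, fixed-size heap. 4g is an illustrative value.
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
echo "$KAFKA_HEAP_OPTS"   # prints -Xms4g -Xmx4g

# Then start the broker as usual; kafka-run-class.sh picks up KAFKA_HEAP_OPTS:
# bin/kafka-server-start.sh config/server.properties
```
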

Thanks,
Qi

On Fri, Sep 11, 2015 at 5:53 PM, Qi Xu <shkir...@gmail.com> wrote:

> Hi,
> We're running the Trunk version of Kafka (for its SSL feature) and
> recently I'm trying to enable the kafka manager with it.
> After enabling that,  I find out some machine's Kafka Server is dead.
> Looking at the server.log, it has the following logs.
> java.lang.OutOfMemoryError: Java heap space
> [2015-09-12 00:19:41,310] ERROR Processor got uncaught exception.
> (kafka.network.Processor)

Kafka cluster cannot start anymore after unexpected shutdown

2015-09-11 Thread Qi Xu
Hi,
We're running the trunk version of Kafka (for its SSL feature), and
recently I tried to enable Kafka Manager with it.
After enabling it, I found that some machines' Kafka brokers had died.
Looking at server.log, I see the following entries.
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:19:41,310] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:19:44,766] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:19:48,567] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:19:52,112] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:19:56,307] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space


I then tried to start Kafka on those dead machines, but got the errors
below:
[2015-09-12 00:43:44,878] WARN Found an corrupted index file,
/datadrive/kafka-logs/userlogs-139/.index, deleting and
rebuilding index... (kafka.log.Log)
[2015-09-12 00:43:44,883] WARN Found an corrupted index file,
/datadrive/kafka-logs/userlogs-166/.index, deleting and
rebuilding index... (kafka.log.Log)
[2015-09-12 00:43:44,888] WARN Found an corrupted index file,
/datadrive/kafka-logs/exoactivitylogs-114/.index,
deleting and rebuilding index... (kafka.log.Log)
[2015-09-12 00:43:44,895] WARN Found an corrupted index file,
/datadrive/kafka-logs/exoactivitylogs-15/.index,
deleting and rebuilding index... (kafka.log.Log)
[2015-09-12 00:43:44,900] WARN Found an corrupted index file,
/datadrive/kafka-logs/exoactivitylogs-178/.index,
deleting and rebuilding index... (kafka.log.Log)
[2015-09-12 00:43:44,906] WARN Found an corrupted index file,
/datadrive/kafka-logs/__consumer_offsets-4/.index,
deleting and rebuilding index... (kafka.log.Log)
[2015-09-12 00:43:44,912] WARN Found an corrupted index file,
/datadrive/kafka-logs/userlogs-147/.index, deleting and
rebuilding index... (kafka.log.Log)
[2015-09-12 00:43:44,917] WARN Found an corrupted index file,
/datadrive/kafka-logs/exoactivitylogs-95/.index,
deleting and rebuilding index... (kafka.log.Log)
Exception in thread "kafka-log-cleaner-thread-0"
java.lang.OutOfMemoryError: Java heap space

Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "main-SendThread(10.1.130.12:2181)"
[2015-09-12 00:44:39,466] WARN Session 0x14f84cab5240046 for server
10.1.130.12/10.1.130.12:2181, unexpected error, closing socket connection
and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2015-09-12 00:44:45,169] ERROR from main-SendThread(10.1.130.12:2181)
(org.apache.zookeeper.ClientCnxn)
[2015-09-12 00:45:39,466] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:45:39,467] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:45:39,732] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:45:41,299] ERROR Uncaught exception in scheduled task
'highwatermark-checkpoint' (kafka.utils.KafkaScheduler)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:45:49,070] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:45:49,702] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:45:49,912] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space
[2015-09-12 00:45:50,140] ERROR Processor got uncaught exception.
(kafka.network.Processor)
java.lang.OutOfMemoryError: Java heap space

From ZooKeeper, I can see the node appear in /brokers/ids, but it disappears
when the error above happens.

Do you know what the problem is? Any hint is much appreciated.
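For anyone hitting the same wall, a minimal sketch of one first mitigation: the repeated OutOfMemoryError suggests the broker heap is too small for the SSL buffers plus the index rebuilds, and `kafka-server-start.sh` reads `KAFKA_HEAP_OPTS` from the environment. The sizes and paths below are assumptions, not a recommendation:

```shell
# Assumed sizes/paths; tune to the actual workload. kafka-server-start.sh
# picks up KAFKA_HEAP_OPTS, so exporting it before starting is enough.
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
# Capture a heap dump on the next OOM so the real retainer can be identified:
export KAFKA_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"
# then: bin/kafka-server-start.sh config/server.properties
echo "$KAFKA_HEAP_OPTS"
```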

Thanks,
Qi


Huge Kafka Message size ( 386073344 ) in response

2015-09-03 Thread Qi Xu
Hi all,
I'm using the Kafka.Net library to implement a Kafka producer.
One issue I've found is that it sometimes reads a response from the Kafka
server that indicates a huge message size: 386073344. Apparently something
must be wrong, but I'm not sure whether it's a special flag that Kafka.Net
fails to handle or a bug on the Kafka server side. Have you seen this
before?

Thanks,
Qi
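One hedged observation on that number: printed in hex it matches the start of a TLS record, which is the classic signature of a plaintext client reading bytes from an SSL listener. That seems plausible here, since the cluster in the threads above runs trunk specifically for SSL:

```java
public class SizeFieldCheck {
    public static void main(String[] args) {
        // 386073344 == 0x17030300: 0x17 is the TLS "application data" record
        // type and 0x0303 is the TLS 1.2 version field, read together as a
        // big-endian int. A plaintext client that connects to an SSL port
        // tends to misparse such bytes as an absurd message size.
        System.out.println(Integer.toHexString(386073344)); // prints 17030300
    }
}
```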


Kafka metadata

2015-08-07 Thread Qi Xu
Hi Everyone,
I have a question that hopes to get some clarification.
In a Kafka cluster, does every broker have the complete view of the
metadata information?
What's the best practice for aproducer to send metadata request? Is it
recommended to send it to all brokers or just one broker?

In our scenario, we want to avoid the situation that each producer needs to
talk to every broker because we have hundreds thousands of producers and
the scalability of connection number will be a concern if every cluster
node is connected by any producer.
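As a sketch of why the bootstrap list can stay small (the broker host names below are hypothetical): any single broker can answer the initial metadata request with the complete cluster view, so a producer config only needs to name a few brokers, not all of them:

```java
import java.util.Properties;

public class ProducerBootstrapSketch {
    /** Hypothetical config: the producer bootstraps from a small broker subset. */
    static Properties producerConfig() {
        Properties props = new Properties();
        // Any one of these brokers can serve full cluster metadata; the producer
        // discovers the remaining brokers from the metadata response, so this
        // list does not need to enumerate the whole cluster.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig().getProperty("bootstrap.servers"));
    }
}
```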

Thank you for your help.

Qi


How to monitor consuming rate and lag?

2015-06-30 Thread Shady Xu
Hi all,

I'm now using https://github.com/airbnb/kafka-statsd-metrics2 to monitor
our Kafka cluster, but it has no metrics for consume rate or lag, which are
key performance metrics we care about.

So how do you monitor the consume rate and lag of each consumer group?
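For the 0.8.x line, one hedged option is the bundled ConsumerOffsetChecker tool; the group name and ZooKeeper address below are placeholders, and its Lag column can be scraped periodically (e.g. from cron) into a metrics system:

```shell
# Placeholder group/zk values; the tool prints per-partition offsets and lag.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
CHECK_CMD="$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zookeeper zk1:2181 --group my-consumer-group"
# e.g. run from cron and parse the "Lag" column:
echo "$CHECK_CMD"
```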


Re: offsets.storage=kafka, dual.commit.enabled=false still requires ZK

2015-06-10 Thread Shady Xu
Looking forward to the 0.8.3 release.

BTW, an official management console would be nice. None of the ones
mentioned on the official website really shines.

2015-06-10 2:38 GMT+08:00 Ewen Cheslack-Postava e...@confluent.io:

 The new consumer implementation, which should be included in 0.8.3, only
 needs a bootstrap.servers setting and does not use a zookeeper connection.

 On Tue, Jun 9, 2015 at 1:26 PM, noah iamn...@gmail.com wrote:

  We are setting up a new Kafka project (0.8.2.1) and are trying to go
  straight to consumer offsets stored in Kafka. Unfortunately it looks like
  the Java consumer will try to connect to ZooKeeper regardless of the
  settings.
 
  Will this dependency go away completely, and if so, when? It would simplify our
  deployments if our consumers didn't have to connect to ZooKeeper at all.
 
  P.S. I've asked this on Stack Overflow, if you would like to answer there
  for posterity:
 
 
 http://stackoverflow.com/questions/30719331/kafka-0-8-2-1-offsets-storage-kafka-still-requires-zookeeper
 



 --
 Thanks,
 Ewen
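A sketch of what Ewen describes in practice (assuming the new consumer planned for 0.8.3; host names are hypothetical): the configuration names brokers only, with no zookeeper.connect anywhere:

```java
import java.util.Properties;

public class NewConsumerConfigSketch {
    /** Hypothetical new-consumer config: brokers only, no ZooKeeper setting. */
    static Properties consumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", "my-group");
        // The old high-level consumer would instead require "zookeeper.connect",
        // which is exactly the dependency being asked about above.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfig().containsKey("zookeeper.connect"));
    }
}
```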



Re: Increased replication factor. Replication didn't happen!

2015-06-10 Thread Shady Xu
Right now, Kafka topics do not support changing the replication factor or
partition count after creation. The kafka-reassign-partitions.sh tool can
only reassign existing partitions.

2015-06-11 9:31 GMT+08:00 Gwen Shapira gshap...@cloudera.com:

 What do the logs show?

 On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey
 crackshotm...@gmail.com wrote:
  Ran this:
 
  $KAFKA_HOME/bin/kafka-reassign-partitions.sh
 
  But Kafka did not actually do the replication. The topic description shows
  the right numbers, but it just didn't replicate.
 
  What's wrong, and how do I trigger the replication?
 
  I'm running 0.8.2.0
 
  thanks
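For reference, a sketch of the usual invocation sequence (the topic name, broker ids, and ZooKeeper address are placeholders): the tool needs a reassignment JSON file, --execute only starts the data movement, and --verify has to be polled until it reports completion, so a right-looking topic description can precede the actual replication:

```shell
# Placeholder topic/brokers/zk. Write the target assignment as JSON first:
cat > /tmp/reassign.json <<'EOF'
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[1,2]}]}
EOF
# Start the reassignment, then poll until --verify reports it completed:
#   $KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
#     --reassignment-json-file /tmp/reassign.json --execute
#   $KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
#     --reassignment-json-file /tmp/reassign.json --verify
cat /tmp/reassign.json
```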



Kafka issue with FetchRequest buffer size

2015-06-01 Thread Zhenzhong Xu
Hi,

We recently ran into a scenario where we initiated a FetchRequest with a
fixed fetchSize (64 KB), shown below, using SimpleConsumer. When the broker
contained an unusually large message, the broker returned an empty message
set *without any error code*. According to the documentation at
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example,
this is the expected behavior (***Note also that we ask for a fetchSize of
10 bytes. If the Kafka producers are writing large batches, this might
not be enough, and might return an empty message set. In this case, the
fetchSize should be increased until a non-empty set is returned.*).

However, I would argue against not returning an error code: without any
indication, the consumer keeps retrying without knowing whether it has
reached the end of the log or there is an actual problem with the message
size. This makes monitoring hard as well. Was there a particular reason an
error code is not returned in this situation?


FetchRequest req = new FetchRequestBuilder()
    .clientId(clientName)
    .addFetch(a_topic, a_partition, readOffset, 1024 * 64)
    .build();
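Absent a broker-side error code, the workaround the wiki implies can be sketched as client-side growth of the fetch size whenever a fetch comes back empty with no error; the cap below is an assumption (roughly the broker's maximum message size), not part of the API:

```java
public class FetchSizeGrowth {
    /** Double the fetch size, bounded by an assumed cap. */
    static int nextFetchSize(int current, int cap) {
        return Math.min(current * 2, cap);
    }

    public static void main(String[] args) {
        int fetchSize = 64 * 1024;     // the 64 KB from the request above
        int cap = 8 * 1024 * 1024;     // assumed upper bound on message size
        // In the real consumer loop this would repeat only while fetch(...)
        // returns an empty message set with no error code set:
        while (fetchSize < cap) {
            fetchSize = nextFetchSize(fetchSize, cap);
        }
        System.out.println(fetchSize); // prints 8388608
    }
}
```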


Thanks!
Z.


Re: Offset management: client vs broker side responsibility

2015-05-27 Thread Shady Xu
I guess adding a new component would increase the complexity of the system.
And if the new component consists of one or a few nodes, it may become the
bottleneck of the whole system; if it consists of many nodes, it makes the
system even more complex.

Although every solution has its downsides, I think the current one is
decent.

2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:

 It could be a separate server component; it does not have to be
 monolithic or coupled with the broker.
 Such a solution would have benefits: a single API with pluggable implementations.

 On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:

  Storing and managing offsets on the broker side would put high pressure
  on the brokers, which would affect the performance of the cluster.
 
  You can use the high-level consumer APIs; then you can get the offsets
  either from ZooKeeper or from the __consumer_offsets topic. On the other
  hand, if you use the simple consumer APIs, you mean to manage offsets
  yourself, so you should monitor them yourself. Simple and plain, right?
 
  2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:
 
   Hello Apache Kafka community,
  
    Please correct me if I'm wrong: AFAIK, currently (Kafka 0.8.2.x) offset
    management is mainly a client/consumer-side responsibility.
   
    Wouldn't it be better if it were a broker-side-only responsibility?
   
    E.g. now, if one wants to use custom offset management, none of the Kafka
    monitoring tools can see the offsets; they would need to use the same
    custom client implementation, which is practically impossible.
  
   Kind regards,
   Stevo Slavic.