Re: Kafka BrokerState Metric Value 3

2020-08-19 Thread Karolis Pocius
Note that even when all partitions are in sync, leader election might not
have happened yet and the broker isn't serving anything. That might be OK,
depending on your actual use case.

On Wed, Aug 19, 2020 at 11:40 AM Dhirendra Singh 
wrote:

> Thank you Peter !
> I intended to use the broker state to determine health, but I was not sure.
> I will use under replicated partition metric instead.
>
> --dsingh
>
> On Wed, Aug 19, 2020 at 1:40 PM Peter Bukowinski  wrote:
>
> > The broker state metric just reports on the state of the broker itself,
> > not whether it is in sync. A replacement broker will quickly reach a
> broker
> > state of 3 on startup even though it has to catch up on many replicas.
> > Don’t rely on it for checking if a cluster/broker is healthy with no
> > under-replicated partitions.
> >
> > For that, you can look at the underreplicated partition count metric.
> >
> > -- Peter (from phone)
> >
> > > On Aug 19, 2020, at 12:52 AM, Dhirendra Singh 
> > wrote:
> > >
> > > So does this metric just give information that the broker process is up
> > > and running? Or does it indicate something more about the broker's state
> > > or the partitions it holds?
> > >
> > >
> > >
> > >> On Mon, Aug 17, 2020 at 6:17 PM Karolis Pocius
> > >>  wrote:
> > >>
> > >> I tried using this metric for determining when the broker is back in
> the
> > >> cluster and became the leader for partitions it owned before restart,
> > but
> > >> that's not the case.
> > >>
> > >> In the end I've settled for checking
> > >> kafka.server:name=LeaderCount,type=ReplicaManager which tells me when
> > the
> > >> broker is actually operational and serving data.
> > >>
> > >> On Mon, Aug 17, 2020 at 3:29 PM Dhirendra Singh <
> dhirendr...@gmail.com>
> > >> wrote:
> > >>
> > >>> I have a question regarding Kafka BrokerState Metric value 3.
> According
> > >> to
> > >>> the documentation value 3 means running state.
> > >>> What does this running state mean for the broker? Does it mean data
> of
> > >> all
> > >>> partitions on this broker is in sync ?
> > >>> Is it safe to assume that when broker transition to state 3 after
> > restart
> > >>> it recovered all partitions data from the leader and is in sync with
> > the
> > >>> leaders ?
> > >>>
> > >>> Thanks,
> > >>> dsingh
> > >>>
> > >>
> >
>


Re: Kafka BrokerState Metric Value 3

2020-08-17 Thread Karolis Pocius
I tried using this metric to determine when a restarted broker is back in the
cluster and has become the leader for the partitions it owned before the
restart, but that's not the case.

In the end I've settled on checking
kafka.server:name=LeaderCount,type=ReplicaManager, which tells me when the
broker is actually operational and serving data.
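
If it helps, one rough way to watch that MBean from the shell is the JmxTool
that ships with Kafka (this assumes JMX is enabled on the broker, here on
port 9999; adjust host/port to your setup):

```
bin/kafka-run-class.sh kafka.tools.JmxTool \
  --object-name kafka.server:type=ReplicaManager,name=LeaderCount \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --reporting-interval 5000
```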

On Mon, Aug 17, 2020 at 3:29 PM Dhirendra Singh 
wrote:

> I have a question regarding Kafka BrokerState metric value 3. According to
> the documentation, value 3 means the running state.
> What does this running state mean for the broker? Does it mean the data of
> all partitions on this broker is in sync?
> Is it safe to assume that when a broker transitions to state 3 after a
> restart, it has recovered all partition data from the leaders and is in sync
> with them?
>
> Thanks,
> dsingh
>


Re: _consumer_offsets topic produce rate is dam high

2020-06-24 Thread Karolis Pocius
Check if any of your consumers have auto commit turned off and instead
commit after processing each message.

Also, even if all consumers are using auto commit, maybe some of them have
the interval set to something crazy low like 1 ms.
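
For reference, these are the consumer settings to look at (defaults shown;
either disabling auto commit and committing per message, or overriding the
interval to something tiny, will inflate the __consumer_offsets produce rate):

```
# consumer configuration (defaults)
enable.auto.commit=true
auto.commit.interval.ms=5000
```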

On Sat, Jun 20, 2020 at 8:31 PM Ashutosh singh  wrote:

> Hi Guys,
>
> Hope you all are doing well.
>
> All of a sudden I see very high throughput for the _consumer_offsets topic. It
> is around 30-40K messages per second. What could be the reason for such a high
> rate? Do I need to be concerned about this?
>
> [image: image.png]
>
>
> This happens for an hour and after that it goes down.  Now it is happening
> once or twice daily.
>
> I have an 8-node cluster, 1000+ topics and 644 consumer groups.
> All nodes have an almost equal number of leader partitions and an almost
> equal number of partitions overall.
>
> Kafka version : 2.1.1
>
> If you look at the graph above, there are no other topics with more than
> 300 messages/sec; only _consumer_offsets has such high throughput.
>
> I know this is an internal topic and stores topic metadata and consumer
> information. But I don't see anything abnormal in my cluster, so why is the
> _consumer_offsets topic going crazy? What is going on here?
>
> Any help will be appreciated.
>
> --
> Thanx & Regard
> Ashutosh Singh
> 08151945559
>
>


Re: Consumer Group got stuck in CompletingRebalance

2020-06-21 Thread Karolis Pocius
I don't mean to resurrect an old thread, but in case you haven't found the
answer, this sounds exactly like the issue I encountered after upgrading
to 2.4.0.

It was caused by https://issues.apache.org/jira/browse/KAFKA-9752
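
In case someone lands here with the same symptoms, the inspection and
(attempted) delete from the report below look roughly like this (the
bootstrap server is a placeholder):

```
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group trackers-etl --members

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --delete --group trackers-etl
```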

On Tue, Apr 14, 2020 at 10:08 AM James Brown  wrote:

> We had a consumer crash and restart itself a bunch of times, and then the
> group got stuck in state CompletingRebalance. All of the consumers were
> dead (I checked that there are no processes running on the host in the
> output of kafka-consumer-groups.sh --describe --group group-name
> --members), but the group was still rebalancing. If I tried to start up a
> consumer, it just timed out trying to join the group.
>
> I tried to delete the group, but that fails with
>
> * Group 'trackers-etl' could not be deleted due to:
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.GroupNotEmptyException: The group is not
> empty.
>
> I ended up restarting the broker that was the leader for that group; that
> fixed the issue but is obviously a pretty heavy-weight solution.
>
> There was nothing interesting in the logs on that broker; the last thing it
> logged about that group was:
>
> [2020-04-13 21:37:20,308] INFO [GroupCoordinator 6]: Stabilized group
> trackers-etl generation 4987 (__consumer_offsets-29)
> (kafka.coordinator.group.GroupCoordinator)
>
> Has anyone seen this before? Should I file a JIRA ticket? Was there a
> better process than restarting the broker?
> --
> James Brown
> Systems Engineer
>


Re: kafka-consumer-groups.sh CURRENT-OFFSET column show "-" , what mean about it ! thank you !

2020-04-21 Thread Karolis Pocius
It means no consumer has consumed anything from that partition. Likely
because there's no data in that partition yet.

On Tue, Apr 21, 2020 at 8:12 PM 一直以来 <279377...@qq.com> wrote:

> ghy@ghy-VirtualBox:~/T/k/bin$ ./kafka-consumer-groups.sh
> --bootstrap-server localhost:9081 --describe --group test
>
>
> GROUP  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
> test   myTopic3  4          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  1          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  2          -               0               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  3          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  0          -               2               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1


Re: Port already in use

2020-02-21 Thread Karolis Pocius
Choose a different port or check what's already listening on 9099 using
something like: `ss -tunapl | grep 9099`
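
If the cause turns out to be the broker's JMX_PORT variable being exported in
the same shell (the CLI tools go through kafka-run-class.sh, which binds that
port too), one workaround is to override it just for the command. This is an
assumption about the setup, not a confirmed diagnosis:

```
JMX_PORT=9100 ./kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test
```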

On Fri, Feb 21, 2020 at 1:08 PM sunil chaudhari 
wrote:

> Hi,
> I just enabled the JMX port in the Kafka broker.
> Since then I am not able to run the utilities under /bin.
> For example, when I run
>  ./kafka-topics.sh --create
> it throws a BindException: port already in use 9099
>
> Before, it was running fine.
> The same thing happens for the perf test utilities under /bin.
>
> Please help. How can I run those utilities with the JMX port on?
>
> Regards,
> Sunil.
>


Re: [ANNOUNCE] Apache Kafka 2.2.2

2019-12-02 Thread Karolis Pocius
2.2.2 is a bugfix release; it contains some of the fixes from 2.3.0/1, but
does not backport new features. You can go to
https://kafka.apache.org/downloads and check the release notes of 2.3.0/1
to see which new features and improvements you'd be missing if you didn't
upgrade directly to those versions. As to whether it makes any difference if
you upgrade to 2.2.2 now and another version later -- it shouldn't really
matter.

On Mon, Dec 2, 2019 at 2:15 PM Vinay Kumar  wrote:

> Hi,
> May I know how would this version be different from the later versions
> 2.3.0, 2.3.1.
> I'm looking to upgrade my current Kafka 2.1.0, so can you please let me
> know what differences it would make for upgrading to 2.2.2 or 2.3.1?
>
> Thanks,
> Vinay
>
> On Monday, December 2, 2019, Vahid Hashemian 
> wrote:
>
> > Awesome. Thanks for managing this release Randall!
> >
> > Regards,
> > --Vahid
> >
> > On Sun, Dec 1, 2019 at 5:45 PM Randall Hauch  wrote:
> >
> > > The Apache Kafka community is pleased to announce the release for
> Apache
> > > Kafka 2.2.2
> > >
> > > This is a bugfix release for Apache Kafka 2.2.
> > > All of the changes in this release can be found in the release notes:
> > > https://www.apache.org/dist/kafka/2.2.2/RELEASE_NOTES.html
> > >
> > > You can download the source and binary release from:
> > > https://kafka.apache.org/downloads#2.2.2
> > >
> > >
> > > 
> > ---
> > >
> > >
> > > Apache Kafka is a distributed streaming platform with four core APIs:
> > >
> > >
> > > ** The Producer API allows an application to publish a stream of records
> > > to one or more Kafka topics.
> > >
> > > ** The Consumer API allows an application to subscribe to one or more
> > > topics and process the stream of records produced to them.
> > >
> > > ** The Streams API allows an application to act as a stream processor,
> > > consuming an input stream from one or more topics and producing an
> > > output stream to one or more output topics, effectively transforming
> the
> > > input streams to output streams.
> > >
> > > ** The Connector API allows building and running reusable producers or
> > > consumers that connect Kafka topics to existing applications or data
> > > systems. For example, a connector to a relational database might
> > > capture every change to a table.
> > >
> > >
> > > With these APIs, Kafka can be used for two broad classes of
> application:
> > >
> > > ** Building real-time streaming data pipelines that reliably get data
> > > between systems or applications.
> > >
> > > ** Building real-time streaming applications that transform or react
> > > to the streams of data.
> > >
> > >
> > > Apache Kafka is in use at large and small companies worldwide,
> including
> > > Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest,
> Rabobank,
> > > Target, The New York Times, Uber, Yelp, and Zalando, among others.
> > >
> > > A big thank you for the following 41 contributors to this release!
> > >
> > > A. Sophie Blee-Goldman, Matthias J. Sax, Bill Bejeck, Jason Gustafson,
> > > Chris Egerton, Boyang Chen, Alex Diachenko, cpettitt-confluent, Magesh
> > > Nandakumar, Randall Hauch, Ismael Juma, John Roesler, Konstantine
> > > Karantasis, Mickael Maison, Nacho Muñoz Gómez, Nigel Liang, Paul,
> Rajini
> > > Sivaram, Robert Yokota, Stanislav Kozlovski, Vahid Hashemian, Victoria
> > > Bialas, cadonna, cwildman, mjarvie, sdreynolds, slim, vinoth chandar,
> > > wenhoujx, Arjun Satish, Chia-Ping Tsai, Colin P. Mccabe, David Arthur,
> > > Dhruvil Shah, Greg Harris, Gunnar Morling, Hai-Dang Dam, Lifei Chen,
> > Lucas
> > > Bradstreet, Manikumar Reddy, Michał Borowiecki
> > >
> > > We welcome your help and feedback. For more information on how to
> > > report problems, and to get involved, visit the project website at
> > > https://kafka.apache.org/
> > >
> > > Thank you!
> > >
> > >
> > > Regards,
> > > Randall Hauch
> > >
> >
> >
> > --
> >
> > Thanks!
> > --Vahid
> >
>


Re: Broker regularly facing memory issues

2019-09-27 Thread Karolis Pocius
How did you arrive at the 10 GB JVM heap value? I'm running Kafka on 16 GB
RAM instances with ~4000 partitions each and assigning only 5 GB to the JVM,
of which Kafka seems to use only ~2 GB at any given time.

Also, I've set vm.max_map_count to 262144 -- I didn't use any formula to
estimate that, it must have been some answer I found online, but it's been
doing the trick -- no issues so far.
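
For reference, a quick sketch of checking and raising that limit on a Linux
host (the PID and the sysctl.d file name are placeholders):

```
# Current limit and how many maps the Kafka process actually holds
sysctl vm.max_map_count
wc -l /proc/<kafka-pid>/maps

# Raise the limit at runtime and persist it across reboots
sudo sysctl -w vm.max_map_count=262144
echo 'vm.max_map_count = 262144' | sudo tee /etc/sysctl.d/99-kafka.conf
```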

On Fri, Sep 27, 2019 at 11:29 AM Arpit Gogia  wrote:

> Hello Kafka user group
>
>
> I am running a Kafka cluster with 3 brokers and have been experiencing
> frequent OutOfMemory errors each time with similar error stack trace
>
>
> java.io.IOException: Map failed
>
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:938)
>
> at
> kafka.log.AbstractIndex$$anonfun$resize$1.apply$mcZ$sp(AbstractIndex.scala:188)
>
> at
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:173)
>
> at
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:173)
>
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>
> at kafka.log.AbstractIndex.resize(AbstractIndex.scala:173)
>
> at
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcZ$sp(AbstractIndex.scala:242)
>
> at
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:242)
>
> at
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:242)
>
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>
> at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
>
> at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:501)
>
> at
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$32.apply(Log.scala:1635)
>
> at
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$32.apply(Log.scala:1635)
>
> at scala.Option.foreach(Option.scala:257)
>
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1635)
>
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1599)
>
> at kafka.log.Log.maybeHandleIOException(Log.scala:1996)
>
> at kafka.log.Log.roll(Log.scala:1599)
>
> at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1434)
>
> at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1429)
>
> at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1429)
>
> at kafka.log.Log.maybeHandleIOException(Log.scala:1996)
>
> at kafka.log.Log.deleteSegments(Log.scala:1429)
>
> at kafka.log.Log.deleteOldSegments(Log.scala:1424)
>
> at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1501)
>
> at kafka.log.Log.deleteOldSegments(Log.scala:1492)
>
> at
> kafka.log.LogCleaner$CleanerThread$$anonfun$cleanFilthiestLog$1.apply(LogCleaner.scala:328)
>
> at
> kafka.log.LogCleaner$CleanerThread$$anonfun$cleanFilthiestLog$1.apply(LogCleaner.scala:324)
>
> at scala.collection.immutable.List.foreach(List.scala:392)
>
> at
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:324)
>
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:300)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>
> Caused by: java.lang.OutOfMemoryError: Map failed
>
> at sun.nio.ch.FileChannelImpl.map0(Native Method)
>
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:935)
>
> ... 32 more
>
>
> Each broker possesses 16 GB of memory out of which 10 GB is allotted to
> the JVM as heap. Total partition count on each broker is approximately 2000
> with an average partition size of 300 MB.
>
>
> After looking around, I found out that increasing the OS-level memory map
> area limit `vm.max_map_count` is a viable solution, since Kafka memory-maps
> segment files while rolling over and the above stack trace indicates a
> failure in doing that. Since then I have increased it every time a broker
> goes down with this error. Currently I am at 250,000 on two brokers and
> 200,000 on one, which is very high considering the estimation formula
> mentioned at https://kafka.apache.org/documentation/#os. Most recently I
> started to monitor the memory map file count (using /proc//maps) of
> the Kafka process on each broker; below is the graph.
>
>
> [image: Screenshot 2019-09-27 at 12.02.38 PM.png]
>
>
> My concern is that this value is on an overall increasing trend, with an
> average increase of 27.7K across brokers in the roughly 2 days of
> monitoring.
>
>
> Following are my questions:
>
>1. Will I have to keep incrementing `vm.max_map_count` till I arrive
>at a stable value?
>2. Could this by any chance indicate a memory leak? Maybe in the
>subroutine that rolls over segment files?
>    3. Could the lack of page cache memory be a cause as well? The volume of
>    cached memory seems to remain consistent over time, so it doesn't appear
>    to be a suspect, but I am not ruling it out for now. As a mitigation I will
>    be decreasing the JVM heap next time so that more memory is available for
>    the page cache.
>

Re: How Kafka Manages Network Partition

2019-09-23 Thread Karolis Pocius
AFAIK there won't be two leaders. Once brokers lose connection with
ZooKeeper, a new leader will be elected (whichever can still access
ZooKeeper) and the remaining brokers will fall behind in replication.

Now depending on your config, there might be several other issues: not
enough replicas to satisfy the ISR requirement, so no new writes; if the broker
that becomes the new leader was already behind in replication and unclean
leader election is enabled, you might have some data loss, etc.
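
For what it's worth, the settings that shape this behaviour look roughly like
this (values are only illustrative, not a recommendation for your setup):

```
# server.properties / topic-level overrides
unclean.leader.election.enable=false  # false trades availability for no data loss
min.insync.replicas=2                 # with acks=all, writes fail once the ISR shrinks below this
```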

On Mon, Sep 23, 2019 at 2:52 PM Isuru Boyagane <
isuruboyagane...@cse.mrt.ac.lk> wrote:

> Can anyone clarify how Kafka manages the network partition below?
>
> Say we have this configuration before the network partition,
>
>- Kafka cluster has three brokers (say broker_0, broker_1, broker2).
>- broker_1 is the leader.
>- ISR has been reduced to broker_1 and broker_2.
>- There is a Zookeeper cluster.
>
> Now if a network partition happens such that,
>
>1. broker_1(leader), broker_2 in a separate partition
>2. broker_0 and Zookeeper in another partition
>
> (An image clarifying the network partition scenario is attached herewith.)
>
> Will there be two leaders?
> If so, How they continue when the partition is resolved?
>
> Thanks and Regards.
>
>


Re: Question on Kafka Client Upgrade

2019-09-20 Thread Karolis Pocius
I'm pretty sure this won't work. Retrieving protocol version was added in
0.10, allowing clients to be "bidirectionally" compatible. See
https://www.confluent.io/blog/upgrading-apache-kafka-clients-just-got-easier/

In your case you probably want to upgrade brokers to at least 0.10.* and
then you can upgrade clients.

On Fri, Sep 20, 2019 at 1:28 AM Yueming Duan 
wrote:

> Hello, Kafka Users,
>
> Our brokers and clients are in 0.9.0.1, would like to upgrade our producer
> and consumer clients to 2.2.1 first.
>
> Does it matter if we upgrade the client jar from 0.9.0.1 to 2.2.1 - but the
> brokers are using 0.9.0.1?
> https://kafka.apache.org/23/documentation/streams/upgrade-guide
> And any ordering requirements, e.g, consumer clients first?
>
> We only use plaintext producer/consumer clients. For us, would it be a
> problem with new version of client and have extra property of
> *upgrade.from="0.9.0.1"*?
>
> Thanks
> Yueming
>


Re: Question on Kafka Client Upgrade

2019-09-20 Thread Karolis Pocius
That should not be a problem as java clients support bidirectional
compatibility. See
https://www.confluent.io/blog/upgrading-apache-kafka-clients-just-got-easier/

> The “bidirectional” client compatibility work done in KIP-35 and KIP-97
> removed these limitations. New Java clients can now communicate with old
> brokers. The Confluent source-available clients for C, C++, and other
> languages no longer need to be reconfigured when the broker version
> changes. You can now upgrade the client whenever you want to get access to
> new features or bug fixes, without worrying about the broker version.

On Thu, Sep 19, 2019 at 3:58 PM M. Manna  wrote:

> Hello,
>
> Does it matter if we upgrade the client jar from 2.1.0 to 2.3.0 - but the
> brokers are using 2.2.0?
>
> https://kafka.apache.org/23/documentation/streams/upgrade-guide
>
> We don't use streams, just plain producer/consumer clients. For us, would
> it matter at all?
>
> Thanks,
>


Re: Kafka BootStrap : Error while deleting the clean shutdown file in dir /tmp/data (kafka.server.LogDirFailureChannel) : Caused by: OOM: Map failed

2019-09-04 Thread Karolis Pocius
I had the same issue, which was solved by increasing max_map_count:
https://stackoverflow.com/a/43675621


On Wed, Sep 4, 2019 at 2:59 PM SenthilKumar K 
wrote:

> Hello experts, we have deployed a 10-node Kafka cluster in production.
> Recently two of the nodes went down due to a network problem and we brought
> them up after 24 hours. While bootstrapping the Kafka service on the failed
> nodes, we saw the below error and the broker failed to come up.
>
> Kafka Version : kafka_2.11-2.2.0
>
> JVM Options :
> /a/java64/jdk1.8.0/bin/java -Xmx15G -Xms10G -server -XX:+UseG1GC
> -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
> -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
> -Xloggc:/a/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M
> -Davoid_insecure_jmxremote
>
>
> [2019-09-03 10:54:10,630] ERROR Error while deleting the clean shutdown
> file in dir /tmp/data (kafka.server.LogDirFailureChannel)
> java.io.IOException: Map failed
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
> at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:126)
> at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:53)
> at kafka.log.LogSegment$.open(LogSegment.scala:632)
> at
>
> kafka.log.Log$$anonfun$kafka$log$Log$$loadSegmentFiles$3.apply(Log.scala:467)
> at
>
> kafka.log.Log$$anonfun$kafka$log$Log$$loadSegmentFiles$3.apply(Log.scala:454)
> at
>
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at
>
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at kafka.log.Log.kafka$log$Log$$loadSegmentFiles(Log.scala:454)
> at kafka.log.Log$$anonfun$loadSegments$1.apply$mcV$sp(Log.scala:565)
> at kafka.log.Log$$anonfun$loadSegments$1.apply(Log.scala:559)
> at kafka.log.Log$$anonfun$loadSegments$1.apply(Log.scala:559)
> at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2024)
> at kafka.log.Log.loadSegments(Log.scala:559)
> at kafka.log.Log.<init>(Log.scala:292)
> at kafka.log.Log$.apply(Log.scala:2157)
> at
> kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:265)
> at
>
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:345)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: Map failed
> at sun.nio.ch.FileChannelImpl.map0(Native Method)
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
> ... 25 more
>
> Any hint to solve this problem ? Thanks in advance!
>
> --Senthil
>


Replica movement between log directories

2019-07-02 Thread Karolis Pocius
I'm not having much luck with replica movement between directories, so I'd
appreciate it if someone validated the steps I'm taking (a rough command-line
sketch follows the list):

1. Create a topics-to-move JSON file (with a single topic)
2. Generate a candidate partition reassignment
3. Take the above and replace all instances of "any" with
"/path-to-log-dir" (I want certain partitions moved to a specific log dir
that is the same on each of the five brokers in the cluster)
4. Create the reassignment JSON with the data from step #3
5. Execute the reassignment with an increased timeout, just to be safe
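
For clarity, here is roughly what those five steps look like on the command
line. Topic name, broker IDs, paths and the timeout are placeholders, and
exact flags can differ between versions; note the tool also needs
--bootstrap-server when log_dirs are specified:

```
# Step 1: topics-to-move.json
{"version": 1, "topics": [{"topic": "topic.0"}]}

# Step 2: generate a candidate plan
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "0,1,2,3,4" --generate

# Steps 3-4: reassignment.json with every "any" replaced by the target log dir
{"version": 1, "partitions": [
  {"topic": "topic.0", "partition": 7, "replicas": [2, 0, 1],
   "log_dirs": ["/kafka-data-2", "/kafka-data-2", "/kafka-data-2"]}
]}

# Step 5: execute with an increased timeout
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json --execute --timeout 60000
```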

What happens next is that some partitions reassign just fine, while others
throw a warning and get stuck forever. Here's the full log for one of the
attempted reassignments:

[2019-07-02 13:58:40,330] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Loading producer state till offset 0 with message format version 2
(kafka.log.Log)
[2019-07-02 13:58:40,330] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Completed load of log with 1 segments, log start offset 0 and log end
offset 0 in 1 ms (kafka.log.Log)
[2019-07-02 13:58:40,330] INFO Created log for partition topic.0-7 in
/kafka-data-2 with properties {compression.type -> producer,
message.downconversion.enable -> true, min.insync.replicas -> 2,
segment.jitter.ms -> 0, cleanup.policy -> delete, flush.ms ->
9223372036854775807, segment.bytes -> 1073741824, retention.ms ->
60480, flush.messages -> 9223372036854775807, message.format.version ->
2.2-IV1, file.delete.delay.ms -> 6, max.compaction.lag.ms ->
9223372036854775807, max.message.bytes -> 52428800, min.compaction.lag.ms
-> 0, message.timestamp.type -> LogAppendTime, preallocate -> false,
min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
unclean.leader.election.enable -> false, retention.bytes -> -1,
delete.retention.ms -> 8640, segment.ms -> 60480,
message.timestamp.difference.max.ms -> 9223372036854775807,
segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2019-07-02 13:58:40,331] INFO [Partition topic.0-7 broker=2] No
checkpointed highwatermark is found for partition topic.0-7
(kafka.cluster.Partition)
[2019-07-02 13:58:40,331] INFO Replica loaded for partition topic.0-7 with
initial high watermark 0 (kafka.cluster.Replica)
[2019-07-02 13:58:40,331] INFO [ReplicaAlterLogDirsManager on broker 2]
Added fetcher to broker BrokerEndPoint(id=2, host=localhost:-1) for
partitions Map(topic.0-7 -> (offset=0, leaderEpoch=84))
(kafka.server.ReplicaAlterLogDirsManager)
[2019-07-02 13:58:40,389] INFO [ReplicaAlterLogDirsThread-0]: Truncating
partition topic.0-7 to local high watermark 0
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:40,389] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Truncating to 0 has no effect as the largest offset in the log is -1
(kafka.log.Log)
[2019-07-02 13:58:41,043] INFO [ReplicaFetcherManager on broker 2] Removed
fetcher for partitions Set(topic.0-7) (kafka.server.ReplicaFetcherManager)
[2019-07-02 13:58:41,043] INFO [ReplicaFetcherManager on broker 2] Added
fetcher to broker BrokerEndPoint(id=0,
host=.ec2.internal:9092) for partitions Map(topic.0-7 ->
(offset=59338, leaderEpoch=85)) (kafka.server.ReplicaFetcherManager)
[2019-07-02 13:58:41,203] INFO [Log partition=topic.0-7, dir=/kafka-data-1]
Truncating to 59338 has no effect as the largest offset in the log is 59337
(kafka.log.Log)
[2019-07-02 13:58:41,227] INFO [ReplicaAlterLogDirsThread-0]: Truncating
partition topic.0-7 to local high watermark 0
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,227] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Truncating to 0 has no effect as the largest offset in the log is -1
(kafka.log.Log)
[2019-07-02 13:58:41,229] INFO [ReplicaAlterLogDirsThread-0]:
Beginning/resuming copy of partition topic.0-7 from offset 0. Including
this partition, there are 5 remaining partitions to copy by this thread.
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,229] INFO [ReplicaAlterLogDirsThread-0]: Partition
topic.0-7 has an older epoch (84) than the current leader. Will await the
new LeaderAndIsr state before resuming fetching.
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,229] WARN [ReplicaAlterLogDirsThread-0]: Partition
topic.0-7 marked as failed (kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,667] INFO [ReplicaAlterLogDirsThread-0]: Shutting down
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,667] INFO [ReplicaAlterLogDirsThread-0]: Shutdown
completed (kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,667] INFO [ReplicaAlterLogDirsThread-0]: Stopped
(kafka.server.ReplicaAlterLogDirsThread)

I have upgraded from 2.2.1 to 2.3.0 (haven't changed inter.broker.protocol
yet) hoping that KAFKA-8346 would somehow improve the situation, but it
seems that it just keeps the thread from dying.

Any pointers to what might be going wrong here would be appreciated.


Re: Reducing replication factor of Kafka Topic

2017-02-09 Thread Karolis Pocius

If you don't want to do it manually, you can try Kafka Assigner 
https://github.com/linkedin/kafka-tools/wiki/Kafka-Assigner

Specifically "Set Replication Factor" module 
https://github.com/linkedin/kafka-tools/wiki/module-set-replication-factor
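
For the manual route described in the reply quoted below, the hand-written
assignment might look something like this (topic name, partitions and broker
IDs are placeholders):

```
# reassignment.json -- list only the replicas you want to keep (e.g. RF 3 -> 2)
{"version": 1, "partitions": [
  {"topic": "my-topic", "partition": 0, "replicas": [1, 2]},
  {"topic": "my-topic", "partition": 1, "replicas": [2, 3]}
]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassignment.json --execute
```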

On 2017.02.09 13:39, Manikumar wrote:

We have to manually create the replica assignment JSON file and invoke the
reassignment using the kafka-reassign-partitions.sh tool. There will be a
leader change, which should not cause any data loss.












On Thu, Feb 9, 2017 at 4:05 PM, Shaik M 
<mailto:munna.had...@gmail.com> wrote:



Hi,

> Can we reduce the Kafka topic replication factor without any interruption
> to the current data flow?

Thanks,
Shaik M








Re: Kafka MirrorMaker Queries

2016-12-15 Thread Karolis Pocius
Hi,

Here are some of my experiences with MirrorMaker, but I'm also eager to read 
what others do:

1. The main issue for me is rebalancing. If you have several instances of MM under 
the same group, when one of them dies, loses network connectivity, or you just 
need to add new partitions to the whitelist/blacklist - the whole thing starts 
rebalancing, which can take quite a bit of time (depending on the number of MM 
instances in the group) during which the messages are either not being 
mirrored or are sent sporadically. Initially I had one MM group mirroring from 
3 remote clusters into the main cluster, which proved to be a big mistake - when 
one cluster was unreachable, the others would suffer. Now I run separate 
groups for each cluster and am thinking of drilling it down even further and 
splitting it by topic.

2. Generally MM is pretty straightforward. The way I do it is to set 
num.streams to the number of partitions in the remote cluster and num.producers 
to the number of partitions in the destination cluster. The only thing to be 
cautious about is the num.consumer.fetchers parameter, because it doesn't do 
what the documentation says. See https://issues.apache.org/jira/browse/KAFKA-2008
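
For context, a bare-bones launch of MirrorMaker along those lines might look
like this (config file names, the stream count and the whitelist are
placeholders):

```
bin/kafka-mirror-maker.sh \
  --consumer.config source-cluster-consumer.properties \
  --producer.config target-cluster-producer.properties \
  --num.streams 16 \
  --whitelist 'mytopic.*'
```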

3. As for monitoring, I track the number of incoming messages into source 
cluster and the same for destination cluster. Then graph them on the same panel 
- as long as the two are about the same, it's a good indicator mirroring is 
working fine.

Karolis



On Thu, 2016-12-15 at 14:28 +, Greenhorn Techie wrote:

Hi,

Good Afternoon.

We are implementing Kafka MirrorMaker to replicate data from a production Kafka 
cluster to a DR Kafka cluster. I'm trying to understand the following:

What are the known bottlenecks/issues one needs to be aware of from a 
MirrorMaker perspective?
Are there any documented or accepted best practices for using MirrorMaker?
How can MirrorMaker replication jobs be monitored to identify issues during 
execution, thereby alerting the DevOps/support team?

Would be grateful to hear opinions from experts out there.


Thanks


Re: log.dirs balance?

2016-11-29 Thread Karolis Pocius
It's difficult enough to balance Kafka brokers with a single log 
directory, not to mention attempting to juggle multiple ones. While JBOD 
is great in terms of capacity, it's a pain in terms of management. After 
6 months of constant manual reassignments I ended up going with RAID1+0, 
which is what LinkedIn uses and what Confluent recommends.


Hats off to you if you manage to find a solution to this, just wanted to 
share my painful experience.



On 2016.11.29 21:35, Tim Visher wrote:

Hello,

My kafka deploy has 5 servers with 3 log disks each. Over the weekend I
noticed that on 2 of the 5 servers the partitions appear to be imbalanced
amongst the log.dirs.

```
kafka3
/var/lib/kafka/disk1
3
/var/lib/kafka/disk2
3
/var/lib/kafka/disk3
3
kafka5
/var/lib/kafka/disk1
3
/var/lib/kafka/disk2
4
/var/lib/kafka/disk3
2
kafka1
/var/lib/kafka/disk1
3
/var/lib/kafka/disk2
3
/var/lib/kafka/disk3
3
kafka4
/var/lib/kafka/disk1
4
/var/lib/kafka/disk2
2
/var/lib/kafka/disk3
3
kafka2
/var/lib/kafka/disk1
3
/var/lib/kafka/disk2
3
/var/lib/kafka/disk3
3
```

You can see that 5 and 4 are both unbalanced.

Is there a reason for that? The partitions themselves are pretty much
perfectly balanced, but the directory chosen for them is not.

Is this an anti-pattern to be using multiple log.dirs per server?

Thanks in advance!

--

In Christ,

Timmy V.

http://blog.twonegatives.com/
http://five.sentenc.es/ -- Spend less time on mail





Re: Create topic with multi-zookeeper URLs

2016-11-14 Thread Karolis Pocius

That is correct.


On 2016.11.15 08:54, ZHU Hua B wrote:

Hi,


Thanks for your explanation!

Do you mean I just need to write a number "1" in the file myid on 192.168.210.5, not 
"server.1" or other content? Thanks!






Best Regards

Johnny


-Original Message-
From: Karolis Pocius [mailto:k.poc...@adform.com]
Sent: 2016-11-15 14:47
To: users@kafka.apache.org
Subject: Re: Create topic with multi-zookeeper URLs

You need to create myid file in the datadir (in your case
/tmp/zookeeper/myid) of each instance with a numeric ID inside that file 
corresponding to what you have in this section


server.1=192.168.210.5:2888:3888
server.2=192.168.210.6:2888:3888
server.3=192.168.210.8:2888:3888

So on 192.168.210.5 myid file would say 1, on 192.168.210.6 it would say 2, and 
so on.


On 2016.11.15 08:04, ZHU Hua B wrote:

Hi,


Many thanks for your info!

> As you said, I think my configuration might have been running two separate instances of zookeeper
> rather than a cluster, so I modified the configuration file as below and started up three zookeeper
> instances on separate machines. When zookeeper launched, an error occurred: "/tmp/zookeeper/myid
> file is missing". The zookeeper documentation says: "When the server starts up, it knows which
> server it is by looking for the file myid in the data directory. That file contains the server
> number, in ASCII." Will the myid file be created automatically, or should I create it manually?
> What's the format of the myid file? Thanks!

Previous configuration:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0


Current configuration:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
# For replicated mode
tickTime=2000
initLimit=5
syncLimit=2
server.1=192.168.210.5:2888:3888
server.2=192.168.210.6:2888:3888
server.3=192.168.210.8:2888:3888


# /root/kafka_2.8.0-0.8.0/bin/zookeeper-server-start.sh 
config/zookeeper.properties
[2016-11-15 03:11:54,143] INFO Reading configuration from: 
config/zookeeper.properties 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-11-15 03:11:54,145] INFO Defaulting to majority quorums 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-11-15 03:11:54,146] FATAL Invalid config, exiting abnormally 
(org.apache.zookeeper.server.quorum.QuorumPeerMain)
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error 
processing config/zookeeper.properties
  at 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:110)
  at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:99)
  at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
Caused by: java.lang.IllegalArgumentException: /tmp/zookeeper/myid file is 
missing
  at 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:320)
  at 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:106)
  ... 2 more
Invalid config, exiting abnormally





Best Regards

Johnny


-----Original Message-
From: Karolis Pocius [mailto:k.poc...@adform.com]
Sent: 2016-11-14 17:25
To: users@kafka.apache.org
Subject: Re: Create topic with multi-zookeeper URLs

Can you explain how you launched those two zookeeper instances and maybe share 
their config? You need to make some edits to config in order to run a zookeeper 
cluster - I have a feeling you might be running two separate instances of 
zookeeper rather than a cluster. Also, if you want a cluster you should run 
zookeeper in odd numbers. From zookeeper documentation 
https://zookeeper.apache.org/doc/trunk/zookeeperStarted.html


For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently *less* stable than a single server, because
there are two single points of failure.

On 2016.11.14 11:05, ZHU Hua B wrote:

Hi All,


> I want to create a topic with the command "bin/kafka-topics.sh --create --zookeeper HOST:PORT
> --replication-factor 1 --partitions 1 --topic test". Can the option "--zookeeper" point
> to multiple zookeeper URLs such as "HOST1:PORT1,HOST2:PORT2"?
> I tried it as below, but it seems the topic is only listed under the first zookeeper URL. The
> description of this option says "Multiple URLS can be given to allow
> fail-over." Is my usage wrong? Thanks!


> # bin/kafka-topics.sh --create --zookeeper HOST1:PORT1,HOST2:PORT2 --replication-factor 1 --partitions 1 --topic test
> creation succeeded!
>
> # bin/kafka-list-topic.sh --zookeeper HOST1:PORT1
> topic: test    partition: 0    leader: 0    replicas: 0    isr: 0
>
> # bin/kafka-list-topic.sh --zookeeper HOST2:PORT2
> no topics exist!
>
> # bin/k

Re: Create topic with multi-zookeeper URLs

2016-11-14 Thread Karolis Pocius
You need to create a myid file in the dataDir (in your case 
/tmp/zookeeper/myid) of each instance, with a numeric ID inside that file 
corresponding to what you have in this section:

server.1=192.168.210.5:2888:3888
server.2=192.168.210.6:2888:3888
server.3=192.168.210.8:2888:3888

So on 192.168.210.5 the myid file would say 1, on 192.168.210.6 it would say 
2, and so on.
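
In other words (using the IPs and dataDir from your config):

```
# On 192.168.210.5 (server.1)
echo 1 > /tmp/zookeeper/myid
# On 192.168.210.6 (server.2)
echo 2 > /tmp/zookeeper/myid
# On 192.168.210.8 (server.3)
echo 3 > /tmp/zookeeper/myid
```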



On 2016.11.15 08:04, ZHU Hua B wrote:

Hi,


Many thanks for your info!

As you said, I think my configuration might have been running two separate instances of zookeeper
rather than a cluster, so I modified the configuration file as below and started up three zookeeper
instances on separate machines. When zookeeper launched, an error occurred: "/tmp/zookeeper/myid
file is missing". The zookeeper documentation says: "When the server starts up, it knows which
server it is by looking for the file myid in the data directory. That file contains the server
number, in ASCII." Will the myid file be created automatically, or should I create it manually?
What's the format of the myid file? Thanks!

Previous configuration:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0


Current configuration:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
# For replicated mode
tickTime=2000
initLimit=5
syncLimit=2
server.1=192.168.210.5:2888:3888
server.2=192.168.210.6:2888:3888
server.3=192.168.210.8:2888:3888


# /root/kafka_2.8.0-0.8.0/bin/zookeeper-server-start.sh 
config/zookeeper.properties
[2016-11-15 03:11:54,143] INFO Reading configuration from: 
config/zookeeper.properties 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-11-15 03:11:54,145] INFO Defaulting to majority quorums 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-11-15 03:11:54,146] FATAL Invalid config, exiting abnormally 
(org.apache.zookeeper.server.quorum.QuorumPeerMain)
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error 
processing config/zookeeper.properties
 at 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:110)
 at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:99)
 at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
Caused by: java.lang.IllegalArgumentException: /tmp/zookeeper/myid file is 
missing
 at 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:320)
 at 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:106)
 ... 2 more
Invalid config, exiting abnormally





Best Regards

Johnny


-Original Message-
From: Karolis Pocius [mailto:k.poc...@adform.com]
Sent: 2016-11-14 17:25
To: users@kafka.apache.org
Subject: Re: Create topic with multi-zookeeper URLs

Can you explain how you launched those two zookeeper instances and maybe share 
their config? You need to make some edits to config in order to run a zookeeper 
cluster - I have a feeling you might be running two separate instances of 
zookeeper rather than a cluster. Also, if you want a cluster you should run 
zookeeper in odd numbers. From zookeeper documentation 
https://zookeeper.apache.org/doc/trunk/zookeeperStarted.html


For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently *less* stable than a single server, because
there are two single points of failure.

On 2016.11.14 11:05, ZHU Hua B wrote:

Hi All,


> I want to create a topic with the command "bin/kafka-topics.sh --create --zookeeper HOST:PORT
> --replication-factor 1 --partitions 1 --topic test". Can the option "--zookeeper" point
> to multiple zookeeper URLs such as "HOST1:PORT1,HOST2:PORT2"?
> I tried it as below, but it seems the topic is only listed under the first zookeeper URL. The
> description of this option says "Multiple URLS can be given to allow
> fail-over." Is my usage wrong? Thanks!


> # bin/kafka-topics.sh --create --zookeeper HOST1:PORT1,HOST2:PORT2 --replication-factor 1 --partitions 1 --topic test
> creation succeeded!
>
> # bin/kafka-list-topic.sh --zookeeper HOST1:PORT1
> topic: test    partition: 0    leader: 0    replicas: 0    isr: 0
>
> # bin/kafka-list-topic.sh --zookeeper HOST2:PORT2
> no topics exist!
>
> # bin/kafka-topics.sh
> --zookeeper    REQUIRED: The connection string for the zookeeper connection in the
>                form host:port. Multiple URLS can be given to allow fail-over.


Best Regards

Johnny






Re: Create topic with multi-zookeeper URLs

2016-11-14 Thread Karolis Pocius
Can you explain how you launched those two zookeeper instances and maybe 
share their config? You need to make some edits to the config in order to 
run a zookeeper cluster - I have a feeling you might be running two 
separate instances of zookeeper rather than a cluster. Also, if you want 
a cluster, you should run an odd number of zookeeper nodes. From the zookeeper 
documentation: https://zookeeper.apache.org/doc/trunk/zookeeperStarted.html


For replicated mode, a minimum of three servers are required, and it 
is strongly recommended that you have an odd number of servers. If you 
only have two servers, then you are in a situation where if one of 
them fails, there are not enough machines to form a majority quorum. 
Two servers is inherently *less* stable than a single server, because 
there are two single points of failure. 


On 2016.11.14 11:05, ZHU Hua B wrote:

Hi All,


I want to create a topic with the command "bin/kafka-topics.sh --create --zookeeper HOST:PORT
--replication-factor 1 --partitions 1 --topic test". Can the option "--zookeeper" point
to multiple zookeeper URLs such as "HOST1:PORT1,HOST2:PORT2"?
I tried it as below, but it seems the topic is only listed under the first zookeeper URL. The
description of this option says "Multiple URLS can be given to allow
fail-over." Is my usage wrong? Thanks!


# bin/kafka-topics.sh --create --zookeeper HOST1:PORT1,HOST2:PORT2 --replication-factor 1 --partitions 1 --topic test
creation succeeded!

# bin/kafka-list-topic.sh --zookeeper HOST1:PORT1
topic: test    partition: 0    leader: 0    replicas: 0    isr: 0

# bin/kafka-list-topic.sh --zookeeper HOST2:PORT2
no topics exist!

# bin/kafka-topics.sh
--zookeeper    REQUIRED: The connection string for the zookeeper connection in the
               form host:port. Multiple URLS can be given to allow fail-over.


Best Regards

Johnny






Re: Kafka performance on an ordinary machine

2016-11-09 Thread Karolis Pocius
There is no 'best' configuration; it all depends on your use cases, 
expected load, etc.

The documentation is a good place to start: 
https://kafka.apache.org/documentation.html#hwandos


There are a few good benchmark articles. They're a bit dated by now, but 
still hold true in most cases:
* 
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

* https://grey-boundary.io/load-testing-apache-kafka-on-aws/

As for procedures, I'd propose spinning up a cluster and running some tests 
on it. Sangrenel (used in the second article above) is a great tool for that: 
https://github.com/jamiealquiza/sangrenel
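
The perf tools bundled with Kafka are also an easy way to get a first number;
a rough example (topic and broker address are placeholders):

```
# 1M records of 100 bytes each, unthrottled
bin/kafka-producer-perf-test.sh --topic perf-test --num-records 1000000 \
  --record-size 100 --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 acks=1
```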



On 2016.11.09 09:58, Majid Golshadi wrote:

Hello,
We want to use Kafka in our production environment but we don't have any
information about the best server configuration or the best benchmark that
can be achieved in our production environment (based on our hardware and VMs).
I'm really a rookie in this area.
To get to the best configuration for our production environment, what
procedures do you propose?

Suppose I want to run Kafka on 3 machines with the following configuration:
CPU: 2 cores
RAM: 4 GB
HDD: 7500 rpm





Re: Understanding zookeper and kafka server failures

2016-11-08 Thread Karolis Pocius
It depends on the size and load of your cluster. Zookeeper is very I/O 
sensitive, so at the very least you have to make sure it doesn't share a 
disk with the OS or Kafka.

I assume you've read the documentation, but you might want to have a 
look at https://kafka.apache.org/documentation.html#zkops again; it 
explains the reasoning behind why you shouldn't run zookeeper and kafka 
together.



On 2016.11.08 17:47, Sachin Mittal wrote:

Hi,
Thanks for the reply. Apart from the obvious reason that if the server crashes
then both zookeeper and the broker crash, is there any other reason why we
should not run the broker and zookeeper on the same server?

If the chances of a server crash are extremely low and the server can be
brought back up quickly, can we keep both on the same server?

Thanks
Sachin



On Tue, Nov 8, 2016 at 8:59 PM, Karolis Pocius  wrote:


The question is what happens if one of the zookeeper crashes. Will the

broker on that node also will crash?


If 1/3 zookeeper nodes crashes, the other two will take over. Kafka broker
will not crash. However, you should not run zookeeper and kafka on the same
server in production.


What happens if broker crashes? I suppose other two brokers will take the
load.


Yes, the other two will take the load, but it also depends on the number
of partitions and how they are distributed across cluster.

Now what happens if 2 zookeeper nodes crashes.

Or if 2 brokers crashes. Will my cluster be still working in this case.


If 2 zookeeper nodes crash, there's no longer a majority and your cluster
will be down. If 2 kafka brokers crash and you have replication factor 3
you should be OK, but as in previous answer - it depends on the number of
partitions and how they're spread in the cluster.

So basically what is the difference between zookeeper failure and

server/broker failure.


Again, you shouldn't run zookeeper and kafka on the same server in
production. So the difference is that while kafka is responsible for data,
zookeeper is coordinating tasks between kafka nodes.



On 2016.11.08 17:12, Sachin Mittal wrote:


Hi,
We have following setup.
1. Three instances of zookeeper on three machines.
2. Three instances of kafka server on same three machines.
3. All the topics have replication factor 3.

So when we create a topic on any node, i see that it gets replicated on
all
three instances.
I also see that topic data is getting replicated to all three nodes.

The data to main topic is written by three producers to which all three zk
nodes config is provided in connect string.

This is all working fine.

The question is what happens if one of the zookeeper crashes. Will the
broker on that node also will crash?

What happens if broker crashes? I suppose other two brokers will take the
load.

Now what happens if 2 zookeeper nodes crashes.
Or if 2 brokers crashes. Will my cluster be still working in this case.

So basically what is the difference between zookeeper failure and
server/broker failure.

Thanks
Sachin






Re: Understanding zookeper and kafka server failures

2016-11-08 Thread Karolis Pocius

> The question is what happens if one of the zookeeper crashes. Will the
> broker on that node also will crash?

If one of the 3 zookeeper nodes crashes, the other two will take over. The 
Kafka broker will not crash. However, you should not run zookeeper and kafka 
on the same server in production.

> What happens if broker crashes? I suppose other two brokers will take the
> load.

Yes, the other two will take the load, but it also depends on the number 
of partitions and how they are distributed across the cluster.

> Now what happens if 2 zookeeper nodes crashes.
> Or if 2 brokers crashes. Will my cluster be still working in this case.

If 2 zookeeper nodes crash, there's no longer a majority and your 
cluster will be down. If 2 kafka brokers crash and you have replication 
factor 3 you should be OK, but as in the previous answer - it depends on the 
number of partitions and how they're spread in the cluster.

> So basically what is the difference between zookeeper failure and
> server/broker failure.

Again, you shouldn't run zookeeper and kafka on the same server in 
production. So the difference is that while kafka is responsible for 
data, zookeeper is coordinating tasks between kafka nodes.



On 2016.11.08 17:12, Sachin Mittal wrote:

Hi,
We have following setup.
1. Three instances of zookeeper on three machines.
2. Three instances of kafka server on same three machines.
3. All the topics have replication factor 3.

So when we create a topic on any node, i see that it gets replicated on all
three instances.
I also see that topic data is getting replicated to all three nodes.

The data to main topic is written by three producers to which all three zk
nodes config is provided in connect string.

This is all working fine.

The question is what happens if one of the zookeeper crashes. Will the
broker on that node also will crash?

What happens if broker crashes? I suppose other two brokers will take the
load.

Now what happens if 2 zookeeper nodes crashes.
Or if 2 brokers crashes. Will my cluster be still working in this case.

So basically what is the difference between zookeeper failure and
server/broker failure.

Thanks
Sachin





Broker outage sent controller into infinite loop

2016-11-07 Thread Karolis Pocius
I'm managing a 21-node Kafka cluster running 0.8.2.1. Recently one 
broker crashed during a software RAID rebuild after one of the OS disks 
failed (Kafka logs are stored on a separate RAID1+0 drive).


At first the controller behaved as expected, marking the node as down 
and sending out messages to remaining brokers:


[2016-11-04 15:45:03,130] INFO [Controller 20]: Shutting down broker 11 
(kafka.controller.KafkaController)
[2016-11-04 15:45:03,134] DEBUG [Controller 20]: All shutting down 
brokers: 11 (kafka.controller.KafkaController)
[2016-11-04 15:45:03,134] DEBUG [Controller 20]: Live brokers: 
5,10,14,20,1,6,21,9,13,2,17,22,12,7,3,18,16,8,4,15 
(kafka.controller.KafkaController)
[2016-11-04 15:45:03,147] DEBUG [Controller 20]: Removing replica 11 
from ISR 5,11,12 for partition [topic_xx,3]. 
(kafka.controller.KafkaController)


The last line was repeated for each partition on the failed broker until 
it reached __consumer_offsets,39:


[2016-11-04 15:45:03,447] DEBUG [Controller 20]: Removing replica 11 
from ISR 12,16,11 for partition [__consumer_offsets,39]. 
(kafka.controller.KafkaController)
[2016-11-04 15:45:03,450] INFO [Controller 20]: New leader and ISR for 
partition [__consumer_offsets,39] is 
{"leader":16,"leader_epoch":258,"isr":[12,16]} 
(kafka.controller.KafkaController)
[2016-11-04 15:45:03,451] DEBUG The stop replica request (delete = true) 
sent to broker 11 is (kafka.controller.ControllerBrokerRequestBatch)
[2016-11-04 15:45:03,451] DEBUG The stop replica request (delete = 
false) sent to broker 11 is 
[Topic=__consumer_offsets,Partition=39,Replica=11] 
(kafka.controller.ControllerBrokerRequestBatch)


After which (12 seconds later) broker change listener fired:

[2016-11-04 15:45:15,008] INFO [BrokerChangeListener on Controller 20]: 
Broker change listener fired for path /brokers/ids with children 
22,17,18,15,16,13,14,12,21,3,2,20,1,10,7,6,5,4,9,8 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)


And then the logs just kept looping this:

[2016-11-04 15:45:33,780] WARN [Controller-20-to-broker-11-send-thread], 
Controller 20 epoch 103 fails to send request Name: StopReplicaRequest; 
Version: 0; CorrelationId: 8831; ClientId: ; DeletePartitions: false; 
ControllerId: 20; ControllerEpoch: 103; Partitions: 
[__consumer_offsets,39] to broker id:11,host:10.0.12.41,port:9092. 
Reconnecting to broker. (kafka.controller.RequestSendThread)
[2016-11-04 15:46:03,810] INFO [Controller-20-to-broker-11-send-thread], 
Controller 20 connected to id:11,host:10.0.12.41,port:9092 for sending 
state change requests (kafka.controller.RequestSendThread)


While it was happening, most producers were still trying to reach the 
failed node and throwing an I/O error. An hour later, after restarting 
the controller everything went back to normal.


I searched for known issues, and the closest to what happened seems to be 
https://issues.apache.org/jira/browse/KAFKA-2125, but it's marked as 
"Resolved" and it's not quite clear what was fixed and where. It 
mentions connection quotas, but each broker has max.connections.per.ip 
set to 1000, so it's not likely that this limit was reached, and 
monitoring tools do not show any abnormal growth in connections 
following the initial broker crash.


Also, what I find strange is that the logs seem to indicate the controller 
managed to connect to the dead broker each time, but then timed out. I 
know for a fact that the broker was unreachable, so either the log message 
is worded incorrectly, or it was in fact connecting to something else.


Any ideas as to what happened here or where to look for more clues?