The DumpLogSegments tool dumps partition data logs (the segment files in the broker's log directory), not application logs such as server.log. The NumberFormatException below appears to come from the tool parsing the file name as an offset, which fails for "server".

Usage:
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/TEST-TOPIC-0/00000000000000000000.log

Use the --key-decoder-class and --value-decoder-class options to pass deserializers.

On Fri, Mar 18, 2016 at 12:31 PM, Manikumar Reddy <ku...@nmsworks.co.in> wrote:

> DumpLogSegments tool is used to dump partition data logs (not application
> logs).
>
> Usage:
> ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /tmp/kafka-logs/TEST-TOPIC-0/00000000000000000000.log
>
> Use --key-decoder-class , --key-decoder-class options to pass
> deserializers.
>
> On Fri, Mar 18, 2016 at 12:17 PM, Fang Wong <fw...@salesforce.com> wrote:
>
>> Thanks Guozhang:
>>
>> I put server.log in the command line, got the following error:
>>
>> -bash-4.1$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /home/kafka/logs/server.log
>> Dumping /home/sfdc/logs/liveAgent/kafka/logs/server.log
>> Exception in thread "main" java.lang.NumberFormatException: For input string: "server"
>>     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>>     at java.lang.Long.parseLong(Long.java:589)
>>     at java.lang.Long.parseLong(Long.java:631)
>>     at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230)
>>     at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
>>     at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpLog(DumpLogSegments.scala:135)
>>     at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:77)
>>     at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>>     at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>>
>> Also how to pass the right deserializers?
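
On the deserializer question, here is a minimal sketch of the decoding logic only. It assumes the value layout from the producer code at the bottom of this thread: an 8-byte timestamp written with writeLong, followed by JSON that I am assuming is UTF-8. The class and method names are made up for illustration. To pass it to --value-decoder-class it would also have to implement kafka.serializer.Decoder (its fromBytes method) and, as far as I remember, offer a constructor that takes kafka.utils.VerifiableProperties. That wrapper is left out so the example runs without Kafka on the classpath.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a value laid out as [8-byte big-endian timestamp][UTF-8 JSON].
public class TimestampedJsonDecoder {

    static String decode(byte[] payload) {
        if (payload.length < Long.BYTES) {
            throw new IllegalArgumentException("payload shorter than the 8-byte timestamp");
        }
        // DataOutputStream.writeLong is big-endian, which is also ByteBuffer's default order.
        long timestampMillis = ByteBuffer.wrap(payload).getLong();
        // Everything after the timestamp is treated as UTF-8 text (assumption).
        String json = new String(payload, Long.BYTES, payload.length - Long.BYTES,
                StandardCharsets.UTF_8);
        return timestampMillis + " " + json;
    }

    public static void main(String[] args) {
        // Build a sample value the same way the producer would, then decode it.
        byte[] json = "{\"id\":7}".getBytes(StandardCharsets.UTF_8);
        byte[] sample = ByteBuffer.allocate(Long.BYTES + json.length)
                .putLong(1458286260000L)
                .put(json)
                .array();
        System.out.println(decode(sample));
    }
}
```

If the values on the topic were written differently, the offsets and charset here would need to change accordingly.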
>>
>> Thanks,
>> Fang
>>
>> On Wed, Mar 16, 2016 at 4:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Fang,
>> >
>> > You can use the kafka.tools.DumpLogSegments to scan and view the logs, but
>> > you need the right deserializers to illustrate the content.
>> >
>> > Guozhang
>> >
>> > On Wed, Mar 16, 2016 at 4:03 PM, Fang Wong <fw...@salesforce.com> wrote:
>> >
>> > > Thanks Guozhang!
>> > > We are in the process of upgrading to 0.9.0.0. We will look into using
>> > > ACLs.
>> > >
>> > > Is there a way to see what is the request in the kafka server, the request
>> > > for my case is byte[]? Is there a way to turn on kafka logging to see the
>> > > request on the kafka server side?
>> > >
>> > > Thanks,
>> > > Fang
>> > >
>> > > On Wed, Mar 16, 2016 at 12:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > >
>> > > > Before 0.9, anyone who knows your server host / port can send produce
>> > > > requests to you unless you have a hardware LB or firewall.
>> > > >
>> > > > In the recent release of 0.9, there is a Security feature added to Kafka,
>> > > > including encryption / authentication / authorization. For your case, I
>> > > > would suggest you upgrade to 0.9 and use its authorization mechanism using
>> > > > ACLs.
>> > > >
>> > > > Guozhang
>> > > >
>> > > > On Wed, Mar 16, 2016 at 11:36 AM, Fang Wong <fw...@salesforce.com> wrote:
>> > > >
>> > > > > Hi Guozhang,
>> > > > >
>> > > > > The problem is that server "10.225.36.226" is not one of my kafka clients,
>> > > > > nslookup shows it is another internal server, my servers are like
>> > > > > 10.224.146.6 <http://10.224.146.63:9092/>#, I can't even login to that
>> > > > > server. All of my messages are at most a few KB.
>> > > > >
>> > > > > Is it possible anybody within the internal network can send any message to
>> > > > > kafka?
>> > > > > How do I allow only a fixed list of servers to send requests to the kafka
>> > > > > server?
>> > > > >
>> > > > > Thanks,
>> > > > > Fang
>> > > > >
>> > > > > On Tue, Mar 15, 2016 at 5:31 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > >
>> > > > > > Fang,
>> > > > > >
>> > > > > > From the logs you showed above there is a single produce request with a
>> > > > > > very large request size:
>> > > > > >
>> > > > > > "[2016-03-14 06:43:03,579] INFO Closing socket connection to
>> > > > > > /10.225.36.226 due to invalid request: Request of length *808124929* is
>> > > > > > not valid, it is larger than the maximum size of 104857600 bytes.
>> > > > > > (kafka.network.Processor)"
>> > > > > >
>> > > > > > Which is about 770MB while the maximum request size is configured as 100MB.
>> > > > > > It is from the client hosted at "10.225.36.226"; if you can go to that
>> > > > > > server and check the producer logs around that time, maybe you can
>> > > > > > discover why there is a single big produce request.
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > > On Mon, Mar 14, 2016 at 1:59 PM, Fang Wong <fw...@salesforce.com> wrote:
>> > > > > >
>> > > > > > > After changing log level from INFO to TRACE, here is kafka server.log:
>> > > > > > >
>> > > > > > > [2016-03-14 06:43:03,568] TRACE 156 bytes written. (kafka.network.BoundedByteBufferSend)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE 68 bytes read. (kafka.network.BoundedByteBufferReceive)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 for partition [__consumer_offsets,20].
>> > > > > > > Received 0 messages and leader hw 0 (kafka.server.ReplicaFetcherThread)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 after appending 0 bytes of messages for partition [__consumer_offsets,20] (kafka.server.ReplicaFetcherThread)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8 partition [__consumer_offsets,20] on broker 8 to [0 [-1 : -1]] (kafka.cluster.Replica)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 set replica high watermark for partition [__consumer_offsets,20] to 0 (kafka.server.ReplicaFetcherThread)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 for partition [__consumer_offsets,12].
>> > > > > > > Received 0 messages and leader hw 0 (kafka.server.ReplicaFetcherThread)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 after appending 0 bytes of messages for partition [__consumer_offsets,12] (kafka.server.ReplicaFetcherThread)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8 partition [__consumer_offsets,12] on broker 8 to [0 [-1 : -1]] (kafka.cluster.Replica)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 set replica high watermark for partition [__consumer_offsets,12] to 0 (kafka.server.ReplicaFetcherThread)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Issuing to broker 7 of fetch request Name: FetchRequest; Version: 0; CorrelationId: 397497; ClientId: ReplicaFetcherThread-0-7; ReplicaId: 8; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,20] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,12] -> PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)
>> > > > > > > [2016-03-14 06:43:03,575] TRACE 110 bytes written. (kafka.network.BoundedByteBufferSend)
>> > > > > > > [2016-03-14 06:43:03,579] DEBUG Accepted connection from /10.225.36.226 on /10.224.146.63:9092.
>> > > > > > > sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
>> > > > > > > [2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 145604848 ns (kafka.network.Processor)
>> > > > > > > [2016-03-14 06:43:03,579] DEBUG Processor 2 listening to new connection from /10.225.36.226:36151 (kafka.network.Processor)
>> > > > > > > [2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 3588 ns (kafka.network.Processor)
>> > > > > > > [2016-03-14 06:43:03,579] INFO Closing socket connection to /10.225.36.226 due to invalid request: Request of length 808124929 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > > > > > > [2016-03-14 06:43:03,580] DEBUG Closing connection from /10.225.36.226:36151 (kafka.network.Processor)
>> > > > > > >
>> > > > > > > Here is kafka-request.log:
>> > > > > > >
>> > > > > > > [2016-03-14 06:43:03,463] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 399486; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) from client /10.224.146.61:10716;totalTime:501,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:0 (kafka.request.logger)
>> > > > > > > [2016-03-14 06:43:03,463] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 399487; ClientId:
>> > > > > > > ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
>> > > > > > > [2016-03-14 06:43:03,744] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 397091; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,37] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] -> PartitionFetchInfo(0,1048576) from client /10.224.146.59:12535;totalTime:502,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:1 (kafka.request.logger)
>> > > > > > > [2016-03-14 06:43:03,744] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 397092; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,37] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] -> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
>> > > > > > >
>> > > > > > > From the log, do you think it is still due to the batch size and
>> > > > > > > which version the fix was in?
>> > > > > > >
>> > > > > > > Thanks
>> > > > > > >
>> > > > > > > Fang
>> > > > > > >
>> > > > > > > On Tue, Mar 8, 2016 at 11:56 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > I cannot think of an encoding or partial message issue off the top of
>> > > > > > > > my head (browsed through 0.8.2.2 tickets, none of them seems related
>> > > > > > > > either).
>> > > > > > > >
>> > > > > > > > Guozhang
>> > > > > > > >
>> > > > > > > > On Tue, Mar 8, 2016 at 11:45 AM, Fang Wong <fw...@salesforce.com> wrote:
>> > > > > > > >
>> > > > > > > > > Thanks Guozhang!
>> > > > > > > > >
>> > > > > > > > > No, I don't have a way to reproduce this issue. It happens randomly. I
>> > > > > > > > > am changing the log level from INFO to TRACE to see if I can get the
>> > > > > > > > > exact message that was sent when this happens.
>> > > > > > > > >
>> > > > > > > > > Could it also be some encoding issue or partial message related?
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > > Fang
>> > > > > > > > >
>> > > > > > > > > On Mon, Mar 7, 2016 at 5:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > John,
>> > > > > > > > > >
>> > > > > > > > > > There is not a specific JIRA for this change as it is only implemented in
>> > > > > > > > > > the new Java producer:
>> > > > > > > > > >
>> > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1239
>> > > > > > > > > >
>> > > > > > > > > > Related classes are RecordAccumulator and MemoryRecords:
>> > > > > > > > > >
>> > > > > > > > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>> > > > > > > > > >
>> > > > > > > > > > Fang,
>> > > > > > > > > >
>> > > > > > > > > > Do you have a way to re-produce the issue? I.e. if you have the exact
>> > > > > > > > > > same produce data at hand, could you validate that their cumulated size is
>> > > > > > > > > > less than the limit and then try sending them to Kafka and see if it
>> > > > > > > > > > always triggers the problem?
>> > > > > > > > > >
>> > > > > > > > > > Guozhang
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Mar 7, 2016 at 10:23 AM, Fang Wong <fw...@salesforce.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > No, we don't have compression turned on; the batch size is the
>> > > > > > > > > > > default: 16384.
>> > > > > > > > > > > But the message size is very small, even with that batch size, it is
>> > > > > > > > > > > impossible to exceed the size limit.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > > Fang
>> > > > > > > > > > >
>> > > > > > > > > > > On Sun, Mar 6, 2016 at 6:09 PM, John Dennison <dennison.j...@gmail.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Guozhang,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Do you know the ticket for changing the "batching criterion from
>> > > > > > > > > > > > #.messages to bytes"? I am unable to find it. Working on porting
>> > > > > > > > > > > > a similar change to pykafka.
>> > > > > > > > > > > >
>> > > > > > > > > > > > John
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Sat, Mar 5, 2016 at 4:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hello,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Did you have compression turned on and batching (in terms of #.messages)?
>> > > > > > > > > > > > > In that case the whole compressed message set is treated as a single
>> > > > > > > > > > > > > message on the broker and hence could possibly exceed the limit.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > In newer versions we have changed the batching criterion from #.messages
>> > > > > > > > > > > > > to bytes, which is aimed at resolving such issues.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Guozhang
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Thu, Mar 3, 2016 at 1:04 PM, Fang Wong <fw...@salesforce.com> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Got the following error message with Kafka 0.8.2.1 :
>> > > > > > > > > > > > > > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Didn't send a large message at all, it seems like encoding issue or
>> > > > > > > > > > > > > > partial request, any suggestion how to fix it?
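
One check that may help with the encoding / partial-request question: as far as I know, the broker reads the first four bytes arriving on a connection as a big-endian request size (the 104857600 limit in the log is, I believe, socket.request.max.bytes). If something that does not speak the Kafka protocol connects to the port, the reported "length" is simply its first four bytes read as an int. The sketch below turns the two lengths reported in this thread back into those bytes; the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: recover the 4 bytes that the broker interpreted as a request size.
public class SizePrefixProbe {

    // Limit quoted in the broker log line (100 MiB).
    static final int MAX_REQUEST_BYTES = 104857600;

    // Re-encode the reported length as 4 big-endian bytes (ByteBuffer's default order).
    static byte[] sizePrefixBytes(int reportedLength) {
        return ByteBuffer.allocate(4).putInt(reportedLength).array();
    }

    // Render bytes as text, escaping anything outside printable ASCII as \xNN.
    static String printable(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            int v = b & 0xFF;
            if (v >= 0x20 && v < 0x7F) {
                sb.append((char) v);
            } else {
                sb.append(String.format("\\x%02x", v));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int[] reportedLengths = {1937006964, 808124929};
        for (int length : reportedLengths) {
            System.out.println(length + " -> \"" + printable(sizePrefixBytes(length)) + "\""
                    + (length > MAX_REQUEST_BYTES ? " (over the limit)" : ""));
        }
    }
}
```

Here 1937006964 decodes to the ASCII text "stat" and 808124929 to the bytes 0x30 0x2b 0x02 0x01. Neither looks like a Kafka size prefix, so a plausible reading (not a confirmed diagnosis) is that some other tool, such as a monitoring probe or a port scanner, connected to the broker port, rather than the producer sending an oversized message.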
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The code is like below:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
>> > > > > > > > > > > > > > DataOutputStream dos = new DataOutputStream(bos);
>> > > > > > > > > > > > > > dos.writeLong(System.currentTimeMillis());
>> > > > > > > > > > > > > > OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
>> > > > > > > > > > > > > >     com.force.commons.text.EncodingUtil.UTF_ENCODING);
>> > > > > > > > > > > > > > gson.toJson(obj, byteWriter);
>> > > > > > > > > > > > > > byte[] payload = bos.toByteArray();
>> > > > > > > > > > > > > > ProducerRecord<String, byte[]> data = new ProducerRecord<String,
>> > > > > > > > > > > > > >     byte[]>("Topic", 0, null, payload);
>> > > > > > > > > > > > > > kafkaProducer.send(data);
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > --
>> > > > > > > > > > > > > -- Guozhang
>> > > > > > > > > >
>> > > > > > > > > > --
>> > > > > > > > > > -- Guozhang
>> > > > > > > >
>> > > > > > > > --
>> > > > > > > > -- Guozhang
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > >
>> > > > --
>> > > > -- Guozhang
>> >
>> > --
>> > -- Guozhang
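
A side note on the producer snippet, probably unrelated to the oversized request: the OutputStreamWriter is never flushed before bos.toByteArray() is called. OutputStreamWriter accumulates encoded bytes in an internal buffer, so unless gson.toJson flushes the writer (I do not think it does, but that is worth checking), the payload may contain only the 8-byte timestamp. A minimal sketch of the same payload construction with an explicit flush; the class name is made up, and a plain String stands in for the Gson call so the example has no dependencies:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds [8-byte timestamp][UTF-8 JSON] and flushes before reading the bytes.
public class PayloadBuilder {

    static byte[] buildPayload(long timestampMillis, String json) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeLong(timestampMillis);
            dos.flush();

            OutputStreamWriter writer = new OutputStreamWriter(bos, StandardCharsets.UTF_8);
            writer.write(json);   // stands in for gson.toJson(obj, writer)
            writer.flush();       // push the writer's buffered bytes into bos

            return bos.toByteArray();
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrown unchecked for simplicity.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = buildPayload(System.currentTimeMillis(), "{\"a\":1}");
        System.out.println("payload length: " + payload.length);
    }
}
```

With the flush in place the payload length is 8 plus the number of encoded JSON bytes.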