Re: Long log appending time
by the way, I have 3 brokers, the producing rate is about 250 messages/sec.. 2017-12-22 16:20 GMT+08:00 陈江枫 <kan...@gmail.com>: > Hi, all > > I'm trying to optimize the long tail latency in kafka. > My OS is centos 4.3 kernel 2.6.32. Java version 1.8.0_45 > kafka version 0.10.2. > The disk is sata HDD. > > My log: > > [2017-12-22 15:29:58,156] TRACE [ReplicaFetcherThread-2-1], Follower 2 has > replica log end offset 10428 for partition kafka-monitor-topic-0. Received > 241 messages and leader hw 10428 (kafka.server.ReplicaFetcherThread) > [2017-12-22 15:29:58,621] TRACE Appended 241 to /home/disk0/kafka/kafka- > monitor-topic-0/.log at offset 10428 > (kafka.log.LogSegment) > > It took about 500ms to append log the disk. Is there any way to optimize > this? > I've set vm.swapness = 1, but it didnt help, could anyone give some > suggestions (other than switching to ssd)? >
Long log appending time
Hi, all I'm trying to optimize the long tail latency in kafka. My OS is centos 4.3 kernel 2.6.32. Java version 1.8.0_45 kafka version 0.10.2. The disk is sata HDD. My log: [2017-12-22 15:29:58,156] TRACE [ReplicaFetcherThread-2-1], Follower 2 has replica log end offset 10428 for partition kafka-monitor-topic-0. Received 241 messages and leader hw 10428 (kafka.server.ReplicaFetcherThread) [2017-12-22 15:29:58,621] TRACE Appended 241 to /home/disk0/kafka/kafka-monitor-topic-0/.log at offset 10428 (kafka.log.LogSegment) It took about 500ms to append log the disk. Is there any way to optimize this? I've set vm.swapness = 1, but it didnt help, could anyone give some suggestions (other than switching to ssd)?
Re: Kafka Custom Authentication & Authorization
You can implement your own authenticator, check SaslAuthenticator , and your own authorizer, check SimpleAuthorizer 2017-11-12 9:39 GMT+08:00 chidigam .: > Hi All, > To Authenticate & Authorize the producer and consumers, I want to > integrate with third party Entitlement manager. Is there reference > implementation ? > > Any link for doc or comments I appreciate. > > Regards > Bhanu >
Kafka latency optimization
Hi, everyone My kafka version is 0.10.2.1. My service have really low qps (1msg/sec). And our requirement for rtt is really strict. ( 99.9% < 30ms) Currently I've encounter a problem, when kafka run for a long time, 15 days or so, performance start to go down. 2017-10-21 was like Time .num of msgs . percentage cost<=2ms 0 0.000% 2ms1s 0 0.000% But recently, it became : cost<=2ms 0 0.000% 2ms 1s 0 0.000% When I check the log, I don't see a way to check the reason why a specific message have a high rtt. And if there's any way to optimize(OS tune, broker config), please enlighten me
Re: Consumer blocked when auth failed.
Hi, I intentionally use the wrong credential, get the following log: *java.net.ConnectException: Connection refused* * at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)* * at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)* * at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)* * at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)* * at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)* * at org.apache.kafka.common.network.Selector.poll(Selector.java:303)* * at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)* * at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)* * at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)* * at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)* * at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)* * at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)* * at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)* * at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)* * at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)* * at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:168)* * at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:244)* Any Idea how to throw exception in the main thread instead of just blocking. 2017-07-20 14:11 GMT+08:00 Jason Gustafson <ja...@confluent.io>: > What log level do you have configured? You might bump up to DEBUG or TRACE > and see if anything stands out. > > -Jason > > On Tue, Jul 18, 2017 at 7:59 PM, 陈江枫 <kan...@gmail.com> wrote: > > > Hi, > > > > I was integrating Kafka with Spark, using DirectStream, when my > > authentication fail, the stream just blocked. No log, no exceptions were > > thrown. Could some one help to address such situtation > > >
Consumer blocked when auth failed.
Hi, I was integrating Kafka with Spark, using DirectStream, when my authentication fail, the stream just blocked. No log, no exceptions were thrown. Could some one help to address such situtation
kafka authenticate
Hi, all I'm trying to modify kafka authentication using our own authenticating procedure, authorization will stick to kafka's acls . Does every entry which fetches data from certain topic need to go through authentication? ( Including KafkaStreams, replica to leader ,etc.)
Re: How to choose one broker as Group Coordinator
when a consumer join a group, selection will be triggered, and then rebalance. 2017-02-15 17:59 GMT+08:00 Yuanjia: > Hi all, > Group Coordinator can be different for different consumer groups,When > a consumer wants to join a group,how to choose the Group Coordinator? > > Thanks, > Yuanjia Li >
Schema registry
Hi, I'm new to Kafka, and I would like to use schema registry to manager the schema of my topic. The schema I've created: curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "{\"type\": \"record\",\"name\": \"Customer\", \"fields\": [ { \"type\": \"int\", \"name\": \"id\" }, { \"type\": \"string\", \"name\": \"name\" } ] }" }' http://localhost:8081/subjects/Customer/versions HTTP/1.1 200 OK Date: Wed, 15 Feb 2017 07:03:49 GMT Content-Type: application/vnd.schemaregistry.v1+json Content-Length: 9 Server: Jetty(9.2.12.v20150709) {"id":21}% My Customer class: public class Customer { private int id; private String name; public Customer(int ID, String name) { this.id = ID; this.name = name; } //getters and setters } My main code is like the following: for (int i = 0; i < total; i++) { Customer customer = new Customer(i,"customer"+i); producer.send(new ProducerRecord("test1", customer)).get(); } I got the following error: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:115) I thought I followed the example of the book