Re: Long log appending time

2017-12-22 Thread
By the way, I have 3 brokers, and the producing rate is about 250
messages/sec.

2017-12-22 16:20 GMT+08:00 陈江枫 <kan...@gmail.com>:

> Hi, all
>
> I'm trying to optimize the long tail latency in Kafka.
> My OS is CentOS 4.3 with kernel 2.6.32, Java version 1.8.0_45,
> Kafka version 0.10.2.
> The disk is a SATA HDD.
>
> My log:
>
> [2017-12-22 15:29:58,156] TRACE [ReplicaFetcherThread-2-1], Follower 2 has
> replica log end offset 10428 for partition kafka-monitor-topic-0. Received
> 241 messages and leader hw 10428 (kafka.server.ReplicaFetcherThread)
> [2017-12-22 15:29:58,621] TRACE Appended 241 to /home/disk0/kafka/kafka-
> monitor-topic-0/.log at offset 10428
> (kafka.log.LogSegment)
>
> It took about 500 ms to append the log to disk. Is there any way to
> optimize this?
> I've set vm.swappiness = 1, but it didn't help. Could anyone give some
> suggestions (other than switching to SSD)?
>
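
The "about 500 ms" above is simply the gap between the two TRACE timestamps,
15:29:58,156 (follower received the messages) and 15:29:58,621 (append to the
segment completed), roughly 465 ms. A minimal sketch of that arithmetic, for
anyone reproducing the measurement from their own logs:

import java.time.Duration;
import java.time.LocalTime;

public class AppendLatency {
    public static void main(String[] args) {
        // Timestamps from the two TRACE lines quoted above; the log uses a comma
        // as the millisecond separator, ISO parsing wants a dot.
        LocalTime received = LocalTime.parse("15:29:58.156");
        LocalTime appended = LocalTime.parse("15:29:58.621");
        System.out.println(Duration.between(received, appended).toMillis() + " ms"); // prints: 465 ms
    }
}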


Long log appending time

2017-12-22 Thread
Hi, all

I'm trying to optimize the long tail latency in Kafka.
My OS is CentOS 4.3 with kernel 2.6.32, Java version 1.8.0_45,
Kafka version 0.10.2.
The disk is a SATA HDD.

My log:

[2017-12-22 15:29:58,156] TRACE [ReplicaFetcherThread-2-1], Follower 2 has
replica log end offset 10428 for partition kafka-monitor-topic-0. Received
241 messages and leader hw 10428 (kafka.server.ReplicaFetcherThread)
[2017-12-22 15:29:58,621] TRACE Appended 241 to
/home/disk0/kafka/kafka-monitor-topic-0/.log at offset
10428 (kafka.log.LogSegment)

It took about 500 ms to append the log to disk. Is there any way to optimize
this?
I've set vm.swappiness = 1, but it didn't help. Could anyone give some
suggestions (other than switching to SSD)?


Re: Kafka Custom Authentication & Authorization

2017-11-14 Thread
You can implement your own authenticator (check SaslAuthenticator) and your
own authorizer (check SimpleAuthorizer).

2017-11-12 9:39 GMT+08:00 chidigam . :

> Hi All,
> To authenticate and authorize producers and consumers, I want to
> integrate with a third-party entitlement manager. Is there a reference
> implementation?
>
> Any links to docs or comments would be appreciated.
>
> Regards
> Bhanu
>


Kafka latency optimization

2017-11-03 Thread
Hi, everyone
My Kafka version is 0.10.2.1.
My service has a really low QPS (1 msg/sec), and our requirement for RTT is
really strict (99.9% < 30 ms).
Currently I've encountered a problem: when Kafka runs for a long time, 15 days
or so, performance starts to go down.
On 2017-10-21 it was like:

Time               num of msgs    percentage
cost <= 2ms        0              0.000%
2ms ... 1s         0              0.000%

But recently, it became:

Time               num of msgs    percentage
cost <= 2ms        0              0.000%
2ms ... 1s         0              0.000%
When I check the logs, I don't see a way to find out why a specific message
has a high RTT. And if there's any way to optimize (OS tuning, broker config),
please enlighten me.
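
Since the broker logs don't record a per-message RTT, one client-side way to
spot slow messages is to time each send in a producer callback. A minimal
sketch, assuming a plain String producer; the bootstrap server, topic name and
the 30 ms threshold are illustrative placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RttProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");            // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            final long sentAt = System.nanoTime();
            producer.send(new ProducerRecord<>("test-topic", "msg-" + i), (metadata, exception) -> {
                long rttMs = (System.nanoTime() - sentAt) / 1_000_000;
                if (exception != null) {
                    System.err.println("send failed: " + exception);
                } else if (rttMs > 30) {                              // the 99.9% < 30 ms target above
                    System.out.printf("slow ack: partition=%d offset=%d rtt=%dms%n",
                            metadata.partition(), metadata.offset(), rttMs);
                }
            });
        }
        producer.flush();
        producer.close();
    }
}

Comparing these client-side numbers with broker-side logs helps narrow down
whether the extra latency is in the network, the broker, or the client itself.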


Re: Consumer blocked when auth failed.

2017-07-24 Thread
Hi,
I intentionally used the wrong credentials and got the following log:

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:168)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:244)


Any idea how to throw an exception in the main thread instead of just blocking?

2017-07-20 14:11 GMT+08:00 Jason Gustafson <ja...@confluent.io>:

> What log level do you have configured? You might bump up to DEBUG or TRACE
> and see if anything stands out.
>
> -Jason
>
> On Tue, Jul 18, 2017 at 7:59 PM, 陈江枫 <kan...@gmail.com> wrote:
>
> > Hi,
> >
> > I was integrating Kafka with Spark using DirectStream. When my
> > authentication failed, the stream just blocked. No logs, no exceptions
> > were thrown. Could someone help to address such a situation?
> >
>
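
Outside of Spark, with a consumer you construct yourself, one generic way to
turn this silent blocking into an exception in the calling thread is to arm a
watchdog that calls wakeup() if the first poll() has not returned within a
deadline; poll() then throws WakeupException. A rough sketch, not something
DirectStream itself exposes; the server, group id, topic and timeouts are
placeholders:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class FirstPollWatchdog {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder
        props.put("group.id", "watchdog-demo");               // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // ...plus whatever security.protocol / SASL settings the real job uses

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic

        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        // If the first poll is still stuck after 30s (e.g. inside ensureCoordinatorReady), break out of it.
        watchdog.schedule(consumer::wakeup, 30, TimeUnit.SECONDS);
        try {
            consumer.poll(10_000);                            // 0.10.x poll(long timeout)
            System.out.println("first poll returned, connectivity/auth looks OK");
        } catch (WakeupException e) {
            throw new RuntimeException("Kafka did not become ready in time - check credentials/brokers", e);
        } finally {
            watchdog.shutdownNow();
            consumer.close();
        }
    }
}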


Consumer blocked when auth failed.

2017-07-18 Thread
Hi,

I was integrating Kafka with Spark using DirectStream. When my authentication
failed, the stream just blocked. No logs, no exceptions were thrown. Could
someone help to address such a situation?


kafka authenticate

2017-03-06 Thread
Hi, all
I'm trying to modify Kafka authentication to use our own authentication
procedure; authorization will stick to Kafka's ACLs.
Does every entity that fetches data from a certain topic need to go through
authentication? (Including Kafka Streams, replica-to-leader fetches, etc.)


Re: How to choose one broker as Group Coordinator

2017-02-15 Thread
When a consumer joins a group, coordinator selection is triggered, and then a
rebalance.

2017-02-15 17:59 GMT+08:00 Yuanjia :

> Hi all,
> The group coordinator can be different for different consumer groups. When
> a consumer wants to join a group, how is the group coordinator chosen?
>
> Thanks,
> Yuanjia Li
>
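
For context: the group coordinator is the broker that currently leads one
particular partition of the internal __consumer_offsets topic, and which
partition that is depends only on the group id; the consumer discovers that
broker with a GroupCoordinator (FindCoordinator) request before joining. A
small sketch of the mapping, assuming the default of 50 offsets-topic
partitions and a made-up group id:

public class CoordinatorPartition {
    public static void main(String[] args) {
        String groupId = "my-consumer-group";      // made-up group id
        int offsetsTopicPartitions = 50;           // offsets.topic.num.partitions default

        // Non-negative hash of the group id, modulo the number of __consumer_offsets
        // partitions. The broker currently leading that partition acts as the group
        // coordinator, so it only changes if leadership of that partition moves.
        int partition = (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
        System.out.println("group '" + groupId + "' -> __consumer_offsets-" + partition);
    }
}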


Schema registry

2017-02-15 Thread
Hi, I'm new to Kafka, and I would like to use Schema Registry to manage the
schema of my topic.

The schema I've created:

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json"
--data '{ "schema": "{\"type\": \"record\",\"name\": \"Customer\",
\"fields\": [ { \"type\": \"int\", \"name\": \"id\" }, { \"type\":
\"string\", \"name\": \"name\" } ] }" }'
http://localhost:8081/subjects/Customer/versions
HTTP/1.1 200 OK
Date: Wed, 15 Feb 2017 07:03:49 GMT
Content-Type: application/vnd.schemaregistry.v1+json
Content-Length: 9
Server: Jetty(9.2.12.v20150709)

{"id":21}%
My Customer class:

public class Customer {

    private int id;
    private String name;

    public Customer(int id, String name) {
        this.id = id;
        this.name = name;
    }

    // getters and setters
}


My main code is like the following:

for (int i = 0; i < total; i++) {
    Customer customer = new Customer(i, "customer" + i);
    producer.send(new ProducerRecord("test1", customer)).get();
}

I got the following error:

org.apache.kafka.common.errors.SerializationException: Error
serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type.
Supported types are null, Boolean, Integer, Long, Float, Double,
String, byte[] and IndexedRecord
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:115)


I thought I had followed the example from the book.
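
The error message lists IndexedRecord among the supported value types, and a
common way to satisfy it without generated classes is to wrap the fields in an
Avro GenericRecord instead of a plain POJO. A minimal sketch along those
lines, reusing the Customer schema and the test1 topic from above; the
bootstrap server and registry URL are the usual local defaults and may need
adjusting:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomerAvroProducer {
    public static void main(String[] args) throws Exception {
        // Same record definition that was registered with the curl command above.
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"Customer\", \"fields\": ["
          + " {\"type\": \"int\", \"name\": \"id\"},"
          + " {\"type\": \"string\", \"name\": \"name\"} ] }");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // same registry as the curl call

        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                GenericRecord customer = new GenericData.Record(schema);
                customer.put("id", i);
                customer.put("name", "customer" + i);
                producer.send(new ProducerRecord<>("test1", customer)).get();
            }
        }
    }
}

Note that by default the serializer registers and looks up the schema under
the subject test1-value, not the Customer subject used in the curl command
above.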