Re: Producer does not fetch metadata when attempting send to topic

2017-04-30 Thread Dmitry Minkovsky
I figured it out :).

After playing with various sync/async variants of the above example, the cause
turned out to be quite simple: I needed to get off the callback thread of
`Producer#send(record, callback)` before making the second send call. To do
this, I just changed `thenCompose()` to `thenComposeAsync()`. I probably
should have been doing this anyway, and probably with an executor service,
but that's another story :).
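
For reference, here's the change in Java 8 terms. This is just a minimal
sketch reusing the `send()` helper from the quoted test below; the executor
is optional and its name is illustrative:

    // Requires java.util.concurrent.ExecutorService / Executors.
    // Before: the continuation ran on the producer's network (callback) thread,
    // so the second send() blocked that thread while waiting for topic-2
    // metadata and timed out:
    //   send(producer, record1).thenCompose(v -> send(producer, record2));

    // After: thenComposeAsync() hops off the callback thread before the second send.
    ExecutorService executor = Executors.newSingleThreadExecutor(); // illustrative

    send(producer, new ProducerRecord(topic1, key1, message1))
        .thenComposeAsync(v -> send(producer, new ProducerRecord(topic2, key2, message2)),
                          executor)
        .get(); // throws if either send fails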

Hooray!

On Sun, Apr 30, 2017 at 7:37 PM, Dmitry Minkovsky 
wrote:

> I am attempting to send messages to two topics with a newly created
> producer.
>
> The first message sends fine, but for some reason, the producer does not
> fetch metadata for the second topic before attempting to send. So sending
> to the second topic fails. The producer fetches metadata for the second
> topic only after it fails sending to it for the first time.
>
> Here is my test (Groovy/Java 8):
>
>
> // Helper function
>
> def send(KafkaProducer producer, ProducerRecord record) {
>     CompletableFuture future = new CompletableFuture<>()
>     producer.send(record, { meta, e ->
>         if (e != null) {
>             println "send failed exceptionally ${e.message}"
>             future.completeExceptionally(e)
>         } else {
>             println "send succeeded"
>             future.complete(null)
>         }
>     })
>     future
> }
>
>
> // The test
>
> def "test"() {
>
> Properties config = new Properties();
> config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
> config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092');
> config.put(ProducerConfig.ACKS_CONFIG, 'all');
> config.put(ProducerConfig.RETRIES_CONFIG, 0);
>
> def producer = new KafkaProducer(config, new ByteArraySerializer(), new ByteArraySerializer())
>
> def topic1 = 'topic-1'
> def topic2 = 'topic-2'
> def key1 = 'key-1'.bytes
> def key2 = 'key-2'.bytes
> def message1 = 'some bytes'.bytes
> def message2 = 'more bytes'.bytes
>
> println " SENDING 1 "
> try {
>     send(producer, new ProducerRecord(topic1, key1, message1))
>         .thenCompose({
>             println " SENDING 2 "
>             send(producer, new ProducerRecord(topic2, key2, message2))
>         })
>         .toCompletableFuture().get()
> }
> catch (Exception e) {
>     // The above try clause throws. Catch here so that the sleep below happens.
>     // During sleep, metadata for `topic-2` is retrieved. Subsequent attempts
>     // to send to `topic-2` will succeed. But the first attempt fails.
> }
>
> println 'before sleeping'
> sleep 5000
> println 'after sleeping'
>
>
> expect:
> true
> }
>
>
> Result:
>
> [2017-04-30 19:21:08,977] INFO 
> (org.apache.kafka.clients.producer.ProducerConfig:180)
> ProducerConfig values:
> acks = all
> batch.size = 16384
> block.on.buffer.full = false
> bootstrap.servers = [localhost:9092]
> buffer.memory = 33554432
> client.id =
> compression.type = none
> connections.max.idle.ms = 54
> interceptor.classes = null
> key.serializer = class org.apache.kafka.common.serialization.
> ByteArraySerializer
> linger.ms = 0
> max.block.ms = 5000
> max.in.flight.requests.per.connection = 5
> max.request.size = 1048576
> metadata.fetch.timeout.ms = 6
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 3
> partitioner.class = class org.apache.kafka.clients.producer.internals.
> DefaultPartitioner
> receive.buffer.bytes = 32768
> reconnect.backoff.ms = 50
> request.timeout.ms = 3
> retries = 0
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> timeout.ms = 3
> value.serializer = class org.apache.kafka.common.serialization.
> ByteArraySerializer
>
> [2017-04-30 19:21:09,192] INFO 
> (org.apache.kafka.common.utils.AppInfoParser:83)
> Kafka version : 0.10.2.1
> [2017-04-30 19:21:09,193] INFO 
> (org.apache.kafka.common.utils.AppInfoParser:84)
> Kafka 

Re: Kafka Producer fails to get metadata on first send attempt

2017-04-30 Thread Dmitry Minkovsky
I just posted a new message with all of this distilled into a simple test
and log output.

On Sat, Apr 29, 2017 at 6:53 PM, Dmitry Minkovsky 
wrote:

> It appears—at least according to debug logs—that the metadata request is
> sent after the metadata update times out:
>
>
> [... stack trace omitted ...]
>
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
> update metadata after 5000 ms.
>
>
> [2017-04-29 18:47:24,264] DEBUG (org.apache.kafka.clients.NetworkClient:751)
> Sending metadata request (type=MetadataRequest,
> topics=session-create-requests,session-load-requests,*join-requests*) to
> node 0
>
>
> Note: *join-requests* is the topic I am attempting to produce to.
>
> [2017-04-29 18:47:24,272] DEBUG (org.apache.kafka.clients.Metadata:249)
> Updated cluster metadata version 4 to Cluster(id = zKIrpvAdStqc9xcfBUuRxw,
> nodes = [localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic
> = session-load-requests, partition = 0, leader = 0, replicas = [0], isr =
> [0]), Partition(topic = session-create-requests, partition = 0, leader = 0,
> replicas = [0], isr = [0]), Partition(topic = join-requests, partition = 0,
> leader = 0, replicas = [0], isr = [0])])
>
>
> After this, sending with the producer to `join-requests` works.
>
> On Sat, Apr 29, 2017 at 6:26 PM, Dmitry Minkovsky 
> wrote:
>
>> I have a producer that fails to get metadata when it first attempts to
>> send a record to a certain topic. It fails on
>>
>> ClusterAndWaitTime clusterAndWaitTime =
>> waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);  [0]
>>
>> and yields:
>>
>> org.apache.kafka.common.errors.TimeoutException: Failed to update
>> metadata after 3 ms.
>>
>> After the first attempt, however, the producer works fine.
>>
>> This problem does not occur when I isolate the producer in a test.
>>
>> However, when embedded in my service layer—which includes things like a
>> Kafka Streams application and a gRPC service—the above behavior occurs. I
>> played with this for a while and found that this occurs specifically when
>> the producer produces to another topic just before it produces to the one
>> that fails. If I remove the producing to the first topic, producing to the
>> second topic succeeds, even on the first try.
>>
>> Please let me know what data I could provide to help debug this. I looked
>> at logs at DEBUG level for the NetworkClient but nothing stuck out at me. I
>> am on 0.10.2.1, but this happens on 0.10.2.0 and 0.10.1.0 as well. I am not
>> using topic auto creation.
>>
>> Thank you,
>> Dmitry
>>
>>
>> [0] https://github.com/apache/kafka/blob/0.10.2.1/clients/src/ma
>> in/java/org/apache/kafka/clients/producer/KafkaProducer.java#L450
>>
>
>


Producer does not fetch metadata when attempting send to topic

2017-04-30 Thread Dmitry Minkovsky
I am attempting to send messages to two topics with a newly created
producer.

The first message sends fine, but for some reason, the producer does not
fetch metadata for the second topic before attempting to send. So sending
to the second topic fails. The producer fetches metadata for the second
topic only after it fails sending to it for the first time.

Here is my test (Groovy/Java 8):


// Helper function

def send(KafkaProducer producer, ProducerRecord record) {
    CompletableFuture future = new CompletableFuture<>()
    producer.send(record, { meta, e ->
        if (e != null) {
            println "send failed exceptionally ${e.message}"
            future.completeExceptionally(e)
        } else {
            println "send succeeded"
            future.complete(null)
        }
    })
    future
}


// The test

def "test"() {

Properties config = new Properties();
config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092');
config.put(ProducerConfig.ACKS_CONFIG, 'all');
config.put(ProducerConfig.RETRIES_CONFIG, 0);

def producer = new KafkaProducer(config, new ByteArraySerializer(), new ByteArraySerializer())

def topic1 = 'topic-1'
def topic2 = 'topic-2'
def key1 = 'key-1'.bytes
def key2 = 'key-2'.bytes
def message1 = 'some bytes'.bytes
def message2 = 'more bytes'.bytes

println " SENDING 1 "
try {
    send(producer, new ProducerRecord(topic1, key1, message1))
        .thenCompose({
            println " SENDING 2 "
            send(producer, new ProducerRecord(topic2, key2, message2))
        })
        .toCompletableFuture().get()
}
catch (Exception e) {
    // The above try clause throws. Catch here so that the sleep below happens.
    // During sleep, metadata for `topic-2` is retrieved. Subsequent attempts
    // to send to `topic-2` will succeed. But the first attempt fails.
}

println 'before sleeping'
sleep 5000
println 'after sleeping'


expect:
true
}


Result:

[2017-04-30 19:21:08,977] INFO
(org.apache.kafka.clients.producer.ProducerConfig:180) ProducerConfig
values:
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 54
interceptor.classes = null
key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 6
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 3
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 3
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer

[2017-04-30 19:21:09,192] INFO
(org.apache.kafka.common.utils.AppInfoParser:83) Kafka version : 0.10.2.1
[2017-04-30 19:21:09,193] INFO
(org.apache.kafka.common.utils.AppInfoParser:84) Kafka commitId :
e89bffd6b2eff799
 SENDING 1 
[2017-04-30 19:21:09,358] DEBUG
(org.apache.kafka.clients.NetworkClient:767) Initialize connection to node
-1 for sending metadata request
[2017-04-30 19:21:09,358] DEBUG
(org.apache.kafka.clients.NetworkClient:627) Initiating connection to node
-1 at localhost:9092.
[2017-04-30 19:21:09,694] DEBUG
(org.apache.kafka.clients.NetworkClient:590) Completed connection to node
-1.  Fetching API versions.
[2017-04-30 19:21:09,695] DEBUG
(org.apache.kafka.clients.NetworkClient:603) Initiating API versions fetch
from node -1.
[2017-04-30 19:21:09,833] DEBUG
(org.apache.kafka.clients.NetworkClient:558) Recorded API versions for node
-1: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3],
Offsets(2): 0 to 1 

Re: Controller connection failures

2017-04-30 Thread Chuck Musser
Running inside a Docker container introduced a slight wrinkle. It’s easily 
resolvable. This line from the controller log file was the tip off:

java.io.IOException: Connection to 64174aa85d04:9092 (id: 2 rack: null) failed 

64174aa85d04 was a container ID of one of the other brokers. All the brokers 
were advertising the container ID as their hostname and the controller was not 
able to resolve any of those names. The advertised hostname:port is set via the 
"advertised.listeners” configuration variable in server.properties and defaults 
to the hostname. But inside a Docker container, the hostname defaults to the 
container ID. This name is not resolvable anywhere but inside the container.

So, if you’re running Kafka in a container and expect other systems to connect 
to it via the IP address of the Docker host (the host running your container), 
there are a couple ways to achieve this:

Technique #1 (First solution found, works fine)

1. Leave the “listeners” parameter unconfigured, so that Kafka can bind to the 
private network address that Docker assigns the container. Docker will take 
care of routing packets from outside systems to the container’s Kafka process.

2. Set “advertised.listeners" to the hostname of the Docker host, so that the 
broker can advertise a resolvable name for other systems to contact.

Technique #2 (Discovered a little later, preferable due to less configuration)

1. Simply leave both "listeners" and "advertised.listeners" parameters 
unconfigured, but run the Docker container with the "--hostname" parameter, 
specifying the DNS name of the Docker host on which the container is running. 
This works because "advertised.listeners" will default to the specified name 
(which is resolvable). Also, Docker will add an entry to "/etc/hosts" that maps 
this name to the private network address of the container's interface. So when 
"listeners" defaults to the same hostname, that name resolves locally to an 
address in the container and Kafka can successfully bind to this address on 
startup.
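
For concreteness, here's a sketch of both techniques; the hostname, port
mapping and image name below are illustrative, not taken from my setup:

    # Technique #1: leave "listeners" unset in server.properties and advertise
    # the Docker host's resolvable name explicitly:
    advertised.listeners=PLAINTEXT://kafka-host.example.com:9092

    # Technique #2: leave both unset and give the container a resolvable
    # hostname, so "advertised.listeners" defaults to it:
    docker run --hostname kafka-host.example.com -p 9092:9092 <your-kafka-image>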

Also, thanks for the link to the controller internals doc. I wanted some more 
general info on how the system works, so that was informative.

Chuck
> On Apr 30, 2017, at 7:40 AM, Michal Borowiecki 
>  wrote:
> 
> Hi Chuck,
> 
> Are you running zookeepers in the same containers as Kafka brokers?
> 
> Kafka brokers should be able to communicate with any of the zookeepers and, 
> more importantly, zookeepers need to be able to talk to each-other.
> 
> Therefore, the zookeeper port should be exposed too (2181 by default), 
> otherwise you're not going to have a valid cluster.
> 
> Docs on the controller are here:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals 
> 
> 
> Let me know if that helped.
> 




Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Matthias J. Sax
Ah. Sorry.

You are right. Nevertheless, you can set a non-null dummy value like
`byte[0]` (i.e., an empty byte array) instead of the actual "tuple" so as not
to blow up your storage requirement.
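
For illustration, a minimal sketch of that pattern in Java (0.10.x DSL; topic
names are made up and key/value serdes are assumed to come from the
StreamsConfig defaults):

    // Tracked ids live in a KTable; the dummy non-null value (e.g. an empty
    // byte array) keeps the store small. The inner KStream-KTable join then
    // drops every event whose key has no entry in the table.
    KStreamBuilder builder = new KStreamBuilder();

    KTable<String, byte[]> trackedIds = builder.table("tracked-ids", "tracked-ids-store");
    KStream<String, String> events = builder.stream("events");

    // Inner join: events with untracked keys produce no output at all.
    events.join(trackedIds, (value, dummy) -> value)
          .to("tracked-events");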


-Matthias


On 4/30/17 10:24 AM, Michal Borowiecki wrote:
> Apologies, I must have not made myself clear.
> 
> I meant the values in the records coming from the input topic (which in
> turn are coming from kafka connect in the example at hand)
> 
> and not the records coming out of the join.
> 
> My intention was to warn against sending null values from kafka connect
> to the topic that is then meant to be read-in as a ktable to filter against.
> 
> 
> Am I clearer now?
> 
> 
> Cheers,
> 
> Michał
> 
> 
> On 30/04/17 18:14, Matthias J. Sax wrote:
>> Your observation is correct.
>>
>> If  you use inner KStream-KTable join, the join will implement the
>> filter automatically as the join will not return any result.
>>
>>
>> -Matthias
>>
>>
>>
>> On 4/30/17 7:23 AM, Michal Borowiecki wrote:
>>> I have something working on the same principle (except not using
>>> connect), that is, I put ids to filter on into a ktable and then (inner)
>>> join a kstream with that ktable.
>>>
>>> I don't believe the value can be null though. In a changelog a null value
>>> is interpreted as a delete so won't be put into a ktable.
>>>
>>> The RocksDB store, for one, does this:
>>>
>>> private void putInternal(byte[] rawKey, byte[] rawValue) {
>>> if (rawValue == null) {
>>> try {
>>> db.delete(wOptions, rawKey);
>>>
>>> But any non-null value would do.
>>> Please correct me if I've misunderstood.
>>>
>>> Cheers,
>>> Michał
>>>
>>> On 27/04/17 22:44, Matthias J. Sax wrote:
>> I'd like to avoid repeated trips to the db, and caching a large amount of
>> data in memory.
 Lookups to the DB would be hard to get done anyway. Ie, it would not
 perform well, as all your calls would need to be synchronous...


>> Is it possible to send a message w/ the id as the partition key to a 
>> topic,
>> and then use the same id as the key, so the same node which will receive
>> the data for an id is the one which will process it?
 That is what I did propose (maybe it was not clear). If you use Connect,
 you can just import the ID into Kafka and leave the value empty (ie,
 null). This reduces your cache data to a minimum. And the KStream-KTable
 join works as you described it :)


 -Matthias

 On 4/27/17 2:37 PM, Ali Akhtar wrote:
> I'd like to avoid repeated trips to the db, and caching a large amount of
> data in memory.
>
> Is it possible to send a message w/ the id as the partition key to a 
> topic,
> and then use the same id as the key, so the same node which will receive
> the data for an id is the one which will process it?
>
>
> On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax 
> wrote:
>
>> The recommended solution would be to use Kafka Connect to load your DB
>> data into a Kafka topic.
>>
>> With Kafka Streams you read your db-topic as KTable and do a (inner)
>> KStream-KTable join to lookup the IDs.
>>
>>
>> -Matthias
>>
>> On 4/27/17 2:22 PM, Ali Akhtar wrote:
>>> I have a Kafka topic which will receive a large amount of data.
>>>
>>> This data has an 'id' field. I need to look up the id in an external db,
>>> see if we are tracking that id, and if yes, we process that message, if
>>> not, we ignore it.
>>>
>>> 99% of the data will be for ids which are not being tracked - 1% or so
>> will
>>> be for ids which are tracked.
>>>
>>> My concern is, that there'd be a lot of round trips to the db made just
>> to
>>> check the id, and if it'd be better to cache the ids being tracked
>>> somewhere, so other ids are ignored.
>>>
>>> I was considering sending a message to another (or the same topic)
>> whenever
>>> a new id is added to the track list, and that id should then get
>> processed
>>> on the node which will process the messages.
>>>
>>> Should I just cache all ids on all nodes (which may be a large amount),
>> or
>>> is there a way to only cache the id on the same kafka streams node which
>>> will receive data for that id?
>>>

Re: Does Kafka producer waits till previous batch returns response before sending next one?

2017-04-30 Thread Hans Jespersen
Yes, you understand correctly that batch == request.

-hans

> On Apr 30, 2017, at 11:58 AM, Petr Novak  wrote:
> 
> Thank you a lot.
>  
> How requests in max.in.flight.requests.per.connection relates to batches? 1 
> request precisely means 1 batch? It would make sense if I think about it. 
> Just to ensure I understand correctly.
>  
> Petr
>  
> From: Michal Borowiecki [mailto:michal.borowie...@openbet.com] 
> Sent: 30. dubna 2017 20:51
> To: users@kafka.apache.org
> Subject: Re: Does Kafka producer waits till previous batch returns response 
> before sending next one?
>  
> Yes, that's what the docs say in both places:
> 
> max.in.flight.requests.per.connection
> The maximum number of unacknowledged requests the client will send on a 
> single connection before blocking. Note that if this setting is set to be 
> greater than 1 and there are failed sends, there is a risk of message 
> re-ordering due to retries (i.e., if retries are enabled).
>  
> retries
> Setting a value greater than zero will cause the client to resend any record 
> whose send fails with a potentially transient error. Note that this retry is 
> no different than if the client resent the record upon receiving the error. 
> Allowing retries without setting max.in.flight.requests.per.connection to 1 
> will potentially change the ordering of records because if two batches are 
> sent to a single partition, and the first fails and is retried but the second 
> succeeds, then the records in the second batch may appear first.
> Cheers,
> Michał
> 
>  
> On 30/04/17 19:32, Jun MA wrote:
> Does this mean that if the client have retry > 0 and 
> max.in.flight.requests.per.connection > 1, then even if the topic only have 
> one partition, there’s still no guarantee of the ordering?
>  
> Thanks,
> Jun
> On Apr 30, 2017, at 7:57 AM, Hans Jespersen  wrote:
>  
> There is a parameter that controls this behavior called max.in. 
> flight.requests.per.connection
>  
> If you set max.in. flight.requests.per.connection = 1 then the producer waits 
> until previous produce requests returns a response before sending the next 
> one (or retrying). The retries parameter controller the number of times to 
> attempt to produce a batch after a failure.
>  
> If flight.requests.per.connection = 1 and retries is get to the maximum then 
> ordering is guaranteed.
>  
> If there is a timeout then the producer library would try again and again to 
> produce the message and will not skip over to try and produce the next 
> message.
>  
> If you set flight.requests.per.connection > 1 (I think the default is 5) then 
> you can get a commit log with messages out of order wrt the original 
> published order (because retries are done in parallel rather then in series)
>  
> -hans
>  
>  
>  
> On Apr 30, 2017, at 3:13 AM, Petr Novak  wrote:
>  
> Hello,
>  
> Does Kafka producer waits till previous batch returns response before
> sending next one? Do I assume correctly that it does not when retries can
> change ordering?
>  
>  
>  
> Hence batches delay is introduced only by producer internal send loop time
> and linger?
>  
>  
>  
> If a timeout would be localized only to a single batch send request for some
> reason, does it affect the next batch (assuming this batch can go through
> successfully)?
>  
>  
>  
> Many thanks,
>  
> Petr
>  
>  
>  
>  


RE: Does Kafka producer waits till previous batch returns response before sending next one?

2017-04-30 Thread Petr Novak
Thank you a lot.

 

How do requests in max.in.flight.requests.per.connection relate to batches? Does 1 
request mean precisely 1 batch? It would make sense if I think about it. Just 
to make sure I understand correctly.

 

Petr

 

From: Michal Borowiecki [mailto:michal.borowie...@openbet.com] 
Sent: 30. dubna 2017 20:51
To: users@kafka.apache.org
Subject: Re: Does Kafka producer waits till previous batch returns response 
before sending next one?

 

Yes, that's what the docs say in both places:


max.in.flight.requests.per.connection

The maximum number of unacknowledged requests the client will send on a single 
connection before blocking. Note that if this setting is set to be greater than 
1 and there are failed sends, there is a risk of message re-ordering due to 
retries (i.e., if retries are enabled).

 


retries

Setting a value greater than zero will cause the client to resend any record 
whose send fails with a potentially transient error. Note that this retry is no 
different than if the client resent the record upon receiving the error. 
Allowing retries without setting max.in.flight.requests.per.connection to 1 
will potentially change the ordering of records because if two batches are sent 
to a single partition, and the first fails and is retried but the second 
succeeds, then the records in the second batch may appear first.

Cheers,

Michał

 

On 30/04/17 19:32, Jun MA wrote:

Does this mean that if the client have retry > 0 and 
max.in.flight.requests.per.connection > 1, then even if the topic only have one 
partition, there’s still no guarantee of the ordering?
 
Thanks,
Jun

On Apr 30, 2017, at 7:57 AM, Hans Jespersen   
 wrote:
 
There is a parameter that controls this behavior called max.in. 
flight.requests.per.connection
 
If you set max.in. flight.requests.per.connection = 1 then the producer waits 
until previous produce requests returns a response before sending the next one 
(or retrying). The retries parameter controller the number of times to attempt 
to produce a batch after a failure.
 
If flight.requests.per.connection = 1 and retries is get to the maximum then 
ordering is guaranteed.
 
If there is a timeout then the producer library would try again and again to 
produce the message and will not skip over to try and produce the next message.
 
If you set flight.requests.per.connection > 1 (I think the default is 5) then 
you can get a commit log with messages out of order wrt the original published 
order (because retries are done in parallel rather then in series)
 
-hans
 
 
 

On Apr 30, 2017, at 3:13 AM, Petr Novak   
 wrote:
 
Hello,
 
Does Kafka producer waits till previous batch returns response before
sending next one? Do I assume correctly that it does not when retries can
change ordering?
 
 
 
Hence batches delay is introduced only by producer internal send loop time
and linger?
 
 
 
If a timeout would be localized only to a single batch send request for some
reason, does it affect the next batch (assuming this batch can go through
successfully)?
 
 
 
Many thanks,
 
Petr
 

 

 


 



Re: Does Kafka producer waits till previous batch returns response before sending next one?

2017-04-30 Thread Michal Borowiecki

Yes, that's what the docs say in both places:

max.in.flight.requests.per.connection
The maximum number of unacknowledged requests the client will send on a single
connection before blocking. Note that if this setting is set to be greater than
1 and there are failed sends, there is a risk of message re-ordering due to
retries (i.e., if retries are enabled).

retries
Setting a value greater than zero will cause the client to resend any record
whose send fails with a potentially transient error. Note that this retry is no
different than if the client resent the record upon receiving the error.
Allowing retries without setting max.in.flight.requests.per.connection to 1
will potentially change the ordering of records because if two batches are sent
to a single partition, and the first fails and is retried but the second
succeeds, then the records in the second batch may appear first.



Cheers,

Michał


On 30/04/17 19:32, Jun MA wrote:

Does this mean that if the client have retry > 0 and 
max.in.flight.requests.per.connection > 1, then even if the topic only have one 
partition, there’s still no guarantee of the ordering?

Thanks,
Jun

On Apr 30, 2017, at 7:57 AM, Hans Jespersen  wrote:

There is a parameter that controls this behavior called max.in. 
flight.requests.per.connection

If you set max.in. flight.requests.per.connection = 1 then the producer waits 
until previous produce requests returns a response before sending the next one 
(or retrying). The retries parameter controller the number of times to attempt 
to produce a batch after a failure.

If flight.requests.per.connection = 1 and retries is get to the maximum then 
ordering is guaranteed.

If there is a timeout then the producer library would try again and again to 
produce the message and will not skip over to try and produce the next message.

If you set flight.requests.per.connection > 1 (I think the default is 5) then 
you can get a commit log with messages out of order wrt the original published 
order (because retries are done in parallel rather then in series)

-hans




On Apr 30, 2017, at 3:13 AM, Petr Novak  wrote:

Hello,

Does Kafka producer waits till previous batch returns response before
sending next one? Do I assume correctly that it does not when retries can
change ordering?



Hence batches delay is introduced only by producer internal send loop time
and linger?



If a timeout would be localized only to a single batch send request for some
reason, does it affect the next batch (assuming this batch can go through
successfully)?



Many thanks,

Petr







Re: Does Kafka producer waits till previous batch returns response before sending next one?

2017-04-30 Thread Jun MA
Does this mean that if the client has retries > 0 and 
max.in.flight.requests.per.connection > 1, then even if the topic only has one 
partition, there’s still no guarantee of the ordering?

Thanks,
Jun
> On Apr 30, 2017, at 7:57 AM, Hans Jespersen  wrote:
> 
> There is a parameter that controls this behavior called max.in. 
> flight.requests.per.connection
> 
> If you set max.in. flight.requests.per.connection = 1 then the producer waits 
> until previous produce requests returns a response before sending the next 
> one (or retrying). The retries parameter controller the number of times to 
> attempt to produce a batch after a failure.
> 
> If flight.requests.per.connection = 1 and retries is get to the maximum then 
> ordering is guaranteed.
> 
> If there is a timeout then the producer library would try again and again to 
> produce the message and will not skip over to try and produce the next 
> message.
> 
> If you set flight.requests.per.connection > 1 (I think the default is 5) then 
> you can get a commit log with messages out of order wrt the original 
> published order (because retries are done in parallel rather then in series)
> 
> -hans
> 
> 
> 
>> On Apr 30, 2017, at 3:13 AM, Petr Novak  wrote:
>> 
>> Hello,
>> 
>> Does Kafka producer waits till previous batch returns response before
>> sending next one? Do I assume correctly that it does not when retries can
>> change ordering?
>> 
>> 
>> 
>> Hence batches delay is introduced only by producer internal send loop time
>> and linger?
>> 
>> 
>> 
>> If a timeout would be localized only to a single batch send request for some
>> reason, does it affect the next batch (assuming this batch can go through
>> successfully)?
>> 
>> 
>> 
>> Many thanks,
>> 
>> Petr
>> 



Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Michal Borowiecki

Apologies, I must not have made myself clear.

I meant the values in the records coming from the input topic (which in 
turn are coming from kafka connect in the example at hand)


and not the records coming out of the join.

My intention was to warn against sending null values from kafka connect 
to the topic that is then meant to be read-in as a ktable to filter against.



Am I clearer now?


Cheers,

Michał


On 30/04/17 18:14, Matthias J. Sax wrote:

Your observation is correct.

If  you use inner KStream-KTable join, the join will implement the
filter automatically as the join will not return any result.


-Matthias



On 4/30/17 7:23 AM, Michal Borowiecki wrote:

I have something working on the same principle (except not using
connect), that is, I put ids to filter on into a ktable and then (inner)
join a kstream with that ktable.

I don't believe the value can be null though. In a changelog a null value
is interpreted as a delete, so it won't be put into a ktable.

The RocksDB store, for one, does this:

private void putInternal(byte[] rawKey, byte[] rawValue) {
    if (rawValue == null) {
        try {
            db.delete(wOptions, rawKey);

But any non-null value would do.
Please correct me if I've misunderstood.

Cheers,
Michał

On 27/04/17 22:44, Matthias J. Sax wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Lookups to the DB would be hard to get done anyway. Ie, it would not
perform well, as all your calls would need to be synchronous...



Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?

That is what I did propose (maybe it was not clear). If you use Connect,
you can just import the ID into Kafka and leave the value empty (ie,
null). This reduces your cache data to a minimum. And the KStream-KTable
join works as you described it :)


-Matthias

On 4/27/17 2:37 PM, Ali Akhtar wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?


On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax 
wrote:


The recommended solution would be to use Kafka Connect to load your DB
data into a Kafka topic.

With Kafka Streams you read your db-topic as KTable and do a (inner)
KStream-KTable join to lookup the IDs.


-Matthias

On 4/27/17 2:22 PM, Ali Akhtar wrote:

I have a Kafka topic which will receive a large amount of data.

This data has an 'id' field. I need to look up the id in an external db,
see if we are tracking that id, and if yes, we process that message, if
not, we ignore it.

99% of the data will be for ids which are not being tracked - 1% or so

will

be for ids which are tracked.

My concern is, that there'd be a lot of round trips to the db made just

to

check the id, and if it'd be better to cache the ids being tracked
somewhere, so other ids are ignored.

I was considering sending a message to another (or the same topic)

whenever

a new id is added to the track list, and that id should then get

processed

on the node which will process the messages.

Should I just cache all ids on all nodes (which may be a large amount),

or

is there a way to only cache the id on the same kafka streams node which
will receive data for that id?



 

Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Matthias J. Sax
Your observation is correct.

If you use an inner KStream-KTable join, the join will implement the
filter automatically as the join will not return any result.


-Matthias



On 4/30/17 7:23 AM, Michal Borowiecki wrote:
> I have something working on the same principle (except not using
> connect), that is, I put ids to filter on into a ktable and then (inner)
> join a kstream with that ktable.
> 
> I don't believe the value can be null though. In a changelog a null value
> is interpreted as a delete so won't be put into a ktable.
> 
> The RocksDB store, for one, does this:
> 
> private void putInternal(byte[] rawKey, byte[] rawValue) {
> if (rawValue == null) {
> try {
> db.delete(wOptions, rawKey);
> 
> But any non-null value would do.
> Please correct me if I've misunderstood.
> 
> Cheers,
> Michał
> 
> On 27/04/17 22:44, Matthias J. Sax wrote:
 I'd like to avoid repeated trips to the db, and caching a large amount of
 data in memory.
>> Lookups to the DB would be hard to get done anyway. Ie, it would not
>> perform well, as all your calls would need to be synchronous...
>>
>>
 Is it possible to send a message w/ the id as the partition key to a topic,
 and then use the same id as the key, so the same node which will receive
 the data for an id is the one which will process it?
>> That is what I did propose (maybe it was not clear). If you use Connect,
>> you can just import the ID into Kafka and leave the value empty (ie,
>> null). This reduces your cache data to a minimum. And the KStream-KTable
>> join works as you described it :)
>>
>>
>> -Matthias
>>
>> On 4/27/17 2:37 PM, Ali Akhtar wrote:
>>> I'd like to avoid repeated trips to the db, and caching a large amount of
>>> data in memory.
>>>
>>> Is it possible to send a message w/ the id as the partition key to a topic,
>>> and then use the same id as the key, so the same node which will receive
>>> the data for an id is the one which will process it?
>>>
>>>
>>> On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax 
>>> wrote:
>>>
 The recommended solution would be to use Kafka Connect to load your DB
 data into a Kafka topic.

 With Kafka Streams you read your db-topic as KTable and do a (inner)
 KStream-KTable join to lookup the IDs.


 -Matthias

 On 4/27/17 2:22 PM, Ali Akhtar wrote:
> I have a Kafka topic which will receive a large amount of data.
>
> This data has an 'id' field. I need to look up the id in an external db,
> see if we are tracking that id, and if yes, we process that message, if
> not, we ignore it.
>
> 99% of the data will be for ids which are not being tracked - 1% or so
 will
> be for ids which are tracked.
>
> My concern is, that there'd be a lot of round trips to the db made just
 to
> check the id, and if it'd be better to cache the ids being tracked
> somewhere, so other ids are ignored.
>
> I was considering sending a message to another (or the same topic)
 whenever
> a new id is added to the track list, and that id should then get
 processed
> on the node which will process the messages.
>
> Should I just cache all ids on all nodes (which may be a large amount),
 or
> is there a way to only cache the id on the same kafka streams node which
> will receive data for that id?
>

> 
> 





Kafka-streams process stopped processing messages

2017-04-30 Thread Shimi Kiviti
Hi Everyone,

I have a problem and I hope one of you can help me figure it out.
One of our kafka-streams processes has stopped processing messages.

When I turn on debug log I see lots of these messages:

2017-04-30 15:42:20,228 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher: Sending
fetch for partitions [devlast-changelog-2] to broker ip-x-x-x-x
.ec2.internal:9092 (id: 1 rack: null)
2017-04-30 15:42:20,696 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher:
Ignoring fetched records for devlast-changelog-2 at offset 2962649 since
the current position is 2963379

After a LONG time, the only messages in the log are these:

2017-04-30 16:46:33,324 [kafka-coordinator-heartbeat-thread | sa] DEBUG
o.a.k.c.c.i.AbstractCoordinator: Sending Heartbeat request for group sa to
coordinator ip-x-x-x-x.ec2.internal:9092 (id: 2147483646 rack: null)
2017-04-30 16:46:33,425 [kafka-coordinator-heartbeat-thread | sa] DEBUG
o.a.k.c.c.i.AbstractCoordinator: Received successful Heartbeat response for
group same

Any idea?

Thanks,
Shimi


Re: Issue in Kafka running for few days

2017-04-30 Thread Michal Borowiecki

Ah, yes, you're right. I misread it.

My bad. Apologies.

Michal

On 30/04/17 16:02, Svante Karlsson wrote:

@michal

My interpretation is that he's running 2 instances of zookeeper - not 
6. (1 on the "4 broker machine" and one on the other)


I'm not sure where that leaves you in zookeeper land - ie if you 
happen to have a timeout between the two zookeepers will you be out of 
service or will you have a split brain problem? None of the 
alternatives are good. That said - it should be visible in the logs.


Anyway two zk is not a good config - stick to one or go to three.





2017-04-30 15:41 GMT+02:00 Michal Borowiecki 
>:


Hi Jan,

Correct. As I said before it's not common or recommended practice
to run an even number, and I wouldn't recommend it myself. I hope
it didn't sound as if I did.

However, I don't see how this would cause the issue at hand unless
at least 3 out of the 6 zookeepers died, but that could also have
happened in a 5 node setup.

In either case, changing the number of zookeepers is not a
prerequisite to progress debugging this issue further.

Cheers,

Michal


On 30/04/17 13:35, jan wrote:

I looked this up yesterday  when I read the grandparent, as my old
company ran two and I needed to know.
Your link is a bit ambiguous but it has a link to the zookeeper
Getting Started guide which says this:

"
For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently less stable than a single server, because
there are two single points of failure.
"




cheers

jan


On 30/04/2017, Michal Borowiecki
  wrote:

Svante, I don't share your opinion.
Having an even number of zookeepers is not a problem in itself, it
simply means you don't get any better resilience than if you had one
fewer instance.
Yes, it's not common or recommended practice, but you are allowed to
have an even number of zookeepers and it's most likely not related to
the problem at hand and does NOT need to be addressed first.

https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup



Abhit, I'm afraid the log snippet is not enough for me to help.
Maybe someone else in the community with more experience can recognize
the symptoms but in the meantime, if you haven't already done so, you
may want to search for similar issues:


https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22



searching for text like "ZK expired; shut down all controller" or "No
broker in ISR is alive for" or other interesting events form the log.

Hope that helps,
Michal


On 26/04/17 21:40, Svante Karlsson wrote:

You are not supposed to run an even number of zookeepers. Fix that first

On Apr 26, 2017 20:59, "Abhit Kalsotra" 
  wrote:


Any pointers please


Abhi

On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra 

wrote:


Hi *

My kafka setup


* OS: Windows
* 6 broker nodes, 4 on one machine and 2 on the other machine
* ZK instance on the 4-broker-node machine and another ZK on the 2-broker-node machine
* 2 Topics with partition size = 50 and replication factor = 3

I am producing on an average of around 500 messages / sec with each
message size close to 98 bytes...

More or less the message rate stays constant throughout, but after

running

the setup for close to 2 weeks , my Kafka cluster broke and this
happened
twice in a month.  Not able to understand what's the issue, Kafka gurus
please do share your inputs...

the controlle.log file at the time of Kafka broken looks like




*[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26

12:03:34,998]

INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]

INFO

[Partition state machine on 

Re: Issue in Kafka running for few days

2017-04-30 Thread Svante Karlsson
@michal

My interpretation is that he's running 2 instances of zookeeper - not 6. (1
on the "4 broker machine" and one on the other)

I'm not sure where that leaves you in zookeeper land - ie if you happen to
have a timeout between the two zookeepers will you be out of service or
will you have a split brain problem? None of the alternatives are good.
That said - it should be visible in the logs.

Anyway two zk is not a good config - stick to one or go to three.





2017-04-30 15:41 GMT+02:00 Michal Borowiecki 
:

> Hi Jan,
>
> Correct. As I said before it's not common or recommended practice to run
> an even number, and I wouldn't recommend it myself. I hope it didn't sound
> as if I did.
>
> However, I don't see how this would cause the issue at hand unless at
> least 3 out of the 6 zookeepers died, but that could also have happened in
> a 5 node setup.
>
> In either case, changing the number of zookeepers is not a prerequisite to
> progress debugging this issue further.
>
> Cheers,
>
> Michal
>
> On 30/04/17 13:35, jan wrote:
>
> I looked this up yesterday  when I read the grandparent, as my old
> company ran two and I needed to know.
> Your link is a bit ambiguous but it has a link to the zookeeper
> Getting Started guide which says this:
>
> "
> For replicated mode, a minimum of three servers are required, and it
> is strongly recommended that you have an odd number of servers. If you
> only have two servers, then you are in a situation where if one of
> them fails, there are not enough machines to form a majority quorum.
> Two servers is inherently less stable than a single server, because
> there are two single points of failure.
> "
>  
> 
>
> cheers
>
> jan
>
>
> On 30/04/2017, Michal Borowiecki  
>  wrote:
>
> Svante, I don't share your opinion.
> Having an even number of zookeepers is not a problem in itself, it
> simply means you don't get any better resilience than if you had one
> fewer instance.
> Yes, it's not common or recommended practice, but you are allowed to
> have an even number of zookeepers and it's most likely not related to
> the problem at hand and does NOT need to be addressed first.
> https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup
>
> Abhit, I'm afraid the log snippet is not enough for me to help.
> Maybe someone else in the community with more experience can recognize
> the symptoms but in the meantime, if you haven't already done so, you
> may want to search for similar issues:
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22
>
> searching for text like "ZK expired; shut down all controller" or "No
> broker in ISR is alive for" or other interesting events form the log.
>
> Hope that helps,
> Michal
>
>
> On 26/04/17 21:40, Svante Karlsson wrote:
>
> You are not supposed to run an even number of zookeepers. Fix that first
>
> On Apr 26, 2017 20:59, "Abhit Kalsotra"  
>  wrote:
>
>
> Any pointers please
>
>
> Abhi
>
> On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra  
> 
> wrote:
>
>
> Hi *
>
> My kafka setup
>
>
> * OS: Windows
> * 6 broker nodes, 4 on one machine and 2 on the other machine
> * ZK instance on the 4-broker-node machine and another ZK on the 2-broker-node machine
> * 2 Topics with partition size = 50 and replication factor = 3
>
> I am producing on an average of around 500 messages / sec with each
> message size close to 98 bytes...
>
> More or less the message rate stays constant throughout, but after
>
> running
>
> the setup for close to 2 weeks , my Kafka cluster broke and this
> happened
> twice in a month.  Not able to understand what's the issue, Kafka gurus
> please do share your inputs...
>
> the controlle.log file at the time of Kafka broken looks like
>
>
>
>
> *[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
> for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26
>
> 12:03:34,998]
>
> INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
> brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]
>
> INFO
>
> [Partition state machine on Controller 0]: Invoking state change to
> OfflinePartition for partitions
> [__consumer_offsets,19],[mytopic,11],[__consumer_
>
> offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
> offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
> mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
> mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
> consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
> [__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
> 31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
> 

Re: Does Kafka producer waits till previous batch returns response before sending next one?

2017-04-30 Thread Hans Jespersen
There is a parameter that controls this behavior called 
max.in.flight.requests.per.connection.

If you set max.in.flight.requests.per.connection = 1 then the producer waits 
until the previous produce request returns a response before sending the next one 
(or retrying). The retries parameter controls the number of times to attempt to 
produce a batch after a failure.

If max.in.flight.requests.per.connection = 1 and retries is set to the maximum then 
ordering is guaranteed.

If there is a timeout then the producer library will try again and again to 
produce the message and will not skip over it to try and produce the next message.

If you set max.in.flight.requests.per.connection > 1 (I think the default is 5) then 
you can get a commit log with messages out of order wrt the original published 
order (because retries are done in parallel rather than in series).
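
For example, a minimal sketch of the producer settings for strict ordering
(the bootstrap server value is illustrative):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);          // keep retrying failed batches
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);   // one request at a time per broker

    KafkaProducer<byte[], byte[]> producer =
            new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());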

-hans



> On Apr 30, 2017, at 3:13 AM, Petr Novak  wrote:
> 
> Hello,
> 
> Does Kafka producer waits till previous batch returns response before
> sending next one? Do I assume correctly that it does not when retries can
> change ordering?
> 
> 
> 
> Hence batches delay is introduced only by producer internal send loop time
> and linger?
> 
> 
> 
> If a timeout would be localized only to a single batch send request for some
> reason, does it affect the next batch (assuming this batch can go through
> successfully)?
> 
> 
> 
> Many thanks,
> 
> Petr
> 


Re: Controller connection failures

2017-04-30 Thread Michal Borowiecki

Hi Chuck,

Are you running zookeepers in the same containers as Kafka brokers?

Kafka brokers should be able to communicate with any of the zookeepers 
and, more importantly, zookeepers need to be able to talk to each other.


Therefore, the zookeeper port should be exposed too (2181 by default), 
otherwise you're not going to have a valid cluster.
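
For instance (a sketch only; the image name and port mappings are
illustrative), each container would expose both ports when started:

    docker run -p 9092:9092 -p 2181:2181 <your-kafka-zookeeper-image>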


Docs on the controller are here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals

Let me know if that helped.

Cheers,
Michał

On 27/04/17 19:08, Chuck Musser wrote:

I'm running into a problem with a 3 broker cluster where,
intermittently, one of the broker's controller begins to report that
it cannot connect to the other brokers and repeatedly logs the
failure.

Each broker is running in its own Docker container on separate
machines.  These Docker containers have exposed 9092, which I think is
sufficient for operation, but not sure.

The log message are these:
[2017-04-27 17:16:28,985] WARN [Controller-3-to-broker-2-send-thread], 
Controller 3's connection to broker 64174aa85d04:9092 (id: 2 rack: null) was 
unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 64174aa85d04:9092 (id: 2 rack: null) failed
at 
kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-04-27 17:16:28,986] WARN [Controller-3-to-broker-1-send-thread], 
Controller 3's connection to broker d4b8943ad4b5:9092 (id: 1 rack: null) was 
unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to d4b8943ad4b5:9092 (id: 1 rack: null) failed
at 
kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

This is Kafka 2.12-0.10.2.0. I'm wondering:

1. How do we figure out the cause of the connect failures?
2. What's the controller anyway?
3. Are there some command-line diagnostic tools for inspecting the health of 
the system?

Thanks for any help,
Chuck







Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Michal Borowiecki
I have something working on the same principle (except not using 
connect), that is, I put ids to filter on into a ktable and then (inner) 
join a kstream with that ktable.
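
For illustration, a minimal sketch of that principle against the 0.10.2-era
Streams API (the topic, store and application names here are made-up
examples, not taken from this thread):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class TrackedIdFilter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tracked-id-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();

        // Compacted topic holding the ids to keep; any non-null value works as a marker.
        KTable<String, String> trackedIds = builder.table("tracked-ids", "tracked-ids-store");

        // High-volume event stream keyed by the same id.
        KStream<String, String> events = builder.stream("events");

        // Inner join: events whose key has no matching entry in the ktable are dropped.
        events.join(trackedIds, (eventValue, marker) -> eventValue)
              .to("tracked-events");

        new KafkaStreams(builder, props).start();
    }
}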


I don't believe the value can be null though. In a changelog, a null value 
is interpreted as a delete, so it won't be put into a ktable.


The RocksDB store, for one, does this:

private void putInternal(byte[] rawKey, byte[] rawValue) {
    if (rawValue == null) {
        try {
            db.delete(wOptions, rawKey);

But any non-null value would do.
Please correct me if I misunderstood.

Cheers,
Michał

On 27/04/17 22:44, Matthias J. Sax wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Lookups to the DB would be hard to get done anyway. Ie, it would not
perform well, as all your calls would need to be synchronous...



Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?

That is what I did propose (maybe it was not clear). If you use Connect,
you can just import the ID into Kafka and leave the value empty (ie,
null). This reduces your cached data to a minimum. And the KStream-KTable
join works as you described it :)


-Matthias

On 4/27/17 2:37 PM, Ali Akhtar wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?


On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax 
wrote:


The recommended solution would be to use Kafka Connect to load your DB
data into a Kafka topic.

With Kafka Streams you read your db-topic as a KTable and do an (inner)
KStream-KTable join to look up the IDs.


-Matthias

On 4/27/17 2:22 PM, Ali Akhtar wrote:

I have a Kafka topic which will receive a large amount of data.

This data has an 'id' field. I need to look up the id in an external db,
see if we are tracking that id, and if yes, we process that message, if
not, we ignore it.

99% of the data will be for ids which are not being tracked - 1% or so
will be for ids which are tracked.

My concern is, that there'd be a lot of round trips to the db made just
to check the id, and if it'd be better to cache the ids being tracked
somewhere, so other ids are ignored.

I was considering sending a message to another (or the same topic)
whenever a new id is added to the track list, and that id should then get
processed on the node which will process the messages.

Should I just cache all ids on all nodes (which may be a large amount),
or is there a way to only cache the id on the same kafka streams node which
will receive data for that id?









Re: Issue in Kafka running for few days

2017-04-30 Thread Michal Borowiecki

Hi Jan,

Correct. As I said before it's not common or recommended practice to run 
an even number, and I wouldn't recommend it myself. I hope it didn't 
sound as if I did.


However, I don't see how this would cause the issue at hand unless at 
least 3 out of the 6 zookeepers died, but that could also have happened 
in a 5 node setup.


In either case, changing the number of zookeepers is not a prerequisite 
to progress debugging this issue further.


Cheers,

Michal


On 30/04/17 13:35, jan wrote:

I looked this up yesterday  when I read the grandparent, as my old
company ran two and I needed to know.
Your link is a bit ambiguous but it has a link to the zookeeper
Getting Started guide which says this:

"
For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently less stable than a single server, because
there are two single points of failure.
"



cheers

jan


On 30/04/2017, Michal Borowiecki  wrote:

Svante, I don't share your opinion.
Having an even number of zookeepers is not a problem in itself, it
simply means you don't get any better resilience than if you had one
fewer instance.
Yes, it's not common or recommended practice, but you are allowed to
have an even number of zookeepers and it's most likely not related to
the problem at hand and does NOT need to be addressed first.
https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup

Abhit, I'm afraid the log snippet is not enough for me to help.
Maybe someone else in the community with more experience can recognize
the symptoms but in the meantime, if you haven't already done so, you
may want to search for similar issues:

https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22

searching for text like "ZK expired; shut down all controller" or "No
broker in ISR is alive for" or other interesting events from the log.

Hope that helps,
Michal


On 26/04/17 21:40, Svante Karlsson wrote:

You are not supposed to run an even number of zookeepers. Fix that first

On Apr 26, 2017 20:59, "Abhit Kalsotra"  wrote:


Any pointers please


Abhi

On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra 
wrote:


Hi *

My kafka setup


* OS: Windows Machine
* 6 broker nodes, 4 on one Machine and 2 on other Machine
* ZK instance on (4 broker nodes Machine) and another ZK on (2 broker
  nodes machine)
* 2 Topics with partition size = 50 and replication factor = 3

I am producing on an average of around 500 messages / sec with each
message size close to 98 bytes...

More or less the message rate stays constant throughout, but after
running the setup for close to 2 weeks, my Kafka cluster broke and this
happened twice in a month.  Not able to understand what's the issue, Kafka gurus
please do share your inputs...

the controller.log file at the time Kafka broke looks like




*[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26

12:03:34,998]

INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]

INFO

[Partition state machine on Controller 0]: Invoking state change to
OfflinePartition for partitions
[__consumer_offsets,19],[mytopic,11],[__consumer_

offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
[__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
,[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_
offsets,0],[mytopic,32],[__consumer_offsets,24],[
mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[
mytopic,35],[__consumer_offsets,20],[mytopic,1],[
mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__
consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[
mytopic,36],[mytopicOLD,11],[mytopic,47],[mytopicOLD,20],[
mytopic,48],[__consumer_offsets,12],[mytopicOLD,32],[_
_consumer_offsets,8],[mytopicOLD,39],[mytopicOLD,27]
,[mytopicOLD,49],[mytopicOLD,42],[mytopic,21],[mytopicOLD,
31],[mytopic,29],[__consumer_offsets,23],[mytopicOLD,21],[_
_consumer_offsets,48],[__consumer_offsets,11],[mytopic,
18],[__consumer_offsets,13],[mytopic,45],[mytopic,5],[
mytopicOLD,25],[mytopic,6],[mytopicOLD,23],[mytopicOLD,37]
,[__consumer_offsets,6],[__consumer_offsets,49],[

Re: Kafka streams internal topic data retention

2017-04-30 Thread Sachin Mittal
http://kafka.apache.org/documentation.html#topic-config
Check this.

You can use --alter
to override/add the default config.

retention.ms can be used to set topic-level config.
For internal topics I suppose you need to provide a topic config map before
creating the internal topics.
Example:

final Map<String, String> topicConfigMap = new HashMap<>();
topicConfigMap.put("delete.retention.ms", "720");
topicConfigMap.put("min.cleanable.dirty.ratio", "0.01");
topicConfigMap.put("segment.bytes", "104857600");
topicConfigMap.put("segment.ms", "720");

And using Stores.create().enableLogging(topicConfigMap) you pass this map
for creating the internal topic (at least for changelog topics this is the way).
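
For completeness, a minimal sketch of that Stores.create() chain as I
understand it (0.10.2-era API; the store name and serdes are illustrative
assumptions, and topicConfigMap is the map from the example above):

// Requires org.apache.kafka.streams.state.Stores,
// org.apache.kafka.streams.processor.StateStoreSupplier and
// org.apache.kafka.common.serialization.Serdes.
StateStoreSupplier store = Stores.create("my-store")
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .enableLogging(topicConfigMap)  // attach the topic config to the store's changelog
        .build();

The resulting supplier is then added to the topology, and the changelog
topic Streams creates for that store should carry the overrides from the map.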

Sachin

On Sun, Apr 30, 2017 at 5:28 PM, Shimi Kiviti  wrote:

> Hi
>
> Where can I find what is the Kafka streams internal topic data retention
> time and how to change it
>
> Thanks,
> Shimi
>


Re: Issue in Kafka running for few days

2017-04-30 Thread jan
I looked this up yesterday  when I read the grandparent, as my old
company ran two and I needed to know.
Your link is a bit ambiguous but it has a link to the zookeeper
Getting Started guide which says this:

"
For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently less stable than a single server, because
there are two single points of failure.
"



cheers

jan


On 30/04/2017, Michal Borowiecki  wrote:
> Svante, I don't share your opinion.
> Having an even number of zookeepers is not a problem in itself, it
> simply means you don't get any better resilience than if you had one
> fewer instance.
> Yes, it's not common or recommended practice, but you are allowed to
> have an even number of zookeepers and it's most likely not related to
> the problem at hand and does NOT need to be addressed first.
> https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup
>
> Abhit, I'm afraid the log snippet is not enough for me to help.
> Maybe someone else in the community with more experience can recognize
> the symptoms but in the meantime, if you haven't already done so, you
> may want to search for similar issues:
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22
>
> searching for text like "ZK expired; shut down all controller" or "No
> broker in ISR is alive for" or other interesting events from the log.
>
> Hope that helps,
> Michal
>
>
> On 26/04/17 21:40, Svante Karlsson wrote:
>> You are not supposed to run an even number of zookeepers. Fix that first
>>
>> On Apr 26, 2017 20:59, "Abhit Kalsotra"  wrote:
>>
>>> Any pointers please
>>>
>>>
>>> Abhi
>>>
>>> On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra 
>>> wrote:
>>>
 Hi *

 My kafka setup


 * OS: Windows Machine
 * 6 broker nodes, 4 on one Machine and 2 on other Machine
 * ZK instance on (4 broker nodes Machine) and another ZK on (2 broker
   nodes machine)
 * 2 Topics with partition size = 50 and replication factor = 3

 I am producing on an average of around 500 messages / sec with each
 message size close to 98 bytes...

 More or less the message rate stays constant throughout, but after
 running the setup for close to 2 weeks, my Kafka cluster broke and this
 happened twice in a month.  Not able to understand what's the issue, Kafka gurus
 please do share your inputs...

 the controller.log file at the time Kafka broke looks like




 *[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
 for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26
>>> 12:03:34,998]
 INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
 brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]
>>> INFO
 [Partition state machine on Controller 0]: Invoking state change to
 OfflinePartition for partitions
 [__consumer_offsets,19],[mytopic,11],[__consumer_
>>> offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
>>> offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
>>> mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
>>> mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
>>> consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
>>> [__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
>>> 31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
>>> ,[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_
>>> offsets,0],[mytopic,32],[__consumer_offsets,24],[
>>> mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[
>>> mytopic,35],[__consumer_offsets,20],[mytopic,1],[
>>> mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__
>>> consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[
>>> mytopic,36],[mytopicOLD,11],[mytopic,47],[mytopicOLD,20],[
>>> mytopic,48],[__consumer_offsets,12],[mytopicOLD,32],[_
>>> _consumer_offsets,8],[mytopicOLD,39],[mytopicOLD,27]
>>> ,[mytopicOLD,49],[mytopicOLD,42],[mytopic,21],[mytopicOLD,
>>> 31],[mytopic,29],[__consumer_offsets,23],[mytopicOLD,21],[_
>>> _consumer_offsets,48],[__consumer_offsets,11],[mytopic,
>>> 18],[__consumer_offsets,13],[mytopic,45],[mytopic,5],[
>>> mytopicOLD,25],[mytopic,6],[mytopicOLD,23],[mytopicOLD,37]
>>> ,[__consumer_offsets,6],[__consumer_offsets,49],[
>>> mytopicOLD,13],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_
>>> offsets,37],[mytopic,12],[mytopicOLD,30],[__consumer_
>>> offsets,31],[__consumer_offsets,44],[mytopicOLD,15],[
>>> 

Kafka streams internal topic data retention

2017-04-30 Thread Shimi Kiviti
Hi

Where can I find what is the Kafka streams internal topic data retention
time and how to change it

Thanks,
Shimi


Does Kafka producer wait till previous batch returns response before sending next one?

2017-04-30 Thread Petr Novak
Hello,

Does the Kafka producer wait till the previous batch returns a response before
sending the next one? Do I assume correctly that it does not, given that retries
can change ordering?

 

Hence the delay between batches is introduced only by the producer's internal
send loop time and linger?

 

If a timeout were localized to only a single batch send request for some
reason, does it affect the next batch (assuming that batch can go through
successfully)?

 

Many thanks,

Petr



Re: Issue in Kafka running for few days

2017-04-30 Thread Michal Borowiecki

Svante, I don't share your opinion.
Having an even number of zookeepers is not a problem in itself, it 
simply means you don't get any better resilience than if you had one 
fewer instance.
Yes, it's not common or recommended practice, but you are allowed to 
have an even number of zookeepers and it's most likely not related to 
the problem at hand and does NOT need to be addressed first.

https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup

Abhit, I'm afraid the log snippet is not enough for me to help.
Maybe someone else in the community with more experience can recognize 
the symptoms but in the meantime, if you haven't already done so, you 
may want to search for similar issues:


https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22

searching for text like "ZK expired; shut down all controller" or "No 
broker in ISR is alive for" or other interesting events from the log.


Hope that helps,
Michal


On 26/04/17 21:40, Svante Karlsson wrote:

You are not supposed to run an even number of zookeepers. Fix that first

On Apr 26, 2017 20:59, "Abhit Kalsotra"  wrote:


Any pointers please


Abhi

On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra 
wrote:


Hi *

My kafka setup


* OS: Windows Machine
* 6 broker nodes, 4 on one Machine and 2 on other Machine
* ZK instance on (4 broker nodes Machine) and another ZK on (2 broker
  nodes machine)
* 2 Topics with partition size = 50 and replication factor = 3

I am producing on an average of around 500 messages / sec with each
message size close to 98 bytes...

More or less the message rate stays constant throughout, but after
running the setup for close to 2 weeks, my Kafka cluster broke and this happened
twice in a month.  Not able to understand what's the issue, Kafka gurus
please do share your inputs...

the controller.log file at the time Kafka broke looks like




*[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26

12:03:34,998]

INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]

INFO

[Partition state machine on Controller 0]: Invoking state change to
OfflinePartition for partitions
[__consumer_offsets,19],[mytopic,11],[__consumer_

offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
[__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
,[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_
offsets,0],[mytopic,32],[__consumer_offsets,24],[
mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[
mytopic,35],[__consumer_offsets,20],[mytopic,1],[
mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__
consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[
mytopic,36],[mytopicOLD,11],[mytopic,47],[mytopicOLD,20],[
mytopic,48],[__consumer_offsets,12],[mytopicOLD,32],[_
_consumer_offsets,8],[mytopicOLD,39],[mytopicOLD,27]
,[mytopicOLD,49],[mytopicOLD,42],[mytopic,21],[mytopicOLD,
31],[mytopic,29],[__consumer_offsets,23],[mytopicOLD,21],[_
_consumer_offsets,48],[__consumer_offsets,11],[mytopic,
18],[__consumer_offsets,13],[mytopic,45],[mytopic,5],[
mytopicOLD,25],[mytopic,6],[mytopicOLD,23],[mytopicOLD,37]
,[__consumer_offsets,6],[__consumer_offsets,49],[
mytopicOLD,13],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_
offsets,37],[mytopic,12],[mytopicOLD,30],[__consumer_
offsets,31],[__consumer_offsets,44],[mytopicOLD,15],[
mytopicOLD,29],[mytopic,37],[mytopic,38],[__consumer_
offsets,42],[mytopic,27],[mytopic,26],[mytopic,15],[__
consumer_offsets,34],[mytopic,42],[__consumer_offsets,46],[
mytopic,14],[mytopicOLD,12],[mytopicOLD,1],[mytopic,7],[__
consumer_offsets,25],[mytopicOLD,24],[mytopicOLD,44]
,[mytopicOLD,14],[__consumer_offsets,32],[mytopic,0],[__
consumer_offsets,43],[mytopic,39],[mytopicOLD,5],[mytopic,9]
,[mytopic,24],[__consumer_offsets,36],[mytopic,25],[
mytopicOLD,36],[mytopic,19],[__consumer_offsets,35],[__
consumer_offsets,7],[mytopic,8],[__consumer_offsets,38],[
mytopicOLD,48],[mytopicOLD,9],[__consumer_offsets,1],[
mytopicOLD,6],[mytopic,41],[mytopicOLD,41],[mytopicOLD,7],
[mytopic,17],[mytopicOLD,17],[mytopic,49],[__consumer_
offsets,16],[__consumer_offsets,2]

(kafka.controller.PartitionStateMachine)[2017-04-26 12:03:35,045] INFO
[SessionExpirationListener on 1], ZK expired; shut down all controller
components and try to re-elect
(kafka.controller.KafkaController$SessionExpirationListener)[2017-04-26
12:03:35,045] DEBUG [Controller 1]: Controller resigning, broker id 1