[jira] [Commented] (KAFKA-6568) LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366569#comment-16366569
 ] 

ASF GitHub Bot commented on KAFKA-6568:
---

becketqin opened a new pull request #4580: KAFKA-6568; The log cleaner should 
check the partition state before r…
URL: https://github.com/apache/kafka/pull/4580
 
 
   …emoving it from the inProgress map.
   
   The log cleaner should not naively remove the partition from the in-progress 
map without checking the partition state. This may cause another thread calling 
`LogCleanerManager.abortAndPauseCleaning()` to hang indefinitely.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LogCleanerManager.doneDeleting() should check the partition state before 
> deleting the in progress partition
> ---
>
> Key: KAFKA-6568
> URL: https://issues.apache.org/jira/browse/KAFKA-6568
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 1.1.0
>
>
> {{LogCleanerManager.doneDeleting()}} removes the partition from the 
> {{inProgress}} map without checking if the partition is paused or not. This 
> will cause the paused partition state to be lost, and may also cause another 
> thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block 
> indefinitely waiting on the partition state to become paused.
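The guarded removal described above can be sketched as follows. This is an illustrative stand-in, not the actual Kafka implementation: the class, state names, and map layout are hypothetical, and the real LogCleanerManager tracks richer state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: doneDeleting() only removes the partition from the
// in-progress map when cleaning is actually in progress. If another thread
// has paused the partition, the paused marker is preserved so that a thread
// waiting in abortAndPauseCleaning() can still observe it.
public class DoneDeletingSketch {
    enum State { LOG_CLEANING_IN_PROGRESS, LOG_CLEANING_PAUSED }

    static final Map<String, State> inProgress = new HashMap<>();

    static void doneDeleting(String topicPartition) {
        State state = inProgress.get(topicPartition);
        if (state == State.LOG_CLEANING_IN_PROGRESS) {
            // Normal case: deletion finished, forget the partition.
            inProgress.remove(topicPartition);
        } else if (state == State.LOG_CLEANING_PAUSED) {
            // A concurrent abortAndPauseCleaning() paused this partition;
            // keep the paused state instead of naively removing it.
        } else {
            throw new IllegalStateException(
                "Partition " + topicPartition + " in unexpected state: " + state);
        }
    }

    public static void main(String[] args) {
        inProgress.put("t-0", State.LOG_CLEANING_IN_PROGRESS);
        inProgress.put("t-1", State.LOG_CLEANING_PAUSED);
        doneDeleting("t-0");
        doneDeleting("t-1");
        System.out.println(inProgress.containsKey("t-0")); // removed
        System.out.println(inProgress.get("t-1"));         // paused state kept
    }
}
```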



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-02-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366546#comment-16366546
 ] 

Jun Rao commented on KAFKA-4277:


[~nico.meyer], do you know if the conflicting ephemeral node was there 
temporarily or permanently? From the ZK server commit log, was there any 
indication that the ephemeral node was removed after session expiration, and if 
so, when?

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use ZooKeeper 3.4.6.
> The ZooKeeper session timed out and zkClient's reconnect attempt failed. The 
> broker then re-established the session and re-registered its broker info in 
> ZK, which threw a NODEEXISTS exception.
>  I think this is because the ephemeral node created by the old session had 
> not been removed yet. 
> In 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug tries to create the 
> node in a while loop until creation succeeds, which would solve this issue. 
> But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.10.0.1 that function was removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
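The retry approach referenced above (modeled loosely on the removed createEphemeralPathExpectConflictHandleZKBug) can be sketched like this. Everything here is a simplified stand-in, not the actual ZkUtils code: the map plays the role of the ZooKeeper ensemble, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: on a NODEEXISTS conflict, check whether the existing
// ephemeral node belongs to the current session; if it belongs to an old
// (expired) session, wait for the server to delete it and retry creation.
public class EphemeralRetrySketch {
    // Minimal stand-in for a ZooKeeper ensemble: path -> owning session id.
    static final Map<String, Long> znodes = new HashMap<>();
    static long currentSessionId = 2L;

    // Returns true once the ephemeral node is owned by the current session.
    static boolean createEphemeralWithRetry(String path, int maxRetries)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            Long owner = znodes.get(path);
            if (owner == null) {
                znodes.put(path, currentSessionId); // creation succeeded
                return true;
            }
            if (owner == currentSessionId) {
                return true;                        // we already own it
            }
            // Node left over from an expired session: back off, then retry.
            Thread.sleep(10);
            znodes.remove(path);                    // simulate server-side expiry cleanup
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        znodes.put("/brokers/ids/3", 1L);           // stale node from the old session
        boolean ok = createEphemeralWithRetry("/brokers/ids/3", 3);
        System.out.println(ok);
        System.out.println(znodes.get("/brokers/ids/3"));
    }
}
```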





[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366493#comment-16366493
 ] 

Matthias J. Sax commented on KAFKA-6555:


Your observation is correct. Using StandbyTasks is also tricky to get right, and 
it requires configuring standby tasks in the first place... I guess this is 
the reason why Guozhang suggested having a KIP discussion.

Also, if we see both approaches as complementary, the question is what the 
query order should be: first try the StandbyTask, and if no StandbyTask is 
available, fall back to the restoring task?
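That fallback order can be sketched as follows. The class and method names are purely illustrative, not the Kafka Streams API.

```java
// Illustrative sketch of the proposed query order: try a StandbyTask first,
// and fall back to the restoring active task only when no standby exists.
public class QueryOrderSketch {

    // Returns the standby store if standbys are configured, otherwise null.
    static String findStandbyStore(boolean standbyConfigured) {
        return standbyConfigured ? "standby" : null;
    }

    static String findRestoringStore() {
        return "restoring";
    }

    static String storeForQuery(boolean standbyConfigured) {
        String standby = findStandbyStore(standbyConfigured);
        // Query order: StandbyTask first, restoring task as the fallback.
        return standby != null ? standby : findRestoringStore();
    }

    public static void main(String[] args) {
        System.out.println(storeForQuery(true));
        System.out.println(storeForQuery(false));
    }
}
```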

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask 
> is in RUNNING state. The idea is to make them queryable even in the RESTORATION 
> (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
> making the data inaccessible during this time amounts to downtime that is not 
> acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, resulting in partition downtime. We 
> can make the state store partition queryable for the data already present in 
> the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this time 
> is downtime for the partition.
> This is a very important improvement, as it could significantly improve the 
> availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.





[jira] [Comment Edited] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores

2018-02-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366286#comment-16366286
 ] 

Guozhang Wang edited comment on KAFKA-5285 at 2/16/18 12:56 AM:


[~davispw] [~xvrl] Thinking about this a bit more, I think we can indeed provide 
a better heuristic to define the upper and lower bound for the window store 
{{fetch(keyFrom, keyTo, timeFrom, timeTo)}}, which is the following:

0. First of all, documentation-wise we require users to make sure the 
serialized bytes of {{keyFrom}} are lexicographically smaller than the 
serialized bytes of {{keyTo}}. Inside this call, we first check the bytes for 
this condition. 

1. For the lower range, we just use the {{keyFrom}} bytes, instead of {{keyFrom 
+ minSuffix}}. Since we know there would be no data between these two keys at 
all, we can save some overhead of byte allocation and puts.

2. For the upper range, we just consider the following two combinations:

 a). keyFrom + minSuffix
 b). keyTo + maxSuffix

where minSuffix = timeFrom + seq0 and maxSuffix = timeTo + seqMAX, and then we 
just pick the larger of the two. With this we would replace the current 
{{OrderedBytes}} implementations. 

Similarly, for the session store {{fetch}} we do the same thing, except that we 
handle single-key fetch in a more optimized way:

0. For single-key fetch, lower range = key + 0 + timeEndEarliest, upper range = 
key + timeStartLatest + MAX (this is because timeTo <= timeFrom, so they can be 
the bound of each other).

1. For range-key fetch, lower range = keyFrom.

2. For range-key fetch, upper range = MAX (keyFrom + 0 + timeEndEarliest, keyTo 
+ timeStartLatest + MAX).

WDYT?
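The upper/lower-bound heuristic above can be sketched on raw byte arrays as follows. This is a simplified illustration: the suffix byte layouts and names are hypothetical and do not reproduce the real OrderedBytes encoding.

```java
import java.util.Arrays;

// Illustrative sketch of the heuristic: the lower bound is keyFrom alone,
// and the upper bound is the lexicographic maximum of keyFrom+minSuffix and
// keyTo+maxSuffix, where minSuffix encodes (timeFrom, seq 0) and maxSuffix
// encodes (timeTo, seq MAX).
public class RangeBoundSketch {
    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    // Unsigned lexicographic comparison of byte arrays.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    static byte[] lowerRange(byte[] keyFrom) {
        // No data can sort between keyFrom and keyFrom+minSuffix.
        return keyFrom;
    }

    static byte[] upperRange(byte[] keyFrom, byte[] keyTo,
                             byte[] minSuffix, byte[] maxSuffix) {
        byte[] a = concat(keyFrom, minSuffix);
        byte[] b = concat(keyTo, maxSuffix);
        return compareUnsigned(a, b) >= 0 ? a : b;
    }

    public static void main(String[] args) {
        byte[] keyFrom = {1, 2};
        byte[] keyTo = {1, 3};
        byte[] minSuffix = {0, 0};
        byte[] maxSuffix = {(byte) 0xFF, (byte) 0xFF};
        System.out.println(Arrays.equals(lowerRange(keyFrom), keyFrom));
        byte[] upper = upperRange(keyFrom, keyTo, minSuffix, maxSuffix);
        System.out.println(Arrays.equals(upper, concat(keyTo, maxSuffix)));
    }
}
```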


was (Author: guozhang):
[~davispw] [~xvrl] Thinking about this a bit more, I think we can provide a 
better heuristic to define the upper and lower bound for window store 
{{fetch(keyFrom, keyTo, timeFrom, timeTo)}} indeed, which is the following:

0. First of all, documentation wise we require users to make sure the 
serialized bytes of {{keyFrom}} is smaller than serialized bytes of {{keyTo}} 
lexicograpically. And inside this call, we first check the bytes for this 
condition. 

1. For lower range, we just use the {{keyFrom bytes}}, instead of {{keyFrom + 
minSuffix (s)}}. Since we know there would be no data in between these two 
keys at all, we can save some overhead of bytes alloc and puts.

2. For upper range, we just consider the following two combinations:

 a). keyFrom + minSuffix
 b). keyTo + maxSuffix

where minSuffix = timeFrom + seq0, maxSuffix = timeTo + seqMAX. And then we 
just pick the largest among these two. In all we would replace this with the 
current {{OrderedBytes}} implementations. 

Similarly, for session store {{fetch}}, we do the same thing, except for single 
key fetch we handle it more in a more optimized way:

0. For single key fetch, lower range = key + timeFrom + timFrom, upper range = 
key + timeTo + timeTo (this is because timeTo <= timeFrom, so they can be the 
bound of each other).

1. For range key fetch, lower range = keyFrom.

2. For range key fetch, upper range = MAX (keyFrom + timeFrom + timeFrom, keyTo 
+ timeTo + timeTo).

WDYT?

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depends on lower 
> key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.





[jira] [Comment Edited] (KAFKA-5802) ScramServerCallbackHandler#handle should check username not being null before calling credentialCache.get()

2018-02-15 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302643#comment-16302643
 ] 

Ted Yu edited comment on KAFKA-5802 at 2/16/18 12:52 AM:
-

+1 from me.


was (Author: yuzhih...@gmail.com):
+1

> ScramServerCallbackHandler#handle should check username not being null before 
> calling credentialCache.get()
> ---
>
> Key: KAFKA-5802
> URL: https://issues.apache.org/jira/browse/KAFKA-5802
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> String username = null;
> for (Callback callback : callbacks) {
>     if (callback instanceof NameCallback)
>         username = ((NameCallback) callback).getDefaultName();
>     else if (callback instanceof ScramCredentialCallback)
>         ((ScramCredentialCallback) callback).scramCredential(credentialCache.get(username));
> }
> {code}
> Since ConcurrentHashMap, used by CredentialCache, doesn't allow null keys, we 
> should check that username is not null before calling credentialCache.get().
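The null guard suggested above can be sketched like this. The cache here is a plain ConcurrentHashMap standing in for Kafka's CredentialCache, and the method name is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: guard the cache lookup so that a missing NameCallback
// (username == null) does not trigger a NullPointerException, since
// ConcurrentHashMap rejects null keys.
public class ScramNullCheckSketch {
    static final ConcurrentHashMap<String, String> credentialCache =
        new ConcurrentHashMap<>();

    static String lookupCredential(String username) {
        if (username == null) {
            // Without this check, credentialCache.get(null) throws NPE.
            return null;
        }
        return credentialCache.get(username);
    }

    public static void main(String[] args) {
        credentialCache.put("alice", "scram-credential");
        System.out.println(lookupCredential("alice"));
        System.out.println(lookupCredential(null));
    }
}
```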





[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366471#comment-16366471
 ] 

Ashish Surana edited comment on KAFKA-6555 at 2/16/18 12:45 AM:


Sure. Thanks [~mjsax].

This approach: basically, here we are allowing the state store to be queried in 
the RESTORING state. I understand it's restoring and can have old data, but the 
same can't be guaranteed for replicas either. Eventually we will have to open 
up replicas for queries as well, but I believe this is complementary even with 
that.

The approach that you are suggesting (KAFKA-6144) is to allow queries to one of 
the replicas while the main task is restoring. A few problems I can think of 
with this approach:
 * when there is no replica: the store will be down until the main task reaches 
RUNNING state
 * when there are 2 or more replicas: which replica will serve the requests 
(all might be at different states)

Or we can make some of these decisions configurable and leave it to the 
application developers to decide for their use cases.

 


was (Author: asurana):
Sure. Thanks [~mjsax].

This approach: Basically here we are allowing to query state-store in RESTORING 
state. I understand it's restoring and can have old data, but the same can't be 
guaranteed for replica's either. Finally we will have to open up replica's also 
for queries, but I believe this is complementary even with that.

The approach that you are suggesting (KAFKA-6144) is to allow the queries from 
one of the replica's when the main-task is restoring. Few problems I can think 
of with this approach:
 * when there is no replica: the store will be down till main-task reaches 
running state
 * when there are 2 or more replica's: which replica will server the requests 
(all might be at different states)

Or we can make some of these decisions configurable and leave to the 
application developers to define for their use-cases.

 






[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366471#comment-16366471
 ] 

Ashish Surana edited comment on KAFKA-6555 at 2/16/18 12:44 AM:


Sure. Thanks [~mjsax].

This approach: basically, here we are allowing the state store to be queried in 
the RESTORING state. I understand it's restoring and can have old data, but the 
same can't be guaranteed for replicas either. Eventually we will have to open 
up replicas for queries as well, but I believe this is complementary even with 
that.

The approach that you are suggesting (KAFKA-6144) is to allow queries to one of 
the replicas while the main task is restoring. A few problems I can think of 
with this approach:
 * when there is no replica: the store will be down until the main task reaches 
RUNNING state
 * when there are 2 or more replicas: which replica will serve the requests 
(all might be at different states)

Or we can make some of these decisions configurable and leave it to the 
application developers to decide for their use cases.

 


was (Author: asurana):
Sure. Thanks Matthias.

This approach: Basically here we are allowing to query state-store in RESTORING 
state. I understand it's restoring and can have old data, but the same can't be 
guaranteed for replica's either. Finally we will have to open up replica's also 
for queries, but I believe this is complementary even with that.

The approach that you are suggesting (KAFKA-6144) is to allow the queries from 
one of the replica's when the main-task is restoring. Few problems I can think 
of with this approach:
 * when there is no replica: the store will be down till main-task reaches 
running state
 * when there are 2 or more replica's: which replica will server the requests 
(all might be at different states)

Or we can make some of these decisions configurable and leave to the 
application developers to define for their use-cases.

 






[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366471#comment-16366471
 ] 

Ashish Surana edited comment on KAFKA-6555 at 2/16/18 12:44 AM:


Sure. Thanks Matthias.

This approach: basically, here we are allowing the state store to be queried in 
the RESTORING state. I understand it's restoring and can have old data, but the 
same can't be guaranteed for replicas either. Eventually we will have to open 
up replicas for queries as well, but I believe this is complementary even with 
that.

The approach that you are suggesting (KAFKA-6144) is to allow queries to one of 
the replicas while the main task is restoring. A few problems I can think of 
with this approach:
 * when there is no replica: the store will be down until the main task reaches 
RUNNING state
 * when there are 2 or more replicas: which replica will serve the requests 
(all might be at different states)

Or we can make some of these decisions configurable and leave it to the 
application developers to decide for their use cases.

 


was (Author: asurana):
Sure.

This approach: Basically here we are allowing to query state-store in RESTORING 
state. I understand it's restoring and can have old data, but the same can't be 
guaranteed for replica's either. Finally we will have to open up replica's also 
for queries, but I believe this is complementary even with that.

The approach that you are suggesting (KAFKA-6144) is to allow the queries from 
one of the replica's when the main-task is restoring. Few problems I can think 
of with this approach:
 * when there is no replica: the store will be down till main-task reaches 
running state
 * when there are 2 or more replica's: which replica will server the requests 
(all might be at different states)

Or we can make some of these decisions configurable and leave to the 
application developers to define for their use-cases.

 






[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366471#comment-16366471
 ] 

Ashish Surana commented on KAFKA-6555:
--

Sure.

This approach: basically, here we are allowing the state store to be queried in 
the RESTORING state. I understand it's restoring and can have old data, but the 
same can't be guaranteed for replicas either. Eventually we will have to open 
up replicas for queries as well, but I believe this is complementary even with 
that.

The approach that you are suggesting (KAFKA-6144) is to allow queries to one of 
the replicas while the main task is restoring. A few problems I can think of 
with this approach:
 * when there is no replica: the store will be down until the main task reaches 
RUNNING state
 * when there are 2 or more replicas: which replica will serve the requests 
(all might be at different states)

Or we can make some of these decisions configurable and leave it to the 
application developers to decide for their use cases.

 






[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366462#comment-16366462
 ] 

Matthias J. Sax commented on KAFKA-6555:


[~asurana] I added you to the list of contributors and assigned the Jira to 
you. You can now assign Jiras to yourself, too.






[jira] [Assigned] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6555:
--

Assignee: Ashish Surana






[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366459#comment-16366459
 ] 

Matthias J. Sax commented on KAFKA-6555:


I had a quick look at the PR and I am not sure if we want to address the issue 
like this. You allow querying restoring tasks, which seems questionable from a 
semantic point of view. I guess I now understand the difference to KAFKA-6144 – 
there, the idea is to allow querying the StandbyTask in case the main task is 
restoring atm. The assumption is that a StandbyTask holds data that is almost 
up-to-date, while in your PR you would allow querying very old data. 
Furthermore, as long as a StandbyTask is being queried, the data would not 
change – because the main task is restoring, the StandbyTask just serves the 
latest state. If we allow querying a task that is restoring, you would see 
different (old) data until processing resumes. Thus, it's quite different.

Is this a correct summary?

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is 
> in the RUNNING state. The idea is to make them queryable even in the RESTORATION 
> (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
> making the data inaccessible during this time could mean downtime that is not 
> acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica task 
> has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, resulting in partition downtime. We 
> can make the state store partition queryable for the data already present in 
> the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this time 
> is downtime for the partition.
> This is a very important improvement, as it could significantly improve the 
> availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  





[jira] [Updated] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2018-02-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5660:
---
Description: 
{{TopologyBuilderException}} is a pre-runtime exception that should only be 
thrown before {{KafkaStreams#start()}} is called.

However, we do throw {{TopologyBuilderException}} within
 - `SourceNodeFactory#getTopics`
 - `ProcessorContextImpl#getStateStore`
 - `StreamPartitionAssignor#prepareTopic`

(and maybe somewhere else: we should double-check if there are other places in 
the code like those).

We should replace those exceptions with either {{StreamsException}} or with a 
new exception type.

  was:
{{TopologyBuilderException}} is a pre-runtime exception that should only be 
thrown {{KafkaStreams#start()}} is called.

However, we do throw {{TopologyBuilderException}} within

- `SourceNodeFactory#getTopics`
- `ProcessorContextImpl#getStateStore`
- `StreamPartitionAssignor#prepareTopic `

(and maybe somewhere else: we should double check if there are other places in 
the code like those).

We should replace those exception with either {{StreamsException}} or with a 
new exception type.


> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Nick Afshartous
>Priority: Major
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
>  - `SourceNodeFactory#getTopics`
>  - `ProcessorContextImpl#getStateStore`
>  - `StreamPartitionAssignor#prepareTopic`
> (and maybe somewhere else: we should double-check if there are other places 
> in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a 
> new exception type.





[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2018-02-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366448#comment-16366448
 ] 

Matthias J. Sax commented on KAFKA-5660:


Thanks, [~nafshartous]. I updated the description.

> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Nick Afshartous
>Priority: Major
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
>  - `SourceNodeFactory#getTopics`
>  - `ProcessorContextImpl#getStateStore`
>  - `StreamPartitionAssignor#prepareTopic`
> (and maybe somewhere else: we should double-check if there are other places 
> in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a 
> new exception type.





[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361515#comment-16361515
 ] 

Ashish Surana edited comment on KAFKA-6555 at 2/15/18 11:42 PM:


Hi Matthias,

  Thanks for pointing to KAFKA-6144. As I go through it, it looks like a very 
similar ticket, but with a minor difference:
 * I am suggesting to allow stale reads only from PARTITION_ASSIGNED (not from 
PARTITION_REVOKED), primarily as it's going to be the one in the RUNNING state, 
and this is the minimum we need to do to keep serving requests for this 
partition. We still have only one instance doing reads and writes, and will 
have pure standby replicas. This approach is good if we continue with the 
current design of one active write & read instance.

I have made the changes, and can share them in a few days.


was (Author: asurana):
Hi Matthias,

  Thanks for pointing to KAFKA-6144. As I go through it, it looks very similar 
ticket but with minor difference:
 * I am suggesting to allow stale reads only from PARTITION_ASSIGNED (not from 
PARTITION_REVOKED) primarily as it's going to be the one in RUNNING state, and 
this is the minimum we need to do keep serving request for this partition. We 
still have one instance doing write or read and still want to have pure standby 
replicas. This approach is good if we continue with current design of one 
active write/read instance.

I have made the changes, and can share it in few days.

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is 
> in the RUNNING state. The idea is to make them queryable even in the RESTORATION 
> (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
> making the data inaccessible during this time could mean downtime that is not 
> acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica task 
> has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, resulting in partition downtime. We 
> can make the state store partition queryable for the data already present in 
> the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this time 
> is downtime for the partition.
> This is a very important improvement, as it could significantly improve the 
> availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  





[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Ashish Surana (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-6555:
-
Description: 
State store in Kafka streams are currently only queryable when StreamTask is in 
RUNNING state. The idea is to make it queryable even in the RESTORATION 
(PARTITION_ASSIGNED) state as the time spend on restoration can be huge and 
making the data inaccessible during this time could be downtime not suitable 
for many applications.

When the active partition goes down then one of the following occurs:
 # One of the standby replica partition gets promoted to active: Replica task 
has to restore the remaining state from the changelog topic before it can 
become RUNNING. The time taken for this depends on how much the replica is 
lagging behind. During this restoration time the state store for that partition 
is currently not queryable resulting in the partition downtime. We can make the 
state store partition queryable for the data already present in the state store.
 # When there is no replica or standby task, then active task will be started 
in one of the existing node. That node has to build the entire state from the 
changelog topic which can take lot of time depending on how big is the 
changelog topic, and keeping state store not queryable during this time is the 
downtime for the parition.

It's very important improvement as it could simply improve the availability of 
microservices developed using kafka streams.

I am working on a patch for this change. Any feedback or comments are welcome.

 

 

  was:
State store in Kafka streams are currently only queryable when StreamTask is in 
RUNNING state. The idea is to make it queryable even in the RESTORATION 
(PARTITION_ASSIGNED) state as the time spend on restoration can be huge and 
making the data inaccessible during this time could be downtime not suitable 
for many applications.

When the active partition goes down then one of the following occurs:
 # One of the standby replica partition gets promoted to active: Replica task 
has to restore the remaining state from the changelog topic before it can 
become RUNNING. The time taken for this depends on how much the replica is 
lagging behind. During this restoration time the state store for that partition 
is currently not queryable giving making the partition down. We can make the 
state store partition queryable for the data already present in the state store.
 # When there is no replica or standby task, then active task will be started 
in one of the existing node. That node has to build the entire state from the 
changelog topic which can take lot of time depending on how big is the 
changelog topic, and keeping state store not queryable during this time is the 
downtime for the parition.

It's very important improvement as it could simply improve the availability of 
microservices developed using kafka streams.

I am working on a patch for this change. Any feedback or comments are welcome.

 

 


> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is 
> in the RUNNING state. The idea is to make them queryable even in the RESTORATION 
> (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
> making the data inaccessible during this time could mean downtime that is not 
> acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica task 
> has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, resulting in partition downtime. We 
> can make the state store partition queryable for the data already present in 
> the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this time 
> is downtime for the partition.
> This is a very important improvement, as it could significantly improve the 
> availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  





[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366412#comment-16366412
 ] 

ASF GitHub Bot commented on KAFKA-6541:
---

hachikuji closed pull request #4548: KAFKA-6541: Fixed a StackOverflow bug in 
kafka-coordinator-heartbeat-thread
URL: https://github.com/apache/kafka/pull/4548
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 0747e8db146..a0d4ed8f57c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -57,6 +57,7 @@
 // flag and the request completion queue below).
 private final Logger log;
 private final KafkaClient client;
+private boolean disconnecting = false;
 private final UnsentRequests unsent = new UnsentRequests();
 private final Metadata metadata;
 private final Time time;
@@ -387,9 +388,13 @@ private void checkDisconnects(long now) {
 }
 
 public void disconnect(Node node) {
-failUnsentRequests(node, DisconnectException.INSTANCE);
-
 synchronized (this) {
+if(disconnecting){
+return;
+}
+disconnecting=true;
+failUnsentRequests(node, DisconnectException.INSTANCE);
+
 client.disconnect(node.idString());
 }
 
@@ -483,6 +488,7 @@ public boolean connectionFailed(Node node) {
 public void tryConnect(Node node) {
 synchronized (this) {
 client.ready(node, time.milliseconds());
+disconnecting = false;
 }
 }
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
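The one-line idea behind the patch above, a guard flag that turns re-entrant disconnects into no-ops, can be shown in isolation. This is a self-contained sketch with made-up names, not the real ConsumerNetworkClient:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Self-contained sketch (made-up names, not ConsumerNetworkClient) of the
// fix above: failing unsent requests fires completion handlers, and a
// handler may call disconnect() again for the same node. Without the
// 'disconnecting' guard, that re-entry re-runs the failure path and can
// recurse until StackOverflowError; with it, the nested call returns early.
public class GuardedDisconnector {
    private final Deque<Runnable> unsent = new ArrayDeque<>();
    private boolean disconnecting = false;
    public int entries = 0; // counts calls, for illustration

    public void enqueue(Runnable onFailure) { unsent.add(onFailure); }

    public synchronized void disconnect() {
        entries++;
        if (disconnecting) {
            return; // nested call from a completion handler: no-op
        }
        disconnecting = true;
        try {
            while (!unsent.isEmpty()) {
                unsent.poll().run(); // may re-enter disconnect()
            }
        } finally {
            disconnecting = false; // mirrors the reset in tryConnect()
        }
    }
}
```

The real patch resets the flag in tryConnect() rather than in a finally block; the effect is the same, only the outermost disconnect processes the unsent-request queue.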


> StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
> --
>
> Key: KAFKA-6541
> URL: https://issues.apache.org/jira/browse/KAFKA-6541
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Anh Le
>Priority: Major
>
> There's something wrong with our client library when sending heart beats. 
> This bug seems to be identical to this one: 
> [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]
>  
> Here's the log:
>  
> {quote}2018-02-08 13:55:01,102 ERROR 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread
>  Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | 
> default-group':
> java.lang.StackOverflowError: null
>  at java.lang.StringBuilder.append(StringBuilder.java:136)
>  at 
> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)
>  at 
> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
>  at 
> ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
>  at 
> ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
>  at 
> ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
>  at 
> ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
>  at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
>  at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
>  at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
>  at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)
>  at ch.qos.logback.classic.Logger.info(Logger.java:583)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> 

[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366411#comment-16366411
 ] 

Gunnar Morling commented on KAFKA-6566:
---

+1 for more exploration of whether calling stop() was the original intention. 
FWIW, several comments in Debezium indicate that this at least was the 
assumption on that side when the code was written (of course, that assumption 
may have been right or wrong).

Note that calling stop() would be the only chance for the task to clean up its 
resources after an exception on the KC side of the polling loop (while we could 
put our entire polling logic in a try/catch block, there's no chance to react 
to exceptions in WorkerSourceTask).
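Until the framework changes, a task can apply the try/catch workaround mentioned above itself. A minimal, dependency-free sketch (hypothetical task, not the Connect API):

```java
import java.util.List;

// Dependency-free sketch of the workaround discussed above: since the
// framework does not call stop() after poll() throws, the task guards its
// own polling logic and releases resources before rethrowing. Names are
// illustrative; 'resourceOpen' stands in for e.g. a database connection.
public class SelfCleaningTask {
    public boolean resourceOpen = true;

    private List<String> doPoll() {
        throw new RuntimeException("simulated source failure");
    }

    public List<String> poll() {
        try {
            return doPoll();
        } catch (RuntimeException e) {
            if (resourceOpen) {
                resourceOpen = false; // clean up before propagating
            }
            throw e;
        }
    }
}
```

This only covers exceptions raised inside the task's own poll(); as noted above, failures on the WorkerSourceTask side remain out of the task's reach.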

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}





[jira] [Resolved] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-15 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6541.

Resolution: Duplicate

Closing as duplicate for now. Please reopen if you think the issue has not been 
fixed.

> StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
> --
>
> Key: KAFKA-6541
> URL: https://issues.apache.org/jira/browse/KAFKA-6541
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Anh Le
>Priority: Major
>
> There's something wrong with our client library when sending heart beats. 
> This bug seems to be identical to this one: 
> [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]
>  
> Here's the log:
>  
> {quote}2018-02-08 13:55:01,102 ERROR 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread
>  Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | 
> default-group':
> java.lang.StackOverflowError: null
>  at java.lang.StringBuilder.append(StringBuilder.java:136)
>  at 
> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)
>  at 
> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
>  at 
> ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
>  at 
> ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
>  at 
> ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
>  at 
> ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
>  at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
>  at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
>  at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
>  at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)
>  at ch.qos.logback.classic.Logger.info(Logger.java:583)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> 

[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366404#comment-16366404
 ] 

Ted Yu commented on KAFKA-6566:
---

Thanks for the comment, Randall.

I did a quick search but haven't found whether not calling {{stop()}} was 
intentional.

Will spend more time investigating related aspects.

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example, see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}





[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-02-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366399#comment-16366399
 ] 

Guozhang Wang commented on KAFKA-4969:
--

Yup, re-opening to keep track.

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0, 1.0.1
>
>
> Currently, {{StreamPartitionAssignor}} does not distinguish different 
> "types" of tasks. For example, tasks can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.
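The description's 2-stateless/2-stateful example can be sketched as a toy greedy placement. This is an illustration only, not the actual assignor code; the class and method names are invented, and the cost function (store count weighted heavily, plus a small per-task weight) is a stand-in for the richer model the ticket asks for (stores, sources/sinks, processors, standby vs. regular).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy workload-aware placement: stateful tasks are placed first, and each
// task goes to the instance with the lowest accumulated cost so far.
class StoreAwarePlacement {
    static class TaskSpec {
        final String name;
        final int numStores;
        TaskSpec(String name, int numStores) {
            this.name = name;
            this.numStores = numStores;
        }
    }

    static List<List<TaskSpec>> assign(List<TaskSpec> tasks, int numInstances) {
        List<List<TaskSpec>> assignment = new ArrayList<>();
        int[] cost = new int[numInstances];
        for (int i = 0; i < numInstances; i++) {
            assignment.add(new ArrayList<>());
        }
        // Place "heavy" (stateful) tasks first so they spread across instances.
        List<TaskSpec> ordered = new ArrayList<>(tasks);
        ordered.sort(Comparator.comparingInt((TaskSpec t) -> t.numStores).reversed());
        for (TaskSpec task : ordered) {
            int best = 0;
            for (int i = 1; i < numInstances; i++) {
                if (cost[i] < cost[best]) {
                    best = i;
                }
            }
            assignment.get(best).add(task);
            cost[best] += task.numStores * 10 + 1; // store weight + task weight
        }
        return assignment;
    }
}
```

With the example from the description (2 stateful tasks with one store each, 2 stateless tasks, 2 instances), this yields one stateful and one stateless task per instance, which is exactly the balance the ticket wants guaranteed.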





[jira] [Reopened] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-02-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-4969:
--

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0, 1.0.1
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish between different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types", 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs. standby task
> This improvement should be backed by a design document in the project wiki 
> (no KIP required, though) as it's a fairly complex change.





[jira] [Assigned] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores

2018-02-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-5285:


Assignee: Guozhang Wang  (was: Xavier Léauté)

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the lower 
> key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.





[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-15 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366398#comment-16366398
 ] 

Ashish Surana commented on KAFKA-6555:
--

Raised PR: [https://github.com/apache/kafka/pull/4575]

Please review and let me know your comments. These are the minimal changes 
required, so I am not sure if we need a KIP.

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask 
> is in RUNNING state. The idea is to make them queryable even in the RESTORATION 
> (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
> making the data inaccessible during this time amounts to downtime that is not 
> acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, effectively making the partition down. 
> We can make the state store partition queryable for the data already present 
> in the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this time 
> is downtime for the partition.
> This is a very important improvement, as it could significantly improve the 
> availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  





[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6566:
--
Affects Version/s: 1.0.0

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}





[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366392#comment-16366392
 ] 

ASF GitHub Bot commented on KAFKA-6560:
---

guozhangwang opened a new pull request #4578: KAFKA-6560: Replace range query 
with newly added single point query in Windowed Aggregation [WIP]
URL: https://github.com/apache/kafka/pull/4578
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use single-point queries than range queries for windowed aggregation operators
> --
>
> Key: KAFKA-6560
> URL: https://issues.apache.org/jira/browse/KAFKA-6560
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: needs-kip
>
> Today, for windowed aggregations in the Streams DSL, the underlying 
> implementation leverages the fetch(key, from, to) API to get all the related 
> windows for a single record to update. However, this is a very inefficient 
> operation, with a significant amount of CPU time spent iterating over window 
> stores. On the other hand, since the operator implementation itself has full 
> knowledge of the window specs, it can actually translate this operation into 
> multiple single-point queries with the accurate window start timestamps, which 
> would largely reduce the overhead.
> The proposed approach is to add a single-point fetch API to the WindowedStore 
> and use it in the KStreamWindowedAggregate / KStreamWindowedReduce operators.
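The key observation can be illustrated with hopping windows. The sketch below is not the actual Streams code (class and method names are invented): given the window size and advance, the window start timestamps containing a record's timestamp are enumerable directly, so each one can be read with a single-point fetch(key, windowStart) instead of one range scan fetch(key, from, to).

```java
import java.util.ArrayList;
import java.util.List;

// Enumerate the hopping-window starts that contain a given record timestamp.
class HoppingWindowStarts {
    final long sizeMs;
    final long advanceMs;

    HoppingWindowStarts(long sizeMs, long advanceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
    }

    // All window start times s (snapped to the advance grid, never negative)
    // whose window [s, s + sizeMs) contains the record timestamp.
    List<Long> windowStartsFor(long timestamp) {
        List<Long> starts = new ArrayList<>();
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For size 10 ms and advance 5 ms, a record at t = 12 belongs to exactly the windows starting at 5 and 10, so two point lookups replace one range scan over the store.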





[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366390#comment-16366390
 ] 

Randall Hauch commented on KAFKA-6566:
--

Also, if we're not calling {{stop()}} on the task, we should find out whether 
that was the original intention. If so, then there's nothing to fix here, and if 
we want to change the behavior we'll need a KIP. However, if calling it was the 
intention and we're simply no longer doing it, then it's a bug that we can fix 
without a KIP.

Regardless, it seems like there's quite a bit more investigation required 
before we know how the code should be changed.

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}





[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366382#comment-16366382
 ] 

Randall Hauch commented on KAFKA-6566:
--

[~te...@apache.org], maybe we should first understand what happens to the other 
tasks and the connector when the {{WorkerSourceTask}} throws an exception. For 
example, if {{task.stop()}} should be called when the connector and all tasks 
are cleaned up, then we won't also want it in the suggested location.

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}





[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6566:
-
Component/s: KafkaConnect

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}





[jira] [Updated] (KAFKA-6568) LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition

2018-02-15 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-6568:

Description: {{LogCleanerManager.doneDeleting()}} removes the partition 
from the {{inProgress}} map without checking if the partition is paused or not. 
This will cause the paused partition state to be lost, and may also cause 
another thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block 
indefinitely waiting on the partition state to become paused.  (was: 
{{LogCleanerManager.doneDeleting()}} removes the partition from the 
{{inProgress}} map without checking if the partition is paused or not. This 
will cause the paused partition state to be lost. And may cause another thread 
calling {{LogCleanerManager.abortAndPauseCleaning()}} to block indefinitely 
waiting on the partition state to become paused.)

> LogCleanerManager.doneDeleting() should check the partition state before 
> deleting the in progress partition
> ---
>
> Key: KAFKA-6568
> URL: https://issues.apache.org/jira/browse/KAFKA-6568
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.1.0
>
>
> {{LogCleanerManager.doneDeleting()}} removes the partition from the 
> {{inProgress}} map without checking if the partition is paused or not. This 
> will cause the paused partition state to be lost, and may also cause another 
> thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block 
> indefinitely waiting on the partition state to become paused.





[jira] [Created] (KAFKA-6568) LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition

2018-02-15 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-6568:
---

 Summary: LogCleanerManager.doneDeleting() should check the 
partition state before deleting the in progress partition
 Key: KAFKA-6568
 URL: https://issues.apache.org/jira/browse/KAFKA-6568
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.2, 1.0.0, 0.10.2.1, 0.10.1.1
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Fix For: 1.1.0


{{LogCleanerManager.doneDeleting()}} removes the partition from the 
{{inProgress}} map without checking whether the partition is paused. This will 
cause the paused partition state to be lost, and may cause another thread 
calling {{LogCleanerManager.abortAndPauseCleaning()}} to block indefinitely 
waiting on the partition state to become paused.
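The direction of the fix can be sketched as follows. This is illustrative Java only (the real LogCleanerManager is Scala and its state types differ; the class and state names here are invented): instead of removing the partition unconditionally, doneDeleting first checks whether cleaning was aborted, and if so transitions the partition to paused and signals the waiter.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the cleaner's in-progress bookkeeping.
class CleanerManagerSketch {
    enum State { IN_PROGRESS, ABORTED, PAUSED }

    private final Map<String, State> inProgress = new HashMap<>();
    private final Object lock = new Object();

    void startCleaning(String partition) {
        synchronized (lock) { inProgress.put(partition, State.IN_PROGRESS); }
    }

    // Marks the partition aborted, then blocks until the cleaner thread
    // acknowledges the pause -- this is the caller that could hang forever
    // with the buggy doneDeleting().
    void abortAndPauseCleaning(String partition) throws InterruptedException {
        synchronized (lock) {
            inProgress.put(partition, State.ABORTED);
            while (inProgress.get(partition) != State.PAUSED) {
                lock.wait();
            }
        }
    }

    // The buggy version removed the entry unconditionally, dropping the
    // pause request. The fix checks the partition state first.
    void doneDeleting(String partition) {
        synchronized (lock) {
            if (inProgress.get(partition) == State.ABORTED) {
                inProgress.put(partition, State.PAUSED); // preserve the pause
                lock.notifyAll();                        // unblock the waiter
            } else {
                inProgress.remove(partition);
            }
        }
    }

    State stateOf(String partition) {
        synchronized (lock) { return inProgress.get(partition); }
    }
}
```

With the check in place, a concurrent abortAndPauseCleaning() call observes the PAUSED state and returns instead of waiting forever on an entry that no longer exists.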





[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2018-02-15 Thread Nick Afshartous (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366360#comment-16366360
 ] 

Nick Afshartous commented on KAFKA-5660:


Seems like the first sentence of the description is missing "before"

  should only be thrown *before* {{KafkaStreams#start()}} is called. 

> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Nick Afshartous
>Priority: Major
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
> - `SourceNodeFactory#getTopics`
> - `ProcessorContextImpl#getStateStore`
> - `StreamPartitionAssignor#prepareTopic `
> (and maybe somewhere else: we should double check if there are other places 
> in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a 
> new exception type.





[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366357#comment-16366357
 ] 

ASF GitHub Bot commented on KAFKA-6566:
---

tedyu opened a new pull request #4577: KAFKA-6566 SourceTask#stop() not called 
after exception raised in poll()
URL: https://github.com/apache/kafka/pull/4577
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}





[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366331#comment-16366331
 ] 

ASF GitHub Bot commented on KAFKA-5285:
---

guozhangwang opened a new pull request #4576: KAFKA-5285: Use optimized upper 
and lower bound for window key schema [WIP]
URL: https://github.com/apache/kafka/pull/4576
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the lower 
> key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.





[jira] [Created] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate

2018-02-15 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6567:


 Summary: KStreamWindowReduce can be replaced by 
KStreamWindowAggregate
 Key: KAFKA-6567
 URL: https://issues.apache.org/jira/browse/KAFKA-6567
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


This is tech debt worth cleaning up: KStreamWindowReduce should be replaceable 
by KStreamWindowAggregate. In fact, we have already done this for session 
windows, where in {{SessionWindowedKStreamImpl}} we use {{reduceInitializer}}, 
{{aggregatorForReducer}} and {{mergerForAggregator}} to replace the reducer 
with an aggregator.
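The reducer-to-aggregator adaptation can be sketched with plain JDK functional types (not the real Streams Reducer/Aggregator/Initializer interfaces, which also carry a key parameter): reduce is just aggregate with a null initial value and an aggregator that passes the first value through untouched.

```java
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Simplified stand-ins for the reduceInitializer / aggregatorForReducer
// helpers mentioned above; the shapes are illustrative, not the real APIs.
class ReduceAsAggregate {
    // Reduce has no seed value, so the "initializer" starts from null.
    static <V> Supplier<V> reduceInitializer() {
        return () -> null;
    }

    // The first value becomes the aggregate as-is; subsequent values are
    // combined with the original reducer.
    static <V> BinaryOperator<V> aggregatorForReducer(BinaryOperator<V> reducer) {
        return (agg, value) -> agg == null ? value : reducer.apply(agg, value);
    }
}
```

Since any reducer can be expressed this way, a dedicated KStreamWindowReduce processor adds no capability over KStreamWindowAggregate, which is what makes the cleanup safe.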





[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-02-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366299#comment-16366299
 ] 

Matthias J. Sax commented on KAFKA-4969:


Should we really close this JIRA? I think we could do even better, and the 
description covers more than has been addressed so far. (As an alternative, we 
could also create a new ticket and keep this one with a limited scope.) WDYT?

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0, 1.0.1
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish between different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types", 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs. standby task
> This improvement should be backed by a design document in the project wiki 
> (no KIP required, though) as it's a fairly complex change.





[jira] [Created] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-02-15 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6566:
-

 Summary: SourceTask#stop() not called after exception raised in 
poll()
 Key: KAFKA-6566
 URL: https://issues.apache.org/jira/browse/KAFKA-6566
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


Having discussed this with [~rhauch], it has been my assumption that 
{{SourceTask#stop()}} will be called by the Kafka Connect framework in case an 
exception has been raised in {{poll()}}. That's not the case, though. As an 
example, see the connector and task below.

Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
action to take, as it'll allow the task to clean up any resources such as 
releasing any database connections, right after that failure and not only once 
the connector is stopped.

{code}
package com.example;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TestConnector extends SourceConnector {

    @Override
    public String version() {
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public Class<? extends Task> taskClass() {
        return TestTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(Collections.singletonMap("foo", "bar"));
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    public static class TestTask extends SourceTask {

        @Override
        public String version() {
            return null;
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            throw new RuntimeException();
        }

        @Override
        public void stop() {
            System.out.println("stop() called");
        }
    }
}

{code}
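The requested behavior (always invoking {{stop()}} when {{poll()}} throws) can be sketched as a worker loop with a try/finally. The {{Task}} interface and {{runLoop}} below are hypothetical stand-ins, not the actual Connect framework code:

```java
import java.util.List;

// Hypothetical worker-loop sketch: guarantee stop() runs when poll() throws,
// so the task can release resources (e.g. database connections) immediately
// rather than only when the connector is stopped.
class StopOnPollFailure {
    interface Task {
        List<String> poll() throws Exception;
        void stop();
    }

    /** Runs the poll loop; returns true if it ended because poll() threw. */
    static boolean runLoop(Task task, int maxPolls) {
        try {
            for (int i = 0; i < maxPolls; i++)
                task.poll();
            return false;
        } catch (Exception e) {
            return true;            // poll() failed
        } finally {
            task.stop();            // always invoked, even after a failure
        }
    }
}
```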



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-02-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4969:
-
Fix Version/s: 1.0.1

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0, 1.0.1
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, task can be stateless of have one or multiple 
> stores.
> This can lead to an suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen, 
> that one instance processed both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores

2018-02-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366286#comment-16366286
 ] 

Guozhang Wang commented on KAFKA-5285:
--

[~davispw] [~xvrl] Thinking about this a bit more, I think we can provide a 
better heuristic to define the upper and lower bound for window store 
{{fetch(keyFrom, keyTo, timeFrom, timeTo)}} indeed, which is the following:

0. First of all, documentation-wise we require users to make sure the 
serialized bytes of {{keyFrom}} are smaller than the serialized bytes of 
{{keyTo}} lexicographically. And inside this call, we first check the bytes for 
this condition. 

1. For the lower range, we just use the {{keyFrom}} bytes, instead of {{keyFrom 
+ minSuffix}}. Since we know there would be no data in between these two keys 
at all, we can save some overhead of byte allocations and puts.

2. For upper range, we just consider the following two combinations:

 a). keyFrom + minSuffix
 b). keyTo + maxSuffix

where minSuffix = timeFrom + seq0 and maxSuffix = timeTo + seqMAX. We then just 
pick the larger of these two. All in all, this would replace the current 
{{OrderedBytes}} implementations. 

Similarly, for the session store {{fetch}} we do the same thing, except that 
for a single-key fetch we handle it in a more optimized way:

0. For a single-key fetch, lower range = key + timeFrom + timeFrom, upper range 
= key + timeTo + timeTo (this is because timeFrom <= timeTo, so they can be the 
bound of each other).

1. For range key fetch, lower range = keyFrom.

2. For range key fetch, upper range = MAX (keyFrom + timeFrom + timeFrom, keyTo 
+ timeTo + timeTo).

WDYT?
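As a rough illustration, the window-store bounds proposed above can be sketched as follows. This is a hypothetical helper that assumes a fixed-width suffix of an 8-byte timestamp plus a 4-byte sequence number; it is not the actual {{OrderedBytes}} code:

```java
import java.nio.ByteBuffer;

// Sketch of the proposed heuristic: lower bound is the raw keyFrom bytes;
// upper bound is the lexicographically larger of keyFrom+minSuffix and
// keyTo+maxSuffix (suffix = 8-byte timestamp + 4-byte sequence number).
class WindowRangeSketch {
    static byte[] concat(byte[] key, long ts, int seq) {
        return ByteBuffer.allocate(key.length + 12).put(key).putLong(ts).putInt(seq).array();
    }

    static byte[] lowerRange(byte[] keyFrom) {
        // No key in the store can sort between keyFrom and keyFrom+minSuffix,
        // so the bare key bytes are a valid (and cheaper) lower bound.
        return keyFrom;
    }

    static byte[] upperRange(byte[] keyFrom, byte[] keyTo, long timeFrom, long timeTo) {
        byte[] a = concat(keyFrom, timeFrom, 0);              // keyFrom + minSuffix
        byte[] b = concat(keyTo, timeTo, Integer.MAX_VALUE);  // keyTo + maxSuffix
        return compareUnsigned(a, b) >= 0 ? a : b;            // pick the larger
    }

    // Unsigned lexicographic comparison of byte arrays.
    static int compareUnsigned(byte[] x, byte[] y) {
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(x[i] & 0xff, y[i] & 0xff);
            if (c != 0) return c;
        }
        return Integer.compare(x.length, y.length);
    }
}
```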

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the lower 
> key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6549) Deadlock while processing Controller Events

2018-02-15 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6549.
---
Resolution: Fixed

Fixed under KAFKA-6517.

> Deadlock while processing Controller Events
> ---
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while 
> processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
> nid=0x7d03 waiting on condition [0x7278b000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bccadf30> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>  at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>  at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
>  - <0x000780054860> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>  
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
> nid=0xad03 waiting on condition [0x73fd3000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bcc584a0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
>  at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
>  at 
> kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
>  at 
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>  at 

[jira] [Updated] (KAFKA-6565) Sudden unresponsiveness from broker + file descriptor leak

2018-02-15 Thread Noam Berman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Noam Berman updated KAFKA-6565:
---
Description: 
Running 3 kafka clusters v0.10.1.0  on aws, 5 brokers each (+5 zookeepers), in 
the past week about 2-3 times a day (on all clusters, no specific brokers) we 
encounter a situation in which a single broker stops responding to all requests 
from clients/other brokers. At this moment all produced messages fail, and 
other brokers start writing this to the log repeatedly:

[2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c 
(kafka.server.ReplicaFetcherThread)
 java.io.IOException: Connection to 878 was disconnected before the response 
was read
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
 at scala.Option.foreach(Option.scala:257)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
 at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
 at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
 at 
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
 at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
 at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
 at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

 

producers (examples):

 2018-02-13 21:30:51:010 thread=[kafka-producer-network-thread | 
catalog-index-0] WARN  Got error produce response with correlation id 8147 on 
topic-partition catalog-1, retrying (49 attempts left). Error: REQUEST_TIMED_OUT

2018-02-13 21:31:06:221 thread=[kafka-producer-network-thread | 
catalog-index-0] WARN  Got error produce response with correlation id 8166 on 
topic-partition catalog-index-18, retrying (48 attempts left). Error: 
NOT_LEADER_FOR_PARTITION

 

Once this happens, file descriptors on the affected broker start increasing at 
a high rate until they reach the maximum.

There are no relevant info logs on the affected broker; we'll try to gather 
debug logs and attach them to the bug next time it happens.

The big issue here is that although the broker is unresponsive, it stays in the 
cluster and its zookeeper node isn't cleared, so there's no rebalance 
whatsoever - all clients start failing until a manual shutdown occurs.

 

  was:
Running 3 kafka clusters v0.10.1.0  on aws, 5 brokers each (+5 zookeepers), in 
the past week about 2-3 times a day (on all clusters, no specific brokers) we 
encounter a situation in which a single broker stops responding to all requests 
from clients/other brokers. at this moment all produced messages fail, and 
other brokers start writing this to log repeatedly:

[2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 878 was disconnected before the response was 
read
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
 at scala.Option.foreach(Option.scala:257)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
 at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
 at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
 at 
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
 at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
 at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
 at 

[jira] [Updated] (KAFKA-6565) Sudden unresponsiveness from broker + file descriptor leak

2018-02-15 Thread Noam Berman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Noam Berman updated KAFKA-6565:
---
Environment: 
3 separate clusters running 0.10.1.0 with 5 nodes  + 5 zookeepers.
clients are mostly java 0.10.2.1, and a few nodejs clients 
(https://github.com/oleksiyk/kafka). 
throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 
partitions. 

Debian GNU/Linux 8.8 (jessie)
java version “1.8.0_65”
c4.4xlarge , 1500gb gp2 disks

  was:
3 separate clusters running 0.10.1.0 with 5 nodes  + 5 zookeepers.
clients are mostly java 0.10.2.1, and a few nodejs clients 
(https://github.com/oleksiyk/kafka). 
throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 
partitions. 


> Sudden unresponsiveness from broker + file descriptor leak
> --
>
> Key: KAFKA-6565
> URL: https://issues.apache.org/jira/browse/KAFKA-6565
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: 3 separate clusters running 0.10.1.0 with 5 nodes  + 5 
> zookeepers.
> clients are mostly java 0.10.2.1, and a few nodejs clients 
> (https://github.com/oleksiyk/kafka). 
> throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 
> partitions. 
> Debian GNU/Linux 8.8 (jessie)
> java version “1.8.0_65”
> c4.4xlarge , 1500gb gp2 disks
>Reporter: Noam Berman
>Priority: Blocker
>
> Running 3 kafka clusters v0.10.1.0  on aws, 5 brokers each (+5 zookeepers), 
> in the past week about 2-3 times a day (on all clusters, no specific brokers) 
> we encounter a situation in which a single broker stops responding to all 
> requests from clients/other brokers. at this moment all produced messages 
> fail, and other brokers start writing this to log repeatedly:
> [2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c 
> (kafka.server.ReplicaFetcherThread)
>  java.io.IOException: Connection to 878 was disconnected before the response 
> was read
>  at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
>  at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
>  at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
>  at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
>  at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>  at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
>  at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
>  at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>  at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>  
> producers (examples):
>  2018-02-13 21:30:51:010 thread=[kafka-producer-network-thread | 
> catalog-index-0] WARN  Got error produce response with correlation id 8147 on 
> topic-partition catalog-1, retrying (49 attempts left). Error: 
> REQUEST_TIMED_OUT
> 2018-02-13 21:31:06:221 thread=[kafka-producer-network-thread | 
> catalog-index-0] WARN  Got error produce response with correlation id 8166 on 
> topic-partition catalog-index-18, retrying (48 attempts left). Error: 
> NOT_LEADER_FOR_PARTITION
>  
> Once this happens, file descriptors on the affected broker start increasing 
> at a high rate until it reaches its maximum.
> no relevant info logs on the affected broker, we'll try to gather debug logs 
> and attach them to the bug next time it happens.
> the big issue here is that although the broker is unresponsive, it stays in 
> the cluster and its zookeeper node isn't cleared, so there's no rebalance 
> whatsoever - all clients start failing until a manual shutdown occurs.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6565) Sudden unresponsiveness from broker + file descriptor leak

2018-02-15 Thread Noam Berman (JIRA)
Noam Berman created KAFKA-6565:
--

 Summary: Sudden unresponsiveness from broker + file descriptor leak
 Key: KAFKA-6565
 URL: https://issues.apache.org/jira/browse/KAFKA-6565
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.1.0
 Environment: 3 separate clusters running 0.10.1.0 with 5 nodes  + 5 
zookeepers.
clients are mostly java 0.10.2.1, and a few nodejs clients 
(https://github.com/oleksiyk/kafka). 
throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 
partitions. 
Reporter: Noam Berman


Running 3 kafka clusters v0.10.1.0  on aws, 5 brokers each (+5 zookeepers), in 
the past week about 2-3 times a day (on all clusters, no specific brokers) we 
encounter a situation in which a single broker stops responding to all requests 
from clients/other brokers. At this moment all produced messages fail, and 
other brokers start writing this to the log repeatedly:

[2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 878 was disconnected before the response was 
read
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
 at scala.Option.foreach(Option.scala:257)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
 at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
 at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
 at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
 at 
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
 at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
 at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
 at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

 

Once this happens, file descriptors on the affected broker start increasing at 
a high rate until they reach the maximum.

There are no relevant info logs on the affected broker; we'll try to gather 
debug logs and attach them to the bug next time it happens.

The big issue here is that although the broker is unresponsive, it stays in the 
cluster and its zookeeper node isn't cleared, so there's no rebalance 
whatsoever - all clients start failing until a manual shutdown occurs.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6517) ZooKeeperClient holds a lock while waiting for responses, blocking shutdown

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366212#comment-16366212
 ] 

ASF GitHub Bot commented on KAFKA-6517:
---

rajinisivaram closed pull request #4551: KAFKA-6517: Avoid deadlock in 
ZooKeeperClient during session expiry
URL: https://github.com/apache/kafka/pull/4551
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 9a1d16274df..3934fd0ad5d 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{ArrayBlockingQueue, 
ConcurrentHashMap, CountDownLat
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
-import kafka.utils.Logging
+import kafka.utils.{KafkaScheduler, Logging}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, 
DataCallback, StatCallback, StringCallback, VoidCallback}
 import org.apache.zookeeper.KeeperException.Code
@@ -59,6 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, 
StateChangeHandler]().asScala
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, 
"zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -90,6 +91,7 @@ class ZooKeeperClient(connectString: String,
 
   metricNames += "SessionState"
 
+  expiryScheduler.startup()
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
   override def metricName(name: String, metricTags: 
scala.collection.Map[String, String]): MetricName = {
@@ -122,7 +124,7 @@ class ZooKeeperClient(connectString: String,
* response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). 
Otherwise, the most specific common supertype
* will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
*/
-  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): 
Seq[Req#Response] = inReadLock(initializationLock) {
+  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): 
Seq[Req#Response] = {
 if (requests.isEmpty)
   Seq.empty
 else {
@@ -132,10 +134,12 @@ class ZooKeeperClient(connectString: String,
   requests.foreach { request =>
 inFlightRequests.acquire()
 try {
-  send(request) { response =>
-responseQueue.add(response)
-inFlightRequests.release()
-countDownLatch.countDown()
+  inReadLock(initializationLock) {
+send(request) { response =>
+  responseQueue.add(response)
+  inFlightRequests.release()
+  countDownLatch.countDown()
+}
   }
 } catch {
   case e: Throwable =>
@@ -148,7 +152,8 @@ class ZooKeeperClient(connectString: String,
 }
   }
 
-  private def send[Req <: AsyncRequest](request: Req)(processResponse: 
Req#Response => Unit): Unit = {
+  // Visibility to override for testing
+  private[zookeeper] def send[Req <: AsyncRequest](request: 
Req)(processResponse: Req#Response => Unit): Unit = {
 // Safe to cast as we always create a response of the right type
 def callback(response: AsyncResponse): Unit = 
processResponse(response.asInstanceOf[Req#Response])
 
@@ -303,12 +308,18 @@ class ZooKeeperClient(connectString: String,
 stateChangeHandlers.clear()
 zooKeeper.close()
 metricNames.foreach(removeMetric(_))
+expiryScheduler.shutdown()
 info("Closed.")
   }
 
   def sessionId: Long = inReadLock(initializationLock) {
 zooKeeper.getSessionId
   }
+
+  // Only for testing
+  private[zookeeper] def currentZooKeeper: ZooKeeper = 
inReadLock(initializationLock) {
+zooKeeper
+  }
   
   private def initialize(): Unit = {
 if (!connectionState.isAlive) {
@@ -352,12 +363,14 @@ class ZooKeeperClient(connectString: String,
 error("Auth failed.")
 stateChangeHandlers.values.foreach(_.onAuthFailure())
   } else if (state == KeeperState.Expired) {
-inWriteLock(initializationLock) {
-  info("Session expired.")
-  stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-  initialize()
-  stateChangeHandlers.values.foreach(_.afterInitializingSession())
-
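The (truncated) hunk above hands session-expiry handling off to a dedicated scheduler thread instead of running it on the ZooKeeper event thread while holding the write lock. A minimal sketch of that pattern, with hypothetical names rather than the actual {{ZooKeeperClient}} code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Deadlock-avoidance sketch: on session expiry, the ZooKeeper event thread
// only enqueues the reinitialization work; the write lock is taken on the
// dedicated scheduler thread, so the event thread never blocks under it.
class ExpiryHandlerSketch {
    private final ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock();
    private final ExecutorService expiryScheduler =
        Executors.newSingleThreadExecutor();   // stands in for KafkaScheduler

    // Called from the ZooKeeper event thread on KeeperState.Expired.
    // Returns immediately; reinitialization runs on the scheduler thread.
    Future<?> onSessionExpired(Runnable reinitialize) {
        return expiryScheduler.submit(() -> {
            initializationLock.writeLock().lock();
            try {
                reinitialize.run();
            } finally {
                initializationLock.writeLock().unlock();
            }
        });
    }

    void shutdown() {
        expiryScheduler.shutdown();
    }
}
```

With this split, request-sending threads that need the read lock can still make progress while expiry is pending, which is the hang the original code could run into.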

[jira] [Issue Comment Deleted] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores

2018-02-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5285:
-
Comment: was deleted

(was: [~davispw] [~xvrl] Thinking about this a bit more, I think we can provide 
a better heuristic to define the upper and lower bound for window store 
{{fetch(keyFrom, keyTo, timeFrom, timeTo)}} indeed, which is to just consider 
the following four combinations:

1. keyFrom + minSuffix
2. keyTo + minSuffix
3. keyFrom + maxSuffix
4. keyTo + maxSuffix

where minSuffix = timeFrom + seq0, and maxSuffix = timeTo + seqMAX. And then we 
just pick the smallest and largest among these four as lower and upper range, 
to replace the current {{OrderedBytes}} implementations. WDYT?)

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the lower 
> key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores

2018-02-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366165#comment-16366165
 ] 

Guozhang Wang commented on KAFKA-5285:
--

[~davispw] [~xvrl] Thinking about this a bit more, I think we can provide a 
better heuristic to define the upper and lower bound for window store 
{{fetch(keyFrom, keyTo, timeFrom, timeTo)}} indeed, which is to just consider 
the following four combinations:

1. keyFrom + minSuffix
2. keyTo + minSuffix
3. keyFrom + maxSuffix
4. keyTo + maxSuffix

where minSuffix = timeFrom + seq0, and maxSuffix = timeTo + seqMAX. And then we 
just pick the smallest and largest among these four as lower and upper range, 
to replace the current {{OrderedBytes}} implementations. WDYT?

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the 
> lower key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores

2018-02-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5285:
-
Summary: Optimize upper / lower byte range for key range scan on windowed 
stores  (was: optimize upper / lower byte range for key range scan on windowed 
stores)

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the 
> lower key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

2018-02-15 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6512.
---
   Resolution: Fixed
Fix Version/s: (was: 1.2.0)
   1.1.0

Implemented options 1) and 2) from the description.

> Java Producer: Excessive memory usage with compression enabled
> --
>
> Key: KAFKA-6512
> URL: https://issues.apache.org/jira/browse/KAFKA-6512
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Windows 10
>Reporter: Kyle Tinker
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: KafkaSender.java
>
>
> h2. User Story
> As a user of the Java producer, I want a predictable memory usage for the 
> Kafka client so that I can ensure that my system is sized appropriately and 
> will be stable even under heavy usage.
> As a user of the Java producer, I want a smaller memory footprint so that my 
> systems don't consume as many resources.
> h2. Acceptance Criteria
>  * Enabling Compression in Kafka should not significantly increase the memory 
> usage of Kafka
>  * The memory usage of Kafka's Java Producer should be roughly in line with 
> the buffer size (buffer.memory) and the number of producers declared.
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression 
> (gzip or lz4).  I don't observe the behavior with compression off, but with 
> it on I'll run out of heap (2GB).  Using a Java profiler, I see the data is 
> in the KafkaLZ4BlockOutputStream (or related class for gzip).   I see that 
> MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but 
> is not successful.  I'm most likely network bottlenecked, so I expect the 
> producer buffers to be full while the job is running and potentially a lot of 
> unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20 
> threads) and sending data as quickly as I can.  I've also tried 1MB of 
> buffer.memory, which seemed to reduce memory consumption but I could still 
> run OOM in certain cases.  I have max.in.flight.requests.per.connection set 
> to 1.  In short, I should only have ~20 MB (20* 1MB) of data in buffers, but 
> I can easily exhaust 2000 MB used by Kafka.
> In looking at the code more, it looks like the KafkaLZ4BlockOutputStream 
> doesn't clear the compressedBuffer or buffer when close() is called.  In my 
> heap dump, both of those are ~65k size each, meaning that each batch is 
> taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 
> and messages are 1k each until the batch fills).
> Kafka tries to manage memory usage by calling 
> MemoryRecordsBuilder:closeForRecordAppends(), which as documented as "Release 
> resources required for record appends (e.g. compression buffers)".  However, 
> this method doesn't actually clear those buffers because 
> KafkaLZ4BlockOutputStream.close() only writes the block and end mark and 
> closes the output stream.  It doesn't actually clear the buffer and 
> compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay allocated in RAM 
> until the block is acknowledged by the broker, processed in 
> Sender:handleProduceResponse(), and the batch is deallocated.  This memory 
> usage therefore increases, possibly without bound.  In my test program, the 
> program died with approximately 345 unprocessed batches per producer (20 
> producers), despite having max.in.flight.requests.per.connection=1.
> h2. Steps to Reproduce
>  # Create a topic test with plenty of storage
>  # Use a connection with a very fast upload pipe and limited download.  This 
> allows the outbound data to go out, but acknowledgements to be delayed 
> flowing in.
>  # Download KafkaSender.java (attached to this ticket)
>  # Set line 17 to reference your Kafka broker
>  # Run the program with a 1GB Xmx value
> h2. Possible solutions
> There are a few possible optimizations I can think of:
>  # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as 
> non-final and null them in the close() method
>  # We could declare the MemoryRecordsBuilder.appendStream non-final and null 
> it in the closeForRecordAppends() method
>  # We could have the ProducerBatch discard the recordsBuilder in 
> closeForRecordAppends(), however, this is likely a bad idea because the 
> recordsBuilder contains significant metadata that is likely needed after the 
> stream is closed.  It is also final.
>  # We could try to limit the number of non-acknowledged batches in flight.  
> This would bound the maximum memory usage but may negatively impact 
> performance.
>  
> Fix #1 would only improve the LZ4 algorithm, and not any other 

[jira] [Comment Edited] (KAFKA-4277) creating ephemeral node already exist

2018-02-15 Thread Nico Meyer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366008#comment-16366008
 ] 

Nico Meyer edited comment on KAFKA-4277 at 2/15/18 5:47 PM:


We also just encountered the same problem. Kafka 0.11.0.1 and ZooKeeper 3.4.6.

It seems that Zookeeper does not guarantee that the ephemeral nodes are gone 
after a session expired. At least not if one connects to a different Zookeeper 
node, which is exactly what happened in our case:
{code:java}
2018-02-15 13:43:03,464 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:04,462 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.206/172.20.0.206:2181, initiating session
2018-02-15 13:43:04,549 WARN  zookeeper.ClientCnxn 
(ClientCnxn.java:onConnected(1285)) - Unable to reconnect to ZooKeeper service, 
session 0x1614ad433b00b17 has expired
2018-02-15 13:43:04,549 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(1154)) 
- Unable to reconnect to ZooKeeper service, session 0x1614ad433b00b17 has 
expired, closing socket connection
2018-02-15 13:43:04,549 INFO  zkclient.ZkClient 
(ZkClient.java:processStateChanged(713)) - zookeeper state changed (Expired)
2018-02-15 13:43:04,550 INFO  zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) 
- Initiating client connection, 
connectString=172.20.0.215:2181,172.20.0.244:2181,172.20.0.204:2181,172.20.0.206:2181,172.20.0.208:2181/3rdparty/kafka08
 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@199bd995
2018-02-15 13:43:04,550 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(519)) - 
EventThread shut down for session: 0x1614ad433b00b17
2018-02-15 13:43:04,552 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.204/172.20.0.204:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:04,553 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.204/172.20.0.204:2181, initiating session
2018-02-15 13:43:04,553 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(1158)) 
- Unable to read additional data from server sessionid 0x0, likely server has 
closed socket, closing socket connection and attempting reconnect
2018-02-15 13:43:04,699 INFO  zkclient.ZkClient 
(ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected
2018-02-15 13:43:04,699 INFO  zkclient.ZkClient 
(ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected
2018-02-15 13:43:05,373 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:05,374 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.206/172.20.0.206:2181, initiating session
2018-02-15 13:43:06,575 WARN  zookeeper.ClientCnxn (ClientCnxn.java:run(1108)) 
- Client session timed out, have not heard from server in 1201ms for sessionid 
0x0
2018-02-15 13:43:06,575 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(1156)) 
- Client session timed out, have not heard from server in 1201ms for sessionid 
0x0, closing socket connection and attempting reconnect
2018-02-15 13:43:07,180 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.215/172.20.0.215:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:07,180 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.215/172.20.0.215:2181, initiating session
2018-02-15 13:43:07,182 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:onConnected(1299)) - Session establishment complete on server 
172.20.0.215/172.20.0.215:2181, sessionid = 0x161997d55350158, negotiated 
timeout = 6000
2018-02-15 13:43:07,182 INFO  zkclient.ZkClient 
(ZkClient.java:processStateChanged(713)) - zookeeper state changed 
(SyncConnected)
2018-02-15 13:43:07,184 INFO  server.KafkaHealthcheck$SessionExpireListener 
(Logging.scala:info(70)) - re-registering broker info in ZK for broker 11
2018-02-15 13:43:07,203 INFO  utils.ZKCheckedEphemeral (Logging.scala:info(70)) 
- Creating /brokers/ids/11 (is it secure? false)
2018-02-15 13:43:07,246 INFO  utils.ZKCheckedEphemeral (Logging.scala:info(70)) 
- Result of znode creation is: NODEEXISTS
2018-02-15 13:43:07,304 ERROR zkclient.ZkEventThread 
(ZkEventThread.java:run(78)) - Error handling event ZkEvent[New session event 
sent to kafka.server.KafkaHealthcheck$SessionExpireListener@bb373b3]
java.lang.RuntimeException: A broker is already registered on the 

[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records

2018-02-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366010#comment-16366010
 ] 

Matthias J. Sax commented on KAFKA-6502:


If we really change the timestamp handling and only look at the first record 
per partition to determine which record to pick next, this issue resolves 
itself, as we can do lazy deserialization for the first record per partition 
only.

> Kafka streams deserialization handler not committing offsets on error records
> -
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Soby Chacko
>Priority: Minor
>
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when an error occurs by successfully logging and continuing. 
> However, on a continuous stream of errors, it seems like these messages are 
> not committed and on a restart of the application they reappear again.  It is 
> more problematic if I try to send the messages in error to a DLQ. On a 
> restart, they are sent again to DLQ. As soon as I have a good record coming 
> in, it looks like the offset moves further and not seeing the already logged 
> messages again after a restart. 
> I reproduced this behavior by running the sample provided here: 
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to 
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on 
> input and reduced the commit interval to just 1 second. Also added the 
> following to the config.
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>  LogAndContinueExceptionHandler.class);}}.
>  It looks like when deserialization exceptions occur, this flag is never set 
> to be true here: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
>  It only becomes true once processing succeeds. That might be the reason why 
> commit is not happening even after I manually call processorContext#commit().
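The reproduction configuration described above can be sketched with plain string keys, so the snippet compiles without kafka-streams on the classpath; the {{StreamsConfig}} constants named in the report resolve to the strings used here.

```java
import java.util.Properties;

public class StreamsErrorHandlerConfig {
    public static Properties baseConfig() {
        Properties props = new Properties();
        // String form of StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
        // the handler class lives in kafka-streams.
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        // Short commit interval, as used in the reproduction (1 second).
        props.put("commit.interval.ms", "1000");
        return props;
    }
}
```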



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-02-15 Thread Nico Meyer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366008#comment-16366008
 ] 

Nico Meyer commented on KAFKA-4277:
---

We also just encountered the same problem. Kafka 0.11.0.1 and ZooKeeper 3.4.6.

It seems that Zookeeper does not guarantee that the ephemeral nodes are gone 
after a session expired. At least not if one connects to a different Zookeeper 
node, which is exactly what happened in our case:
{code:java}
2018-02-15 13:43:03,464 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:04,462 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.206/172.20.0.206:2181, initiating session
2018-02-15 13:43:04,549 WARN  zookeeper.ClientCnxn 
(ClientCnxn.java:onConnected(1285)) - Unable to reconnect to ZooKeeper service, 
session 0x1614ad433b00b17 has expired
2018-02-15 13:43:04,549 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(1154)) 
- Unable to reconnect to ZooKeeper service, session 0x1614ad433b00b17 has 
expired, closing socket connection
2018-02-15 13:43:04,549 INFO  zkclient.ZkClient 
(ZkClient.java:processStateChanged(713)) - zookeeper state changed (Expired)
2018-02-15 13:43:04,550 INFO  zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) 
- Initiating client connection, 
connectString=172.20.0.215:2181,172.20.0.244:2181,172.20.0.204:2181,172.20.0.206:2181,172.20.0.208:2181/3rdparty/kafka08
 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@199bd995
2018-02-15 13:43:04,550 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(519)) - 
EventThread shut down for session: 0x1614ad433b00b17
2018-02-15 13:43:04,552 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.204/172.20.0.204:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:04,553 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.204/172.20.0.204:2181, initiating session
2018-02-15 13:43:04,553 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(1158)) 
- Unable to read additional data from server sessionid 0x0, likely server has 
closed socket, closing socket connection and attempting reconnect
2018-02-15 13:43:04,699 INFO  zkclient.ZkClient 
(ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected
2018-02-15 13:43:04,699 INFO  zkclient.ZkClient 
(ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected
2018-02-15 13:43:05,373 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:05,374 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.206/172.20.0.206:2181, initiating session
2018-02-15 13:43:06,575 WARN  zookeeper.ClientCnxn (ClientCnxn.java:run(1108)) 
- Client session timed out, have not heard from server in 1201ms for sessionid 
0x0
2018-02-15 13:43:06,575 INFO  zookeeper.ClientCnxn (ClientCnxn.java:run(1156)) 
- Client session timed out, have not heard from server in 1201ms for sessionid 
0x0, closing socket connection and attempting reconnect
2018-02-15 13:43:07,180 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 
172.20.0.215/172.20.0.215:2181. Will not attempt to authenticate using SASL 
(unknown error)
2018-02-15 13:43:07,180 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:primeConnection(876)) - Socket connection established to 
172.20.0.215/172.20.0.215:2181, initiating session
2018-02-15 13:43:07,182 INFO  zookeeper.ClientCnxn 
(ClientCnxn.java:onConnected(1299)) - Session establishment complete on server 
172.20.0.215/172.20.0.215:2181, sessionid = 0x161997d55350158, negotiated 
timeout = 6000
2018-02-15 13:43:07,182 INFO  zkclient.ZkClient 
(ZkClient.java:processStateChanged(713)) - zookeeper state changed 
(SyncConnected)
2018-02-15 13:43:07,184 INFO  server.KafkaHealthcheck$SessionExpireListener 
(Logging.scala:info(70)) - re-registering broker info in ZK for broker 11
2018-02-15 13:43:07,203 INFO  utils.ZKCheckedEphemeral (Logging.scala:info(70)) 
- Creating /brokers/ids/11 (is it secure? false)
2018-02-15 13:43:07,246 INFO  utils.ZKCheckedEphemeral (Logging.scala:info(70)) 
- Result of znode creation is: NODEEXISTS
2018-02-15 13:43:07,304 ERROR zkclient.ZkEventThread 
(ZkEventThread.java:run(78)) - Error handling event ZkEvent[New session event 
sent to kafka.server.KafkaHealthcheck$SessionExpireListener@bb373b3]
java.lang.RuntimeException: A broker is already registered on the path 
/brokers/ids/11. This probably indicates that 

[jira] [Commented] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365989#comment-16365989
 ] 

ASF GitHub Bot commented on KAFKA-6512:
---

rajinisivaram closed pull request #4570: KAFKA-6512: Discard references to 
buffers used for compression
URL: https://github.com/apache/kafka/pull/4570
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
 
b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 8cfc37be826..591ab169364 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -34,7 +33,7 @@
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
 public static final int MAGIC = 0x184D2204;
 public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@
 private final boolean useBrokenFlagDescriptorChecksum;
 private final FLG flg;
 private final BD bd;
-private final byte[] buffer;
-private final byte[] compressedBuffer;
 private final int maxBlockSize;
+private OutputStream out;
+private byte[] buffer;
+private byte[] compressedBuffer;
 private int bufferOffset;
 private boolean finished;
 
@@ -71,7 +71,7 @@
  * @throws IOException
  */
 public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean 
blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-super(out);
+this.out = out;
 compressor = LZ4Factory.fastestInstance().fastCompressor();
 checksum = XXHashFactory.fastestInstance().hash32();
 this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ private void writeBlock() throws IOException {
 private void writeEndMark() throws IOException {
 ByteUtils.writeUnsignedIntLE(out, 0);
 // TODO implement content checksum, update flg.validate()
-finished = true;
 }
 
 @Override
@@ -259,15 +258,26 @@ private void ensureNotFinished() {
 
 @Override
 public void close() throws IOException {
-if (!finished) {
-// basically flush the buffer writing the last block
-writeBlock();
-// write the end block and finish the stream
-writeEndMark();
-}
-if (out != null) {
-out.close();
-out = null;
+try {
+if (!finished) {
+// basically flush the buffer writing the last block
+writeBlock();
+// write the end block
+writeEndMark();
+}
+} finally {
+try {
+if (out != null) {
+try (OutputStream outStream = out) {
+outStream.flush();
+}
+}
+} finally {
+out = null;
+buffer = null;
+compressedBuffer = null;
+finished = true;
+}
 }
 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a9b57ac22df..6f6404fa2d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -23,6 +23,7 @@
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@
  */
 public class MemoryRecordsBuilder {
 private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+private static final DataOutputStream CLOSED_STREAM = new 
DataOutputStream(new OutputStream() {
+@Override
+public void write(int b) throws IOException {
+throw new IllegalStateException("MemoryRecordsBuilder is closed 
for record appends");
+}
+});
 
 private final TimestampType timestampType;
 private final CompressionType compressionType;
-// Used to append records, may compress data on the fly
-private final 

[jira] [Resolved] (KAFKA-2167) ZkUtils updateEphemeralPath JavaDoc (spelling and correctness)

2018-02-15 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2167.
--
Resolution: Fixed

> ZkUtils updateEphemeralPath JavaDoc (spelling and correctness)
> --
>
> Key: KAFKA-2167
> URL: https://issues.apache.org/jira/browse/KAFKA-2167
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jon Bringhurst
>Priority: Major
>  Labels: newbie
>
> I'm not 100% sure on this, but it seems like "persistent" should instead say 
> "ephemeral" in the JavaDoc. Also, note that "parrent" is misspelled.
> {noformat}
>   /**
>* Update the value of a persistent node with the given path and data.
>* create parrent directory if necessary. Never throw NodeExistException.
>*/
>   def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit 
> = {
> try {
>   client.writeData(path, data)
> }
> catch {
>   case e: ZkNoNodeException => {
> createParentPath(client, path)
> client.createEphemeral(path, data)
>   }
>   case e2 => throw e2
> }
>   }
> {noformat}
> should be:
> {noformat}
>   /**
>* Update the value of an ephemeral node with the given path and data.
>* create parent directory if necessary. Never throw NodeExistException.
>*/
>   def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit 
> = {
> try {
>   client.writeData(path, data)
> }
> catch {
>   case e: ZkNoNodeException => {
> createParentPath(client, path)
> client.createEphemeral(path, data)
>   }
>   case e2 => throw e2
> }
>   }
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6240) Support dynamic updates of frequently updated broker configs

2018-02-15 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6240.
---
   Resolution: Fixed
Fix Version/s: (was: 1.2.0)
   1.1.0

> Support dynamic updates of frequently updated broker configs
> 
>
> Key: KAFKA-6240
> URL: https://issues.apache.org/jira/browse/KAFKA-6240
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> See 
> [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  for details.
> Implementation will be done under sub-tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6564) Fix broken links in Dockerfile

2018-02-15 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365510#comment-16365510
 ] 

Manikumar commented on KAFKA-6564:
--

This was fixed in -KAFKA-6247- 

> Fix broken links in Dockerfile
> --
>
> Key: KAFKA-6564
> URL: https://issues.apache.org/jira/browse/KAFKA-6564
> Project: Kafka
>  Issue Type: Test
>Reporter: Andriy Sorokhtey
>Priority: Minor
>
> https://github.com/apache/kafka/blob/1.0.0/tests/docker/Dockerfile
> {noformat}
> # Install binary test dependencies.
> ENV MIRROR="http://mirrors.ocf.berkeley.edu/apache/"
> RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s 
> "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 
> -C "/opt/kafka-0.8.2.2"
> RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s 
> "${MIRROR}kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 
> -C "/opt/kafka-0.9.0.1"
> RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s 
> "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz 
> --strip-components=1 -C "/opt/kafka-0.10.0.1"
> RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s 
> "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz 
> --strip-components=1 -C "/opt/kafka-0.10.1.1"
> RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s 
> "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz 
> --strip-components=1 -C "/opt/kafka-0.10.2.1"
> RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s 
> "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz 
> --strip-components=1 -C "/opt/kafka-0.11.0.0"
> {noformat}
> These links seem to be broken, and automated tests executed in Docker fail 
> with the error:
> {noformat}
> log: /bin/sh -c mkdir -p "/opt/kafka-0.8.2.2" && curl -s 
> "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 
> -C "/opt/kafka-0.8.2.2"' returned a non-zero code: 2
> {noformat}
>  
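The comment above notes this was fixed under KAFKA-6247. A hedged sketch of the usual remedy, not taken from that patch: point the mirror at archive.apache.org, which retains every past release, since rotating mirrors prune old versions.

```dockerfile
# Assumption: archive.apache.org keeps all historical releases, unlike
# rotating mirrors. This is illustrative, not the exact KAFKA-6247 change.
ENV MIRROR="https://archive.apache.org/dist/"
RUN mkdir -p "/opt/kafka-0.8.2.2" && \
    curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | \
    tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
```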



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6564) Fix broken links in Dockerfile

2018-02-15 Thread Andriy Sorokhtey (JIRA)
Andriy Sorokhtey created KAFKA-6564:
---

 Summary: Fix broken links in Dockerfile
 Key: KAFKA-6564
 URL: https://issues.apache.org/jira/browse/KAFKA-6564
 Project: Kafka
  Issue Type: Test
Reporter: Andriy Sorokhtey


https://github.com/apache/kafka/blob/1.0.0/tests/docker/Dockerfile
{noformat}
# Install binary test dependencies.
ENV MIRROR="http://mirrors.ocf.berkeley.edu/apache/"
RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s 
"${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 
-C "/opt/kafka-0.8.2.2"
RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s 
"${MIRROR}kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 
-C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s 
"${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 
-C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s 
"${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 
-C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s 
"${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 
-C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s 
"${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 
-C "/opt/kafka-0.11.0.0"
{noformat}
These links seem to be broken, and automated tests executed in Docker fail 
with the error:
{noformat}
log: /bin/sh -c mkdir -p "/opt/kafka-0.8.2.2" && curl -s 
"${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 
-C "/opt/kafka-0.8.2.2"' returned a non-zero code: 2
{noformat}

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6536) Streams quickstart pom.xml is missing versions for a bunch of plugins

2018-02-15 Thread Yaswanth Kumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365411#comment-16365411
 ] 

Yaswanth Kumar commented on KAFKA-6536:
---

Great thanks!

> Streams quickstart pom.xml is missing versions for a bunch of plugins
> -
>
> Key: KAFKA-6536
> URL: https://issues.apache.org/jira/browse/KAFKA-6536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 0.11.0.2, 1.0.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Yaswanth Kumar
>Priority: Major
>  Labels: newbie
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are a bunch of plugins being used that maven helpfully warns you about 
> being unversioned:
> {code:java}
> > [INFO] Scanning for projects...
> > [WARNING]
> > [WARNING] Some problems were encountered while building the effective model 
> > for org.apache.kafka:streams-quickstart-java:maven-archetype:1.0.1
> > [WARNING] 'build.plugins.plugin.version' for 
> > org.apache.maven.plugins:maven-shade-plugin is missing. @ 
> > org.apache.kafka:streams-quickstart:1.0.1, 
> > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, 
> > line 64, column 21
> > [WARNING] 'build.plugins.plugin.version' for 
> > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ 
> > org.apache.kafka:streams-quickstart:1.0.1, 
> > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, 
> > line 74, column 21
> > [WARNING]
> > [WARNING] Some problems were encountered while building the effective model 
> > for org.apache.kafka:streams-quickstart:pom:1.0.1
> > [WARNING] 'build.plugins.plugin.version' for 
> > org.apache.maven.plugins:maven-shade-plugin is missing. @ line 64, column 21
> > [WARNING] 'build.plugins.plugin.version' for 
> > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ line 74, 
> > column 21
> > [WARNING]
> > [WARNING] It is highly recommended to fix these problems because they 
> > threaten the stability of your build.
> > [WARNING]
> > [WARNING] For this reason, future Maven versions might no longer support 
> > building such malformed projects.{code}
> Unversioned dependencies are dangerous as they make the build 
> non-reproducible. In fact, a released version may become very difficult to 
> build as the user would have to track down the working versions of the 
> plugins. This seems particularly bad for the quickstart as it's likely to be 
> copy/pasted into people's own projects.
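A minimal way to silence the warnings is to pin an explicit version on each plugin in the quickstart pom. The version numbers below are illustrative assumptions, not the ones the release actually chose:

```xml
<!-- Sketch: pin plugin versions so the build is reproducible.
     The version values are examples only. -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.0</version>
    </plugin>
    <plugin>
      <groupId>com.github.siom79.japicmp</groupId>
      <artifactId>japicmp-maven-plugin</artifactId>
      <version>0.11.0</version>
    </plugin>
  </plugins>
</build>
```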





[jira] [Created] (KAFKA-6563) Kafka online backup

2018-02-15 Thread Werner Daehn (JIRA)
Werner Daehn created KAFKA-6563:
---

 Summary: Kafka online backup
 Key: KAFKA-6563
 URL: https://issues.apache.org/jira/browse/KAFKA-6563
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Werner Daehn


If you consider Kafka to be a "database" with "transaction logs", you need a 
way to do backup and recovery just like databases do.

The beauty of such a solution would be to enable Kafka for smaller scenarios 
where you do not want to run a large cluster. You could even use a single-node 
Kafka. In the worst case you lose all data written since the last backup and 
have to ask the sources to send that data again - for most sources that is 
possible.

 

Currently you have multiple options, none of which are good.
 # Set up Kafka fault-tolerantly with replication factors: needs larger 
servers and does not prevent many types of problems, such as software bugs or 
deleting a topic by accident.
 # Mirror Kafka: very expensive.
 # Shut down Kafka, copy the disks, start Kafka again.
 # Add a database in front of Kafka as the primary persistence: very expensive 
and defeats the idea of Kafka.

 

I wonder what is really needed for an online backup strategy. If I am not 
mistaken, it is relatively little.
 * A command that causes Kafka to roll over to new segment files, so that the 
files containing all past data no longer change.
 * An export of the current ZooKeeper values, unless they can be recreated 
from the transaction log files anyway.
 * Then you can back up the Kafka files.
 * A command signalling that the backup is finished, so things can be cleaned 
up.
 * Later, a way to roll the recovered backup instance forward using the Kafka 
log written since then, up to a certain point. Example: the backup was taken 
at midnight and a topic was deleted by accident at 11:00. You start with the 
backup, apply the logs until 10:59, and then bring Kafka fully online again.

