[jira] [Commented] (KAFKA-6568) LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition
[ https://issues.apache.org/jira/browse/KAFKA-6568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366569#comment-16366569 ] ASF GitHub Bot commented on KAFKA-6568: --- becketqin opened a new pull request #4580: KAFKA-6568; The log cleaner should check the partition state before r… URL: https://github.com/apache/kafka/pull/4580 …emoving it from the inProgress map. The log cleaner should not naively remove the partition from the in-progress map without checking the partition state. This may cause another thread calling `LogCleanerManager.abortAndPauseCleaning()` to hang indefinitely. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > LogCleanerManager.doneDeleting() should check the partition state before > deleting the in progress partition > --- > > Key: KAFKA-6568 > URL: https://issues.apache.org/jira/browse/KAFKA-6568 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Blocker > Fix For: 1.1.0 > > > {{LogCleanerManager.doneDeleting()}} removes the partition from the > {{inProgress}} map without checking if the partition is paused or not. This > will cause the paused partition state to be lost, and may also cause another > thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block > indefinitely waiting on the partition state to become paused. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
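The race described above can be illustrated with a small, self-contained model (the class, method, and state names below are simplified stand-ins, not the actual LogCleanerManager code): doneDeleting() must check the partition's state before removing it, otherwise a concurrent abortAndPauseCleaning() waits forever for a PAUSED state that never appears.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the bug and the fix. A thread calling
// abortAndPauseCleaning() marks the partition ABORTED and then waits for it
// to become PAUSED; a doneDeleting() that removes the entry unconditionally
// destroys that state and leaves the waiter blocked indefinitely.
public class CleanerStateSketch {
    enum State { LOG_CLEANING_IN_PROGRESS, LOG_CLEANING_ABORTED, LOG_CLEANING_PAUSED }

    private final Map<String, State> inProgress = new HashMap<>();

    synchronized void startDeleting(String partition) {
        inProgress.put(partition, State.LOG_CLEANING_IN_PROGRESS);
    }

    synchronized void abortAndPauseCleaning(String partition) {
        inProgress.put(partition, State.LOG_CLEANING_ABORTED);
        // the real code then waits until the state becomes PAUSED
    }

    // Buggy version: removes unconditionally, losing the ABORTED/PAUSED state.
    synchronized void doneDeletingBuggy(String partition) {
        inProgress.remove(partition);
    }

    // Fixed version: only remove when cleaning is plainly in progress;
    // otherwise transition ABORTED -> PAUSED so the waiting thread can proceed.
    synchronized void doneDeletingFixed(String partition) {
        State s = inProgress.get(partition);
        if (s == State.LOG_CLEANING_IN_PROGRESS)
            inProgress.remove(partition);
        else if (s == State.LOG_CLEANING_ABORTED)
            inProgress.put(partition, State.LOG_CLEANING_PAUSED);
    }

    synchronized State stateOf(String partition) {
        return inProgress.get(partition);
    }

    public static void main(String[] args) {
        CleanerStateSketch m = new CleanerStateSketch();
        m.startDeleting("t-0");
        m.abortAndPauseCleaning("t-0");
        m.doneDeletingFixed("t-0");
        // The paused state survives, so the waiting thread can observe it.
        System.out.println(m.stateOf("t-0")); // LOG_CLEANING_PAUSED
    }
}
```

With doneDeletingBuggy() instead, stateOf("t-0") would be null and the thread waiting for PAUSED would never wake up, which is the hang the PR fixes.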
[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist
[ https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366546#comment-16366546 ] Jun Rao commented on KAFKA-4277: [~nico.meyer], do you know if the conflicting ephemeral node was there temporarily or permanently? From the ZK server commit log, was there any indication that the ephemeral node was removed after session expiration, and if so, when? > creating ephemeral node already exist > - > > Key: KAFKA-4277 > URL: https://issues.apache.org/jira/browse/KAFKA-4277 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0 >Reporter: Feixiang Yan >Priority: Major > > I use ZooKeeper 3.4.6. > The ZooKeeper session timed out and zkClient's reconnect attempt failed. When the broker then re-established > the session and re-registered its broker info in ZK, a NODEEXISTS exception was thrown. > I think this is because the ephemeral node created by the old session had > not been removed. > I read the > [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala] > of 0.8.1, where createEphemeralPathExpectConflictHandleZKBug tries to create the node in a > while loop until creation succeeds. This can solve the issue. But in > [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala] > of 0.10.0.1 the function was removed. 
> {noformat} > [2016-10-07 19:00:32,562] INFO Socket connection established to > 10.191.155.238/10.191.155.238:21819, initiating session > (org.apache.zookeeper.ClientCnxn) > [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) > (org.I0Itec.zkclient.ZkClient) > [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, > session 0x1576b11f9b201bd has expired, closing socket connection > (org.apache.zookeeper.ClientCnxn) > [2016-10-07 19:00:32,564] INFO Initiating client connection, > connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2 > sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 > (org.apache.zookeeper.ZooKeeper) > [2016-10-07 19:00:32,566] INFO Opening socket connection to server > 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using > SASL (unknown error) (org.apache.zookeeper.ClientCnxn) > [2016-10-07 19:00:32,566] INFO Socket connection established to > 10.191.155.237/10.191.155.237:21819, initiating session > (org.apache.zookeeper.ClientCnxn) > [2016-10-07 19:00:32,566] INFO EventThread shut down > (org.apache.zookeeper.ClientCnxn) > [2016-10-07 19:00:32,567] INFO Session establishment complete on server > 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, > negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) > [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 > (kafka.server.KafkaHealthcheck$SessionExpireListener) > [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? 
false) > (kafka.utils.ZKCheckedEphemeral) > [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS > (kafka.utils.ZKCheckedEphemeral) > [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session > event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] > (org.I0Itec.zkclient.ZkEventThread) > java.lang.RuntimeException: A broker is already registered on the path > /brokers/ids/3. This probably indicates that you either have configured a > brokerid that is already in use, or else you have shutdown this broker and > restarted it faster than the zookeeper timeout so it appears to be > re-registering. > at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305) > at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291) > at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70) > at > kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104) > at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat}
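The retry loop from 0.8.1's createEphemeralPathExpectConflictHandleZKBug that the reporter refers to can be sketched roughly as follows (the ZkCreate interface and all names here are hypothetical stand-ins, not the real zkclient API): on NODEEXISTS, back off and retry on the assumption that the conflicting ephemeral node from the expired session will eventually be removed by the server.

```java
// Hypothetical sketch of a create-until-success loop for an ephemeral node.
public class EphemeralRetrySketch {
    // Stand-in for a ZK client call: true = node created, false = NODEEXISTS.
    interface ZkCreate { boolean tryCreate(String path); }

    // Retry until creation succeeds, backing off between attempts while the
    // stale ephemeral node from the expired session is expected to disappear.
    static int createWithRetry(ZkCreate zk, String path, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (zk.tryCreate(path))
                return attempt; // registration succeeded on this attempt
            try {
                Thread.sleep(backoffMs); // wait for the old session's node to expire
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new IllegalStateException("gave up creating " + path);
    }

    public static void main(String[] args) {
        // Fake ZK: the stale node "disappears" after two failed attempts.
        final int[] calls = {0};
        ZkCreate fake = path -> ++calls[0] >= 3;
        int attempts = createWithRetry(fake, "/brokers/ids/3", 10, 1);
        System.out.println("created on attempt " + attempts); // created on attempt 3
    }
}
```

This only masks the conflict rather than resolving it; as Jun Rao's question above implies, the interesting case is whether the conflicting node is eventually removed at all.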
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366493#comment-16366493 ] Matthias J. Sax commented on KAFKA-6555: Your observation is correct. Using StandbyTasks is also tricky to get right, and it requires configuring standby tasks in the first place... I guess this is the reason why Guozhang suggested having a KIP discussion. Also, if we see both approaches as complementary, the question is what the query order should be: first try the StandbyTask, and if no StandbyTask is available, fall back to the restoring task? > Making state store queryable during restoration > --- > > Key: KAFKA-6555 > URL: https://issues.apache.org/jira/browse/KAFKA-6555 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major > > State stores in Kafka Streams are currently only queryable when the StreamTask is > in RUNNING state. The idea is to make them queryable even in the RESTORATION > (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge and > making the data inaccessible during this time amounts to downtime that is not acceptable > for many applications. > When the active partition goes down, one of the following occurs: > # One of the standby replica partitions gets promoted to active: the replica task > has to restore the remaining state from the changelog topic before it can > become RUNNING. The time taken for this depends on how far the replica is > lagging behind. During this restoration time the state store for that > partition is currently not queryable, resulting in partition downtime. We > can make the state store partition queryable for the data already present in > the state store. > # When there is no replica or standby task, the active task will be started > on one of the existing nodes. 
That node has to build the entire state from the > changelog topic, which can take a lot of time depending on how big the > changelog topic is, and keeping the state store not queryable during this time is > downtime for the partition. > This is a very important improvement, as it could significantly improve the availability > of microservices developed using Kafka Streams. > I am working on a patch for this change. Any feedback or comments are welcome.
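The query-order question raised in the comment above — first try a StandbyTask, then fall back to the restoring active task — could be sketched as a simple priority rule (a hypothetical illustration, not an actual Streams API):

```java
// Hypothetical fallback order for interactive queries: prefer the active task
// when it is RUNNING, then a StandbyTask (near up-to-date data), and only as a
// last resort the restoring active task (possibly stale data).
public class QueryOrderSketch {
    enum TaskState { RUNNING, RESTORING }

    static String pickStore(TaskState active, boolean standbyAvailable) {
        if (active == TaskState.RUNNING) return "active";
        if (standbyAvailable) return "standby";
        if (active == TaskState.RESTORING) return "restoring-active";
        return "none";
    }

    public static void main(String[] args) {
        System.out.println(pickStore(TaskState.RESTORING, true));  // standby
        System.out.println(pickStore(TaskState.RESTORING, false)); // restoring-active
    }
}
```

This makes the two approaches complementary, as suggested: querying a restoring task is only the fallback when no fresher replica exists.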
[jira] [Comment Edited] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366286#comment-16366286 ] Guozhang Wang edited comment on KAFKA-5285 at 2/16/18 12:56 AM: [~davispw] [~xvrl] Thinking about this a bit more, I think we can indeed provide a better heuristic to define the upper and lower bounds for the window store {{fetch(keyFrom, keyTo, timeFrom, timeTo)}}, which is the following: 0. First of all, documentation-wise we require users to make sure the serialized bytes of {{keyFrom}} are smaller than the serialized bytes of {{keyTo}} lexicographically. And inside this call, we first check the bytes for this condition. 1. For the lower range, we just use the {{keyFrom}} bytes, instead of {{keyFrom + minSuffix}}. Since we know there would be no data between these two keys at all, we can save some overhead of byte allocations and puts. 2. For the upper range, we just consider the following two combinations: a) keyFrom + minSuffix b) keyTo + maxSuffix, where minSuffix = timeFrom + seq0 and maxSuffix = timeTo + seqMAX, and then we just pick the larger of the two. In all, we would replace the current {{OrderedBytes}} implementations with this. Similarly, for the session store {{fetch}}, we do the same thing, except that for single-key fetch we handle it in a more optimized way: 0. For single-key fetch, lower range = key + 0 + timeEndEarliest, upper range = key + timeStartLatest + MAX (this is because timeTo <= timeFrom, so they can be the bound of each other). 1. For range key fetch, lower range = keyFrom. 2. For range key fetch, upper range = MAX (keyFrom + 0 + timeEndEarliest, keyTo + timeStartLatest + MAX). WDYT? > Optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Guozhang Wang >Priority: Major > Labels: performance > > The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} > {{upperRange}} and {{lowerRange}} does not make any assumptions with respect > to the other key bound (e.g. the upper byte bound does not depend on the lower > key bound). > It should be possible to optimize the byte range somewhat further using the > information provided by the lower bound. 
> More specifically, by incorporating that information, we should be able to > eliminate the corresponding {{upperRangeFixedSize}} and > {{lowerRangeFixedSize}}, since the result should be the same if we implement > that optimization.
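The upper-bound heuristic from the comment above can be sketched with raw byte arrays (a hypothetical illustration assuming an 8-byte big-endian timestamp plus 4-byte sequence suffix, not the actual OrderedBytes code): the lower bound is just keyFrom, and the upper bound is the lexicographic maximum of keyFrom + minSuffix and keyTo + maxSuffix.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the proposed window-store range bounds.
public class RangeBoundSketch {
    // Lexicographic comparison of unsigned bytes, matching key ordering in the store.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }

    // key + 8-byte timestamp + 4-byte sequence number (assumed layout).
    static byte[] concat(byte[] key, long ts, int seq) {
        return ByteBuffer.allocate(key.length + 12).put(key).putLong(ts).putInt(seq).array();
    }

    // Upper bound = max(keyFrom + minSuffix, keyTo + maxSuffix); the lower
    // bound would simply be the keyFrom bytes.
    static byte[] upperBound(byte[] keyFrom, byte[] keyTo, long timeFrom, long timeTo) {
        byte[] a = concat(keyFrom, timeFrom, 0);             // keyFrom + minSuffix
        byte[] b = concat(keyTo, timeTo, Integer.MAX_VALUE); // keyTo + maxSuffix
        return compare(a, b) >= 0 ? a : b;
    }

    public static void main(String[] args) {
        byte[] keyFrom = {0x01};
        byte[] keyTo = {0x01, 0x00};
        // With a large timeFrom, keyFrom + minSuffix can exceed keyTo + maxSuffix
        // lexicographically, which is why both candidates must be considered.
        byte[] ub = upperBound(keyFrom, keyTo, Long.MAX_VALUE, 0L);
        System.out.println(Arrays.equals(ub, concat(keyFrom, Long.MAX_VALUE, 0))); // true
    }
}
```

The example shows the subtlety behind the heuristic: because keys are variable-length, the larger key does not always produce the larger serialized bound once the suffix is appended.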
[jira] [Comment Edited] (KAFKA-5802) ScramServerCallbackHandler#handle should check username not being null before calling credentialCache.get()
[ https://issues.apache.org/jira/browse/KAFKA-5802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302643#comment-16302643 ] Ted Yu edited comment on KAFKA-5802 at 2/16/18 12:52 AM: - +1 from me. > ScramServerCallbackHandler#handle should check username not being null before > calling credentialCache.get() > --- > > Key: KAFKA-5802 > URL: https://issues.apache.org/jira/browse/KAFKA-5802 > Project: Kafka > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > String username = null; > for (Callback callback : callbacks) { > if (callback instanceof NameCallback) > username = ((NameCallback) callback).getDefaultName(); > else if (callback instanceof ScramCredentialCallback) > ((ScramCredentialCallback) > callback).scramCredential(credentialCache.get(username)); > {code} > Since ConcurrentHashMap, used by CredentialCache, doesn't allow null keys, we > should check that username is not null before calling credentialCache.get()
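The suggested guard is a one-line null check. Since the ScramServerCallbackHandler types quoted above are Kafka-internal, this sketch (with hypothetical names) demonstrates the underlying hazard directly against ConcurrentHashMap, which rejects null keys:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the guard the issue asks for:
// ConcurrentHashMap.get(null) throws NullPointerException, so the credential
// lookup should happen only when a username was actually supplied.
public class NullKeyGuardSketch {
    static String lookup(ConcurrentHashMap<String, String> cache, String username) {
        return username == null ? null : cache.get(username);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("alice", "scram-credential");
        System.out.println(lookup(cache, "alice")); // scram-credential
        System.out.println(lookup(cache, null));    // null, instead of an NPE
    }
}
```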
[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366471#comment-16366471 ] Ashish Surana edited comment on KAFKA-6555 at 2/16/18 12:45 AM: Sure. Thanks [~mjsax]. This approach: basically we are allowing the state store to be queried in RESTORING state. I understand it's restoring and can have old data, but the same can't be guaranteed for replicas either. Eventually we will have to open up replicas for queries too, but I believe this approach is complementary even with that. The approach you are suggesting (KAFKA-6144) is to allow queries from one of the replicas when the main task is restoring. A few problems I can think of with that approach: * when there is no replica: the store will be down until the main task reaches RUNNING state * when there are 2 or more replicas: which replica will serve the requests (all might be at different states)? Or we can make some of these decisions configurable and leave it to application developers to decide for their use cases. 
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366462#comment-16366462 ] Matthias J. Sax commented on KAFKA-6555: [~asurana] I added you to the list of contributors and assigned the Jira to you. You can now assign Jiras to yourself, too.
[jira] [Assigned] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-6555: -- Assignee: Ashish Surana
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366459#comment-16366459 ] Matthias J. Sax commented on KAFKA-6555: I had a quick look at the PR and I am not sure if we want to address the issue like this. You allow querying of restoring tasks, which seems questionable from a semantic point of view. I think I now understand the difference to KAFKA-6144 – there, the idea is to allow querying a StandbyTask in case the main task is restoring atm. The assumption is that a StandbyTask holds data that is almost up-to-date, while in your PR you would allow querying very old data. Furthermore, as long as a StandbyTask is being queried, the data would not change – because the main task is restoring, the StandbyTask just serves the latest state. If we allow querying a task that is restoring, you would see different (old) data until processing resumes. Thus, it's quite different. Is this a correct summary? > Making state store queryable during restoration > --- > > Key: KAFKA-6555 > URL: https://issues.apache.org/jira/browse/KAFKA-6555 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Ashish Surana > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
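The distinction being discussed above boils down to a state gate: today a task serves queries only in RUNNING, and the proposal widens the gate to the restoring (PARTITION_ASSIGNED) state, accepting stale reads. A minimal illustrative sketch of that gate — the `TaskState` and `QueryGate` names below are hypothetical stand-ins, not Kafka Streams API:

```java
// Illustrative only: a task-state gate for interactive queries.
// TaskState and QueryGate are hypothetical names, not Kafka Streams classes.
enum TaskState { CREATED, PARTITIONS_ASSIGNED, RUNNING, PARTITIONS_REVOKED, CLOSED }

public class QueryGate {
    // Current behavior: only RUNNING tasks may serve queries.
    public static boolean queryableToday(TaskState s) {
        return s == TaskState.RUNNING;
    }

    // Proposed behavior: also allow stale reads while restoring
    // (PARTITIONS_ASSIGNED), but never from a revoked task.
    public static boolean queryableProposed(TaskState s) {
        return s == TaskState.RUNNING || s == TaskState.PARTITIONS_ASSIGNED;
    }
}
```

The trade-off Matthias raises maps directly onto the second predicate: a caller passing the gate during restoration may observe data that is arbitrarily far behind the changelog.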
[jira] [Updated] (KAFKA-5660) Don't throw TopologyBuilderException during runtime
[ https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5660: --- Description: {{TopologyBuilderException}} is a pre-runtime exception that should only be thrown before {{KafkaStreams#start()}} is called. However, we do throw {{TopologyBuilderException}} within - `SourceNodeFactory#getTopics` - `ProcessorContextImpl#getStateStore` - `StreamPartitionAssignor#prepareTopic` (and maybe somewhere else: we should double check if there are other places in the code like those). We should replace those exceptions with either {{StreamsException}} or with a new exception type. was: {{TopologyBuilderException}} is a pre-runtime exception that should only be thrown {{KafkaStreams#start()}} is called. However, we do throw {{TopologyBuilderException}} within - `SourceNodeFactory#getTopics` - `ProcessorContextImpl#getStateStore` - `StreamPartitionAssignor#prepareTopic ` (and maybe somewhere else: we should double check if there are other places in the code like those). We should replace those exception with either {{StreamsException}} or with a new exception type. > Don't throw TopologyBuilderException during runtime > --- > > Key: KAFKA-5660 > URL: https://issues.apache.org/jira/browse/KAFKA-5660 > Project: Kafka > Issue Type: Task > Components: streams > Affects Versions: 0.11.0.0 > Reporter: Matthias J. Sax > Assignee: Nick Afshartous > Priority: Major >
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
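The distinction driving this ticket is between errors detectable before {{KafkaStreams#start()}} (topology-construction problems) and errors that surface while records are being processed. A hedged sketch of the two categories — the exception class names below mirror the real ones, but the validation and lookup logic is invented for illustration:

```java
// Illustrative sketch: build-time vs runtime error signaling.
// The two exception classes stand in for Kafka's TopologyBuilderException
// and StreamsException; the checks themselves are invented.
class TopologyBuilderException extends RuntimeException {
    TopologyBuilderException(String msg) { super(msg); }
}

class StreamsException extends RuntimeException {
    StreamsException(String msg, Throwable cause) { super(msg, cause); }
}

public class ExceptionBoundary {
    // Pre-start validation: a builder-time problem is reasonable to report
    // with TopologyBuilderException, because start() was never called.
    public static void validateTopology(String sourceTopic) {
        if (sourceTopic == null || sourceTopic.isEmpty()) {
            throw new TopologyBuilderException("source topic must be set");
        }
    }

    // Runtime failure: per the ticket, this should NOT surface as a
    // TopologyBuilderException; wrap the cause in StreamsException instead.
    public static Object getStateStore(java.util.Map<String, Object> stores, String name) {
        Object store = stores.get(name);
        if (store == null) {
            throw new StreamsException("state store " + name + " not found",
                    new IllegalStateException("lookup failed at runtime"));
        }
        return store;
    }
}
```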
[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime
[ https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366448#comment-16366448 ] Matthias J. Sax commented on KAFKA-5660: Thanks, [~nafshartous]. I updated the description. > Don't throw TopologyBuilderException during runtime > --- > > Key: KAFKA-5660 > URL: https://issues.apache.org/jira/browse/KAFKA-5660 > Project: Kafka > Issue Type: Task > Components: streams > Affects Versions: 0.11.0.0 > Reporter: Matthias J. Sax > Assignee: Nick Afshartous > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361515#comment-16361515 ] Ashish Surana edited comment on KAFKA-6555 at 2/15/18 11:42 PM: Hi Matthias, Thanks for pointing to KAFKA-6144. As I go through it, it looks like a very similar ticket but with a minor difference: * I am suggesting to allow stale reads only from PARTITION_ASSIGNED (not from PARTITION_REVOKED), primarily as it's going to be the one in RUNNING state, and this is the minimum we need to do to keep serving requests for this partition. We still have only one instance doing read/write, and will have pure standby replicas. This approach is good if we continue with the current design of one active write & read instance. I have made the changes, and can share them in a few days. was (Author: asurana): Hi Matthias, Thanks for pointing to KAFKA-6144. As I go through it, it looks like a very similar ticket but with a minor difference: * I am suggesting to allow stale reads only from PARTITION_ASSIGNED (not from PARTITION_REVOKED), primarily as it's going to be the one in RUNNING state, and this is the minimum we need to do to keep serving requests for this partition. We still have one instance doing write or read and still want to have pure standby replicas. This approach is good if we continue with the current design of one active write/read instance. I have made the changes, and can share them in a few days. > Making state store queryable during restoration > --- > > Key: KAFKA-6555 > URL: https://issues.apache.org/jira/browse/KAFKA-6555 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Ashish Surana > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish Surana updated KAFKA-6555: - Description: State store in Kafka streams are currently only queryable when StreamTask is in RUNNING state. The idea is to make it queryable even in the RESTORATION (PARTITION_ASSIGNED) state as the time spend on restoration can be huge and making the data inaccessible during this time could be downtime not suitable for many applications. When the active partition goes down then one of the following occurs: # One of the standby replica partition gets promoted to active: Replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time taken for this depends on how much the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable resulting in the partition downtime. We can make the state store partition queryable for the data already present in the state store. # When there is no replica or standby task, then active task will be started in one of the existing node. That node has to build the entire state from the changelog topic which can take lot of time depending on how big is the changelog topic, and keeping state store not queryable during this time is the downtime for the parition. It's very important improvement as it could simply improve the availability of microservices developed using kafka streams. I am working on a patch for this change. Any feedback or comments are welcome. was: State store in Kafka streams are currently only queryable when StreamTask is in RUNNING state. The idea is to make it queryable even in the RESTORATION (PARTITION_ASSIGNED) state as the time spend on restoration can be huge and making the data inaccessible during this time could be downtime not suitable for many applications. 
When the active partition goes down then one of the following occurs: # One of the standby replica partition gets promoted to active: Replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time taken for this depends on how much the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable giving making the partition down. We can make the state store partition queryable for the data already present in the state store. # When there is no replica or standby task, then active task will be started in one of the existing node. That node has to build the entire state from the changelog topic which can take lot of time depending on how big is the changelog topic, and keeping state store not queryable during this time is the downtime for the parition. It's very important improvement as it could simply improve the availability of microservices developed using kafka streams. I am working on a patch for this change. Any feedback or comments are welcome. > Making state store queryable during restoration > --- > > Key: KAFKA-6555 > URL: https://issues.apache.org/jira/browse/KAFKA-6555 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Ashish Surana > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366412#comment-16366412 ] ASF GitHub Bot commented on KAFKA-6541: --- hachikuji closed pull request #4548: KAFKA-6541: Fixed a StackOverflow bug in kafka-coordinator-heartbeat-thread URL: https://github.com/apache/kafka/pull/4548 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 0747e8db146..a0d4ed8f57c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -57,6 +57,7 @@
     // flag and the request completion queue below).
     private final Logger log;
     private final KafkaClient client;
+    private boolean disconnecting = false;
     private final UnsentRequests unsent = new UnsentRequests();
     private final Metadata metadata;
     private final Time time;
@@ -387,9 +388,13 @@ private void checkDisconnects(long now) {
     }

     public void disconnect(Node node) {
-        failUnsentRequests(node, DisconnectException.INSTANCE);
-
         synchronized (this) {
+            if (disconnecting) {
+                return;
+            }
+            disconnecting = true;
+            failUnsentRequests(node, DisconnectException.INSTANCE);
+
             client.disconnect(node.idString());
         }
@@ -483,6 +488,7 @@ public boolean connectionFailed(Node node) {
     public void tryConnect(Node node) {
         synchronized (this) {
             client.ready(node, time.milliseconds());
+            disconnecting = false;
         }
     }

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread > -- > > Key: KAFKA-6541 > URL: https://issues.apache.org/jira/browse/KAFKA-6541 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Anh Le >Priority: Major > > There's something wrong with our client library when sending heart beats. > This bug seems to be identical to this one: > [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] > > Here's the log: > > {quote}2018-02-08 13:55:01,102 ERROR > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread > Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | > default-group': > java.lang.StackOverflowError: null > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302) > at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173) > at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) > at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) > at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) > at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) > at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) > at > ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) > at 
ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) > at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) > at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) > at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398) > at ch.qos.logback.classic.Logger.info(Logger.java:583) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at >
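The stack trace above shows coordinatorDead and disconnect re-entering each other until the stack overflows, and the merged PR breaks the cycle with a re-entrancy flag. A stripped-down illustration of that pattern — the class and method names below are simplified stand-ins for the ConsumerNetworkClient logic, and this sketch clears the flag in a finally block rather than in tryConnect as the actual patch does:

```java
// Simplified stand-in for the mutual recursion in KAFKA-6541:
// failing unsent requests fires completion handlers, which can call
// disconnect() again for the same node. A re-entrancy flag breaks the loop.
public class DisconnectGuard {
    private boolean disconnecting = false;
    int failuresFired = 0;

    public synchronized void disconnect(String node) {
        if (disconnecting) {
            return; // already disconnecting: do not recurse
        }
        disconnecting = true;
        try {
            failUnsentRequests(node);
        } finally {
            disconnecting = false;
        }
    }

    // Firing a failure handler re-enters disconnect(), as the
    // coordinatorDead -> disconnect -> onFailure cycle did in the trace.
    private void failUnsentRequests(String node) {
        failuresFired++;
        disconnect(node); // re-entrant call; the guard returns immediately
    }
}
```

Without the flag, the two methods would call each other until a StackOverflowError, exactly as in the reported trace.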
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366411#comment-16366411 ] Gunnar Morling commented on KAFKA-6566: --- +1 for more exploration of whether calling stop() was the original intention. FWIW, several comments in Debezium indicate that this at least was the assumption on that side when the code was written (of course that assumption may have been right or wrong). Note that calling stop() would be the only chance for the task to clean up its resources after an exception on the KC side of the polling loop (while we could have our entire polling logic in a try/catch block, there's no chance to react to exceptions in WorkerSourceTask). > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 1.0.0 > Reporter: Gunnar Morling > Priority: Major > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List
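The guarantee being requested — stop() runs even when poll() throws — amounts to a try/finally around the polling loop. A hedged sketch of that shape with simplified stand-in types (this is not the Connect runtime's WorkerSourceTask, just the pattern the ticket asks it to adopt):

```java
// Simplified sketch of a polling loop that guarantees stop() runs even
// when poll() throws; the SimpleSourceTask interface is a stand-in for
// Kafka Connect's SourceTask.
interface SimpleSourceTask {
    void poll() throws Exception;
    void stop();
}

public class PollingWorker {
    // Returns true if all polls completed, false if poll() threw.
    // Either way, stop() runs so DB connections etc. are released.
    public static boolean runUntilFailure(SimpleSourceTask task, int maxPolls) {
        try {
            for (int i = 0; i < maxPolls; i++) {
                task.poll(); // may throw; resources must still be released
            }
            return true;
        } catch (Exception e) {
            // swallowed for the sketch; real code would log and fail the task
            return false;
        } finally {
            task.stop(); // cleanup happens on both paths, exactly once
        }
    }
}
```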
[jira] [Resolved] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6541. Resolution: Duplicate Closing as duplicate for now. Please reopen if you think the issue has not been fixed. > StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread > -- > > Key: KAFKA-6541 > URL: https://issues.apache.org/jira/browse/KAFKA-6541 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Anh Le >Priority: Major > > There's something wrong with our client library when sending heart beats. > This bug seems to be identical to this one: > [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] > > Here's the log: > > {quote}2018-02-08 13:55:01,102 ERROR > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread > Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | > default-group': > java.lang.StackOverflowError: null > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302) > at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173) > at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) > at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) > at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) > at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) > at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) > at > 
ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) > at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) > at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) > at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) > at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398) > at ch.qos.logback.classic.Logger.info(Logger.java:583) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at >
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366404#comment-16366404 ] Ted Yu commented on KAFKA-6566: --- Thanks for the comment, Randall. I did a quick search but haven't found whether not calling {{stop()}} was intentional. Will spend more time investigating related aspects. > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 1.0.0 > Reporter: Gunnar Morling > Priority: Major >
[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366399#comment-16366399 ] Guozhang Wang commented on KAFKA-4969: -- Yup, re-opening to keep track. > State-store workload-aware StreamsPartitionAssignor > --- > > Key: KAFKA-4969 > URL: https://issues.apache.org/jira/browse/KAFKA-4969 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Matthias J. Sax > Assignee: Bill Bejeck > Priority: Major > Fix For: 1.1.0, 1.0.1 > > > Currently, {{StreamPartitionsAssigner}} does not distinguish different > "types" of tasks. For example, a task can be stateless or have one or multiple > stores. > This can lead to a suboptimal task placement: assume there are 2 stateless > and 2 stateful tasks and the app is running with 2 instances. To share the > "store load" it would be good to place one stateless and one stateful task > per instance. Right now, there is no guarantee about this, and it can happen > that one instance processes both stateless tasks while the other processes > both stateful tasks. > We should improve {{StreamPartitionAssignor}} and introduce "task types" > including a cost model for task placement. We should consider the following > parameters: > - number of stores > - number of sources/sinks > - number of processors > - regular task vs standby task > This improvement should be backed by a design document in the project wiki > (no KIP required though) as it's a fairly complex change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reopened KAFKA-4969: -- > State-store workload-aware StreamsPartitionAssignor > --- > > Key: KAFKA-4969 > URL: https://issues.apache.org/jira/browse/KAFKA-4969 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Matthias J. Sax > Assignee: Bill Bejeck > Priority: Major > Fix For: 1.1.0, 1.0.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
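The imbalance described in this ticket (both stateful tasks landing on one instance) can already be avoided by distributing tasks round-robin within each task type; the cost model the ticket proposes would generalize this to stores, sources/sinks, processors, and standby tasks. A toy sketch of the type-aware idea — this is not the StreamsPartitionAssignor API, just the placement principle:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of type-aware task placement: distribute each task
// "type" (stateful vs stateless) round-robin so no single instance ends
// up carrying all the store load.
public class TypeAwareAssignor {
    public static Map<String, List<String>> assign(List<String> statefulTasks,
                                                   List<String> statelessTasks,
                                                   List<String> instances) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        // Round-robin within each type keeps the store load spread out;
        // assigning all tasks in one undifferentiated pass cannot guarantee this.
        int i = 0;
        for (String task : statefulTasks) {
            assignment.get(instances.get(i++ % instances.size())).add(task);
        }
        i = 0;
        for (String task : statelessTasks) {
            assignment.get(instances.get(i++ % instances.size())).add(task);
        }
        return assignment;
    }
}
```

With 2 stateful and 2 stateless tasks on 2 instances, each instance receives one of each, which is exactly the "share the store load" outcome the description asks for.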
[jira] [Assigned] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-5285: Assignee: Guozhang Wang (was: Xavier Léauté) > Optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Xavier Léauté > Assignee: Guozhang Wang > Priority: Major > Labels: performance > > The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} > {{upperRange}} and {{lowerRange}} does not make any assumptions with respect > to the other key bound (e.g. the upper byte bound does not depend on the lower > key bound). > It should be possible to optimize the byte range somewhat further using the > information provided by the lower bound. > More specifically, by incorporating that information, we should be able to > eliminate the corresponding {{upperRangeFixedSize}} and > {{lowerRangeFixedSize}}, since the result should be the same if we implement > that optimization. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
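Range scans over composite byte keys usually derive an exclusive upper bound by taking the "successor" of the key prefix; the optimization this ticket discusses would tighten such bounds further by also consulting the lower bound. A generic sketch of the prefix-successor building block — this is not the actual WindowKeySchema/SessionKeySchema logic, just the standard technique it builds on:

```java
import java.util.Arrays;

// Generic prefix-successor computation, the usual building block for
// byte-lexicographic range scans: the smallest byte array strictly
// greater than every key that starts with `prefix`.
public class RangeBounds {
    public static byte[] upperBound(byte[] prefix) {
        byte[] bound = Arrays.copyOf(prefix, prefix.length);
        for (int i = bound.length - 1; i >= 0; i--) {
            if (bound[i] != (byte) 0xFF) {
                bound[i]++;                         // bump the last non-0xFF byte
                return Arrays.copyOf(bound, i + 1); // drop everything after it
            }
        }
        return null; // all bytes are 0xFF: no finite upper bound exists
    }
}
```

Scanning the half-open interval [prefix, upperBound(prefix)) then visits exactly the keys sharing the prefix, without over-fetching neighboring key ranges.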
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366398#comment-16366398 ] Ashish Surana commented on KAFKA-6555: -- Raised PR: [https://github.com/apache/kafka/pull/4575] Please review and let me know your comments. These are the minimal changes required, so I am not sure if we need a KIP. > Making state store queryable during restoration > --- > > Key: KAFKA-6555 > URL: https://issues.apache.org/jira/browse/KAFKA-6555 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ashish Surana >Priority: Major > > State stores in Kafka Streams are currently only queryable when the StreamTask is > in RUNNING state. The idea is to make them queryable even in the RESTORATION > (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge and > making the data inaccessible during this time could be downtime not acceptable > for many applications. > When the active partition goes down, one of the following occurs: > # One of the standby replica partitions gets promoted to active: the replica task > has to restore the remaining state from the changelog topic before it can > become RUNNING. The time taken for this depends on how much the replica is > lagging behind. During this restoration time the state store for that > partition is currently not queryable, effectively making the partition unavailable. We can > make the state store partition queryable for the data already present in the > state store. > # When there is no replica or standby task, the active task will be started > on one of the existing nodes. That node has to build the entire state from the > changelog topic, which can take a lot of time depending on how big the > changelog topic is, and keeping the state store not queryable during this time is > downtime for the partition. > This is a very important improvement, as it could significantly improve the availability > of microservices developed using Kafka Streams. 
> I am working on a patch for this change. Any feedback or comments are welcome. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6566: -- Affects Version/s: 1.0.0 > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. > {code} > package com.example; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import org.apache.kafka.common.config.ConfigDef; > import org.apache.kafka.connect.connector.Task; > import org.apache.kafka.connect.source.SourceConnector; > import org.apache.kafka.connect.source.SourceRecord; > import org.apache.kafka.connect.source.SourceTask; > public class TestConnector extends SourceConnector { > @Override > public String version() { > return null; > } > @Override > public void start(Map<String, String> props) { > } > @Override > public Class<? extends Task> taskClass() { > return TestTask.class; > } > @Override > public List
[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators
[ https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366392#comment-16366392 ] ASF GitHub Bot commented on KAFKA-6560: --- guozhangwang opened a new pull request #4578: KAFKA-6560: Replace range query with newly added single point query in Windowed Aggregation [WIP] URL: https://github.com/apache/kafka/pull/4578 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use single-point queries than range queries for windowed aggregation operators > -- > > Key: KAFKA-6560 > URL: https://issues.apache.org/jira/browse/KAFKA-6560 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Critical > Labels: needs-kip > > Today for windowed aggregations in Streams DSL, the underlying implementation > is leveraging the fetch(key, from, to) API to get all the related windows for > a single record to update. However, this is a very inefficient operation with > a significant amount of CPU time spent iterating over window stores. 
On the other > hand, since the operator implementation itself has full knowledge of the > window specs, it can actually translate this operation into multiple > single-point queries with the exact window start timestamps, which would > largely reduce the overhead. > The proposed approach is to add a single-point fetch API to the WindowedStore and > use that in the KStreamWindowedAggregate / KStreamWindowedReduce operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
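The translation described in the issue can be sketched as follows. This is a hypothetical helper, not the actual Streams code: given a record timestamp and a hopping-window spec (`windowSize` and `advance` in milliseconds, both assumed names), it enumerates the exact window start timestamps the operator would query with a single-point fetch per window instead of one range scan.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (not the actual Streams implementation): enumerate the start
// timestamps of all hopping windows that contain a record's timestamp, so the
// operator can issue one single-point fetch(key, windowStart) per window
// instead of one range fetch(key, from, to).
public class WindowStarts {
    public static List<Long> windowStartsFor(long timestamp, long windowSize, long advance) {
        List<Long> starts = new ArrayList<>();
        // earliest window start that can still contain the timestamp,
        // rounded down to a multiple of the advance interval
        long start = Math.max(0, timestamp - windowSize + advance) / advance * advance;
        for (; start <= timestamp; start += advance) {
            if (start + windowSize > timestamp) { // window [start, start + windowSize) contains it
                starts.add(start);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // windows of size 10 advancing by 5: timestamp 12 falls in [5,15) and [10,20)
        System.out.println(windowStartsFor(12, 10, 5)); // [5, 10]
    }
}
```

For a tumbling window (`advance == windowSize`) this collapses to exactly one start timestamp, which is where the single-point query saves the most over a range scan.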
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366390#comment-16366390 ] Randall Hauch commented on KAFKA-6566: -- Also, if we're not calling {{stop()}} on the task, we should find out if that was the original intention. If so, then there's nothing to fix here, or, if we want to change the behavior, we'll need a KIP. However, if calling {{stop()}} was the intention and we're simply no longer doing it, then it's a bug that we can fix without a KIP. Regardless, it seems like there's quite a bit more investigation required before we know how the code should be changed. > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366382#comment-16366382 ] Randall Hauch commented on KAFKA-6566: -- [~te...@apache.org], maybe we should first understand what happens to the other tasks and the connector when the {{WorkerSourceTask}} throws an exception. For example, if {{task.stop()}} should be called when the connector and all tasks are cleaned up, then we won't also want it in the suggested location. > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6566: - Component/s: KafkaConnect > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major
[jira] [Updated] (KAFKA-6568) LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition
[ https://issues.apache.org/jira/browse/KAFKA-6568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-6568: Description: {{LogCleanerManager.doneDeleting()}} removes the partition from the {{inProgress}} map without checking if the partition is paused or not. This will cause the paused partition state to be lost, and may also cause another thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block indefinitely waiting on the partition state to become paused. (was: {{LogCleanerManager.doneDeleting()}} removes the partition from the {{inProgress}} map without checking if the partition is paused or not. This will cause the paused partition state to be lost. And may cause another thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block indefinitely waiting on the partition state to become paused.) > LogCleanerManager.doneDeleting() should check the partition state before > deleting the in progress partition > --- > > Key: KAFKA-6568 > URL: https://issues.apache.org/jira/browse/KAFKA-6568 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 1.1.0 > > > {{LogCleanerManager.doneDeleting()}} removes the partition from the > {{inProgress}} map without checking if the partition is paused or not. This > will cause the paused partition state to be lost, and may also cause another > thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block > indefinitely waiting on the partition state to become paused. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6568) LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition
Jiangjie Qin created KAFKA-6568: --- Summary: LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition Key: KAFKA-6568 URL: https://issues.apache.org/jira/browse/KAFKA-6568 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.2, 1.0.0, 0.10.2.1, 0.10.1.1 Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 1.1.0 {{LogCleanerManager.doneDeleting()}} removes the partition from the {{inProgress}} map without checking if the partition is paused or not. This will cause the paused partition state to be lost, and may cause another thread calling {{LogCleanerManager.abortAndPauseCleaning()}} to block indefinitely waiting on the partition state to become paused. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
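A minimal sketch of the fix idea, using hypothetical simplified types rather than Kafka's actual Scala code: {{doneDeleting()}} removes the partition only when cleaning is genuinely in progress, and converts an aborted partition to paused so a waiting {{abortAndPauseCleaning()}} caller can proceed.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the fix idea (hypothetical types, not Kafka's Scala code):
// doneDeleting must only drop the partition from inProgress when its state is
// still IN_PROGRESS; if a concurrent abortAndPauseCleaning() moved it to
// ABORTED, the state must transition to PAUSED instead of being lost, so the
// waiting thread can observe it.
public class CleanerStateSketch {
    enum State { IN_PROGRESS, ABORTED, PAUSED }

    private final Map<String, State> inProgress = new HashMap<>();

    synchronized void markInProgress(String partition) {
        inProgress.put(partition, State.IN_PROGRESS);
    }

    synchronized void markAborted(String partition) {
        inProgress.put(partition, State.ABORTED);
    }

    synchronized void doneDeleting(String partition) {
        State state = inProgress.get(partition);
        if (state == State.IN_PROGRESS) {
            inProgress.remove(partition);            // normal completion
        } else if (state == State.ABORTED) {
            inProgress.put(partition, State.PAUSED); // preserve the pause request
            notifyAll();                             // wake the waiting caller
        }
    }

    synchronized State stateOf(String partition) {
        return inProgress.get(partition);
    }

    public static void main(String[] args) {
        CleanerStateSketch mgr = new CleanerStateSketch();
        mgr.markAborted("topic-0");
        mgr.doneDeleting("topic-0");
        System.out.println(mgr.stateOf("topic-0")); // PAUSED, not silently removed
    }
}
```

The bug described above corresponds to the naive version that unconditionally removes the partition, dropping the ABORTED marker and leaving the pausing thread blocked forever.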
[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime
[ https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366360#comment-16366360 ] Nick Afshartous commented on KAFKA-5660: Seems like the first sentence of the description is missing "before"; it should read: should only be thrown *before* {{KafkaStreams#start()}} is called. > Don't throw TopologyBuilderException during runtime > --- > > Key: KAFKA-5660 > URL: https://issues.apache.org/jira/browse/KAFKA-5660 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Matthias J. Sax >Assignee: Nick Afshartous >Priority: Major > > {{TopologyBuilderException}} is a pre-runtime exception that should only be > thrown {{KafkaStreams#start()}} is called. > However, we do throw {{TopologyBuilderException}} within > - `SourceNodeFactory#getTopics` > - `ProcessorContextImpl#getStateStore` > - `StreamPartitionAssignor#prepareTopic` > (and maybe somewhere else: we should double check if there are other places > in the code like those). > We should replace those exceptions with either {{StreamsException}} or with a > new exception type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366357#comment-16366357 ] ASF GitHub Bot commented on KAFKA-6566: --- tedyu opened a new pull request #4577: KAFKA-6566 SourceTask#stop() not called after exception raised in poll() URL: https://github.com/apache/kafka/pull/4577 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug >Reporter: Gunnar Morling >Priority: Major
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366331#comment-16366331 ] ASF GitHub Bot commented on KAFKA-5285: --- guozhangwang opened a new pull request #4576: KAFKA-5285: Use optimized upper and lower bound for window key schema [WIP] URL: https://github.com/apache/kafka/pull/4576 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté >Priority: Major > Labels: performance -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate
Guozhang Wang created KAFKA-6567: Summary: KStreamWindowReduce can be replaced by KStreamWindowAggregate Key: KAFKA-6567 URL: https://issues.apache.org/jira/browse/KAFKA-6567 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang This is a tech debt worth cleaning up: KStreamWindowReduce should be able to be replaced by KStreamWindowAggregate. In fact, we have already done this for session windows, where in {{SessionWindowedKStreamImpl}} we use {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}} to replace reducer with aggregator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
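The substitution described above (already used for session windows via {{reduceInitializer}}, {{aggregatorForReducer}} and {{mergerForAggregator}}) can be sketched with simplified functional interfaces rather than the actual Streams `Reducer`/`Aggregator` types: a reduce is just an aggregate whose initializer yields null and whose aggregator only applies the reducer once a previous value exists.

```java
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Sketch of replacing a reducer with an aggregator (simplified functional
// interfaces, not the actual Streams Reducer/Aggregator types).
public class ReduceAsAggregate {
    // reduce has no seed value, so the "initializer" yields null
    static <V> Supplier<V> reduceInitializer() {
        return () -> null;
    }

    // wrap a reducer as an aggregator: on the first value there is no old
    // aggregate yet, so the value passes through; afterwards the reducer runs
    static <V> BinaryOperator<V> aggregatorForReducer(BinaryOperator<V> reducer) {
        return (oldAgg, newValue) -> oldAgg == null ? newValue : reducer.apply(oldAgg, newValue);
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> agg = aggregatorForReducer(Integer::sum);
        System.out.println(agg.apply(agg.apply(null, 3), 4)); // 7
    }
}
```

With this wrapping, a dedicated KStreamWindowReduce operator adds nothing over KStreamWindowAggregate, which is the tech-debt cleanup the ticket proposes.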
[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366299#comment-16366299 ] Matthias J. Sax commented on KAFKA-4969: Should we really close this JIRA? I think we could do even better, and the description covers more than has been addressed so far. (As an alternative, we can also create a new ticket and keep this one with a limited scope.) WDYT? > State-store workload-aware StreamsPartitionAssignor > --- > > Key: KAFKA-4969 > URL: https://issues.apache.org/jira/browse/KAFKA-4969 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Major > Fix For: 1.1.0, 1.0.1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
Gunnar Morling created KAFKA-6566: - Summary: SourceTask#stop() not called after exception raised in poll() Key: KAFKA-6566 URL: https://issues.apache.org/jira/browse/KAFKA-6566 Project: Kafka Issue Type: Bug Reporter: Gunnar Morling Having discussed this with [~rhauch], it has been my assumption that {{SourceTask#stop()}} will be called by the Kafka Connect framework in case an exception has been raised in {{poll()}}. That's not the case, though. As an example see the connector and task below. Calling {{stop()}} after an exception in {{poll()}} seems like a very useful action to take, as it'll allow the task to clean up any resources such as releasing any database connections, right after that failure and not only once the connector is stopped. {code} package com.example; import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; public class TestConnector extends SourceConnector { @Override public String version() { return null; } @Override public void start(Map<String, String> props) { } @Override public Class<? extends Task> taskClass() { return TestTask.class; } @Override public List
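The behavior requested in this issue can be sketched as a worker loop that guarantees {{stop()}} runs after a {{poll()}} failure. This is a hypothetical simplification, not the actual {{WorkerSourceTask}} code.

```java
// Sketch (hypothetical, simplified types): ensure the task's stop() runs even
// when poll() throws, so resources such as database connections are released
// right after the failure rather than only when the connector is stopped.
public class WorkerLoopSketch {
    interface Task {
        java.util.List<String> poll() throws Exception;
        void stop();
    }

    static void run(Task task) {
        try {
            while (true) {
                if (task.poll() == null) break; // null signals a clean end of input here
            }
        } catch (Exception e) {
            System.err.println("poll() failed: " + e.getMessage());
        } finally {
            task.stop(); // always give the task a chance to clean up
        }
    }

    public static void main(String[] args) {
        run(new Task() {
            public java.util.List<String> poll() throws Exception {
                throw new IllegalStateException("simulated poll failure");
            }
            public void stop() {
                System.out.println("stop() called"); // reached despite the exception
            }
        });
    }
}
```

Whether the real fix belongs here or in the connector-wide cleanup path is exactly the open question Randall raises in the comments above.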
[jira] [Updated] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-4969: - Fix Version/s: 1.0.1 > State-store workload-aware StreamsPartitionAssignor > --- > > Key: KAFKA-4969 > URL: https://issues.apache.org/jira/browse/KAFKA-4969 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Major > Fix For: 1.1.0, 1.0.1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366286#comment-16366286 ] Guozhang Wang commented on KAFKA-5285: -- [~davispw] [~xvrl] Thinking about this a bit more, I think we can indeed provide a better heuristic to define the upper and lower bound for the window store {{fetch(keyFrom, keyTo, timeFrom, timeTo)}}, which is the following: 0. First of all, documentation-wise we require users to make sure the serialized bytes of {{keyFrom}} are smaller than the serialized bytes of {{keyTo}} lexicographically. And inside this call, we first check the bytes for this condition. 1. For the lower range, we just use the {{keyFrom}} bytes, instead of {{keyFrom + minSuffix}}. Since we know there would be no data in between these two keys at all, we can save some overhead of byte allocations and puts. 2. For the upper range, we just consider the following two combinations: a) keyFrom + minSuffix b) keyTo + maxSuffix where minSuffix = timeFrom + seq0 and maxSuffix = timeTo + seqMAX. And then we just pick the larger of the two. With this we would replace the current {{OrderedBytes}} implementations. Similarly, for the session store {{fetch}}, we do the same thing, except that for single-key fetch we handle it in a more optimized way: 0. For single-key fetch, lower range = key + timeFrom + timeFrom, upper range = key + timeTo + timeTo (this is because timeFrom <= timeTo, so they can bound each other). 1. For range key fetch, lower range = keyFrom. 2. For range key fetch, upper range = MAX(keyFrom + timeFrom + timeFrom, keyTo + timeTo + timeTo). WDYT? 
> Optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté >Priority: Major > Labels: performance -- This message was sent by Atlassian JIRA (v7.6.3#76005)
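The range-bound heuristic sketched in the comment above can be illustrated with a hypothetical helper, not Kafka's actual {{OrderedBytes}} implementation; the suffix layout assumed here (8-byte big-endian timestamp followed by a 4-byte sequence number) is an illustration of the "timestamp + seq" suffixes mentioned in the comment.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch of the heuristic (hypothetical helper, not Kafka's OrderedBytes):
// the lower bound is keyFrom alone, and the upper bound is the lexicographic
// max of keyFrom + minSuffix and keyTo + maxSuffix.
public class WindowRangeBounds {
    // assumed suffix layout: 8-byte big-endian timestamp + 4-byte sequence number
    static byte[] suffix(long timestamp, int seq) {
        return ByteBuffer.allocate(12).putLong(timestamp).putInt(seq).array();
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    // unsigned lexicographic comparison, shorter array sorts first on a tie
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }

    static byte[] lowerRange(byte[] keyFrom) {
        // no window entry can sort between keyFrom and keyFrom + minSuffix,
        // so keyFrom alone is a valid (and cheaper) lower bound
        return keyFrom;
    }

    static byte[] upperRange(byte[] keyFrom, byte[] keyTo, long timeFrom, long timeTo) {
        byte[] a = concat(keyFrom, suffix(timeFrom, 0));                 // keyFrom + minSuffix
        byte[] b = concat(keyTo, suffix(timeTo, Integer.MAX_VALUE));     // keyTo + maxSuffix
        return compareUnsigned(a, b) >= 0 ? a : b;                       // pick the larger
    }
}
```

The max over the two candidates matters because a long `keyFrom` can serialize to bytes that sort above a short `keyTo`, so neither candidate alone is always the correct upper bound.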
[jira] [Resolved] (KAFKA-6549) Deadlock while processing Controller Events
[ https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6549. --- Resolution: Fixed Fixed under KAFKA-6517. > Deadlock while processing Controller Events > --- > > Key: KAFKA-6549 > URL: https://issues.apache.org/jira/browse/KAFKA-6549 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Manikumar >Assignee: Manikumar >Priority: Blocker > Fix For: 1.1.0 > > Attachments: td.txt > > > Stack traces from a single node test cluster that was deadlocked while > processing controller Reelect and Expire events. Attached stack-trace. > {quote} > "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 > nid=0x7d03 waiting on condition [0x7278b000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007bccadf30> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363) > at > org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531) > at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506) > Locked ownable synchronizers: > - <0x000780054860> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > > "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 > nid=0xad03 waiting on condition [0x73fd3000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007bcc584a0> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148) > at > kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439) > at > 
kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432) > at > kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171) > at > kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) > at
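The cycle in the two traces above can be reduced to a two-thread sketch: one thread holds the initialization read lock across a blocking wait for ZooKeeper responses, while the session-expiry path needs the write lock before any response can ever arrive. The following minimal Java program (hypothetical names, not Kafka code, and simplified to two threads) shows why neither side can make progress; timeouts are added only so the demonstration terminates:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ZkDeadlockSketch {
    // Returns whether a "watcher" thread can take the write lock while a
    // "request" thread holds the read lock across a blocking wait.
    public static boolean writerCanAcquire() throws InterruptedException {
        ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock();
        CountDownLatch responseLatch = new CountDownLatch(1);
        CountDownLatch readLockHeld = new CountDownLatch(1);

        // Mimics the pre-fix handleRequests(): read lock held around the whole wait.
        Thread requester = new Thread(() -> {
            initializationLock.readLock().lock();
            try {
                readLockHeld.countDown();
                // The real code would wait indefinitely; bounded here so the demo ends.
                responseLatch.await(2, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
            } finally {
                initializationLock.readLock().unlock();
            }
        });
        requester.start();
        readLockHeld.await();

        // Mimics the expiry watcher: it needs the write lock to re-initialize the
        // session, which is the only way the awaited response could ever arrive.
        boolean acquired = initializationLock.writeLock().tryLock(500, TimeUnit.MILLISECONDS);
        if (acquired) initializationLock.writeLock().unlock();
        requester.join();
        return acquired; // false: each side is waiting on the other
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("write lock acquired: " + writerCanAcquire());
    }
}
```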
[jira] [Updated] (KAFKA-6565) Sudden unresponsiveness from broker + file descriptor leak
[ https://issues.apache.org/jira/browse/KAFKA-6565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Noam Berman updated KAFKA-6565: --- Description: Running 3 kafka clusters v0.10.1.0 on aws, 5 brokers each (+5 zookeepers), in the past week about 2-3 times a day (on all clusters, no specific brokers) we encounter a situation in which a single broker stops responding to all requests from clients/other brokers. at this moment all produced messages fail, and other brokers start writing this to log repeatedly: [2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 878 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108) at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) producers (examples): 2018-02-13 21:30:51:010 thread=[kafka-producer-network-thread | catalog-index-0] WARN Got error produce response with correlation id 8147 on topic-partition catalog-1, retrying (49 attempts left). Error: REQUEST_TIMED_OUT 2018-02-13 21:31:06:221 thread=[kafka-producer-network-thread | catalog-index-0] WARN Got error produce response with correlation id 8166 on topic-partition catalog-index-18, retrying (48 attempts left). Error: NOT_LEADER_FOR_PARTITION Once this happens, file descriptors on the affected broker start increasing at a high rate until it reaches its maximum. no relevant info logs on the affected broker, we'll try to gather debug logs and attach them to the bug next time it happens. the big issue here is that although the broker is unresponsive, it stays in the cluster and its zookeeper node isn't cleared, so there's no rebalance whatsoever - all clients start failing until a manual shutdown occurs. was: Running 3 kafka clusters v0.10.1.0 on aws, 5 brokers each (+5 zookeepers), in the past week about 2-3 times a day (on all clusters, no specific brokers) we encounter a situation in which a single broker stops responding to all requests from clients/other brokers. 
at this moment all produced messages fail, and other brokers start writing this to log repeatedly: [2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 878 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108) at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238) at
[jira] [Updated] (KAFKA-6565) Sudden unresponsiveness from broker + file descriptor leak
[ https://issues.apache.org/jira/browse/KAFKA-6565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Noam Berman updated KAFKA-6565: --- Environment: 3 separate clusters running 0.10.1.0 with 5 nodes + 5 zookeepers. clients are mostly java 0.10.2.1, and a few nodejs clients (https://github.com/oleksiyk/kafka). throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 partitions. Debian GNU/Linux 8.8 (jessie) java version “1.8.0_65” c4.4xlarge , 1500gb gp2 disks was: 3 separate clusters running 0.10.1.0 with 5 nodes + 5 zookeepers. clients are mostly java 0.10.2.1, and a few nodejs clients (https://github.com/oleksiyk/kafka). throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 partitions. > Sudden unresponsiveness from broker + file descriptor leak > -- > > Key: KAFKA-6565 > URL: https://issues.apache.org/jira/browse/KAFKA-6565 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.0 > Environment: 3 separate clusters running 0.10.1.0 with 5 nodes + 5 > zookeepers. > clients are mostly java 0.10.2.1, and a few nodejs clients > (https://github.com/oleksiyk/kafka). > throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 > partitions. > Debian GNU/Linux 8.8 (jessie) > java version “1.8.0_65” > c4.4xlarge , 1500gb gp2 disks >Reporter: Noam Berman >Priority: Blocker > > Running 3 kafka clusters v0.10.1.0 on aws, 5 brokers each (+5 zookeepers), > in the past week about 2-3 times a day (on all clusters, no specific brokers) > we encounter a situation in which a single broker stops responding to all > requests from clients/other brokers. 
at this moment all produced messages > fail, and other brokers start writing this to log repeatedly: > [2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch > kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 878 was disconnected before the response > was read > at > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115) > at > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112) > at scala.Option.foreach(Option.scala:257) > at > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112) > at > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108) > at > kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137) > at > kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143) > at > kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108) > at > kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253) > at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238) > at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > > producers (examples): > 2018-02-13 21:30:51:010 thread=[kafka-producer-network-thread | > catalog-index-0] WARN Got error produce response with correlation id 8147 on > topic-partition 
catalog-1, retrying (49 attempts left). Error: > REQUEST_TIMED_OUT > 2018-02-13 21:31:06:221 thread=[kafka-producer-network-thread | > catalog-index-0] WARN Got error produce response with correlation id 8166 on > topic-partition catalog-index-18, retrying (48 attempts left). Error: > NOT_LEADER_FOR_PARTITION > > Once this happens, file descriptors on the affected broker start increasing > at a high rate until it reaches its maximum. > no relevant info logs on the affected broker, we'll try to gather debug logs > and attach them to the bug next time it happens. > the big issue here is that although the broker is unresponsive, it stays in > the cluster and its zookeeper node isn't cleared, so there's no rebalance > whatsoever - all clients start failing until a manual shutdown occurs. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6565) Sudden unresponsiveness from broker + file descriptor leak
Noam Berman created KAFKA-6565: -- Summary: Sudden unresponsiveness from broker + file descriptor leak Key: KAFKA-6565 URL: https://issues.apache.org/jira/browse/KAFKA-6565 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.10.1.0 Environment: 3 separate clusters running 0.10.1.0 with 5 nodes + 5 zookeepers. clients are mostly java 0.10.2.1, and a few nodejs clients (https://github.com/oleksiyk/kafka). throughput per broker: ~3.5MB per second. each broker is a leader of ~2500 partitions. Reporter: Noam Berman Running 3 kafka clusters v0.10.1.0 on aws, 5 brokers each (+5 zookeepers), in the past week about 2-3 times a day (on all clusters, no specific brokers) we encounter a situation in which a single broker stops responding to all requests from clients/other brokers. at this moment all produced messages fail, and other brokers start writing this to log repeatedly: [2018-02-15 17:56:08,949] WARN [ReplicaFetcherThread-0-878], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@772ad40c (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 878 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143) at 
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108) at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) Once this happens, file descriptors on the affected broker start increasing at a high rate until it reaches its maximum. no relevant info logs on the affected broker, we'll try to gather debug logs and attach them to the bug next time it happens. the big issue here is that although the broker is unresponsive, it stays in the cluster and its zookeeper node isn't cleared, so there's no rebalance whatsoever - all clients start failing until a manual shutdown occurs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
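A leak like the one described above can be caught before the broker hits its descriptor limit by polling the open-FD count from inside the JVM. A small sketch using the JDK's `com.sun.management` extension (a HotSpot/Unix-specific MXBean; the method returns null on platforms that don't expose it, and the 80% alert threshold is an arbitrary choice):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class FdWatch {
    // Returns {open, max} descriptor counts, or null when the JVM/OS does not
    // expose them (the com.sun.management bean exists on HotSpot/Unix only).
    public static long[] descriptorCounts() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] { unix.getOpenFileDescriptorCount(),
                                unix.getMaxFileDescriptorCount() };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] fds = descriptorCounts();
        if (fds != null) {
            System.out.println("open=" + fds[0] + " max=" + fds[1]);
            // Alerting when open > 0.8 * max would flag the leak well before EMFILE.
        }
    }
}
```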
[jira] [Commented] (KAFKA-6517) ZooKeeperClient holds a lock while waiting for responses, blocking shutdown
[ https://issues.apache.org/jira/browse/KAFKA-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366212#comment-16366212 ] ASF GitHub Bot commented on KAFKA-6517: --- rajinisivaram closed pull request #4551: KAFKA-6517: Avoid deadlock in ZooKeeperClient during session expiry URL: https://github.com/apache/kafka/pull/4551 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 9a1d16274df..3934fd0ad5d 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLat
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
-import kafka.utils.Logging
+import kafka.utils.{KafkaScheduler, Logging}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback}
 import org.apache.zookeeper.KeeperException.Code
@@ -59,6 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
   private val metricNames = Set[String]()
@@ -90,6 +91,7 @@ class ZooKeeperClient(connectString: String,
   metricNames += "SessionState"
+  expiryScheduler.startup()
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)

   override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
@@ -122,7 +124,7 @@ class ZooKeeperClient(connectString: String,
   * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype
   * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
   */
-  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) {
+  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
     if (requests.isEmpty)
       Seq.empty
     else {
@@ -132,10 +134,12 @@ class ZooKeeperClient(connectString: String,
       requests.foreach { request =>
         inFlightRequests.acquire()
         try {
-          send(request) { response =>
-            responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
+          inReadLock(initializationLock) {
+            send(request) { response =>
+              responseQueue.add(response)
+              inFlightRequests.release()
+              countDownLatch.countDown()
+            }
           }
         } catch {
           case e: Throwable =>
@@ -148,7 +152,8 @@ class ZooKeeperClient(connectString: String,
     }
   }

-  private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
+  // Visibility to override for testing
+  private[zookeeper] def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
     // Safe to cast as we always create a response of the right type
     def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
@@ -303,12 +308,18 @@ class ZooKeeperClient(connectString: String,
     stateChangeHandlers.clear()
     zooKeeper.close()
     metricNames.foreach(removeMetric(_))
+    expiryScheduler.shutdown()
     info("Closed.")
   }

   def sessionId: Long = inReadLock(initializationLock) {
     zooKeeper.getSessionId
   }
+
+  // Only for testing
+  private[zookeeper] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
+    zooKeeper
+  }

   private def initialize(): Unit = {
     if (!connectionState.isAlive) {
@@ -352,12 +363,14 @@ class ZooKeeperClient(connectString: String,
       error("Auth failed.")
       stateChangeHandlers.values.foreach(_.onAuthFailure())
     } else if (state == KeeperState.Expired) {
-      inWriteLock(initializationLock) {
-        info("Session expired.")
-        stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-        initialize()
-        stateChangeHandlers.values.foreach(_.afterInitializingSession())
-
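The core of the patch is a change of lock scope: the read lock is now taken only around each non-blocking send, and the wait for all responses happens with no lock held, so the expiry watcher can take the write lock in the meantime. A self-contained Java sketch of the same pattern (hypothetical names, a simulated async send, and not Kafka code):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntConsumer;

public class LockScopeSketch {
    private final ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock();
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    // Simulated async send: the callback fires later on the I/O thread,
    // standing in for ZooKeeper delivering a response.
    private void send(int request, IntConsumer callback) {
        io.submit(() -> callback.accept(request * 2));
    }

    // Pattern from the patch: the read lock is scoped to each send() call,
    // and the blocking wait for all responses runs with no lock held, so a
    // session-expiry handler could take the write lock while we wait.
    public List<Integer> handleRequests(List<Integer> requests) throws InterruptedException {
        List<Integer> responses = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(requests.size());
        for (int request : requests) {
            initializationLock.readLock().lock();
            try {
                send(request, response -> {
                    responses.add(response);
                    latch.countDown();
                });
            } finally {
                initializationLock.readLock().unlock();
            }
        }
        latch.await();  // outside the lock: the deadlock above cannot form
        io.shutdown();  // single-use sketch; a real client keeps the executor
        return responses;
    }
}
```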
[jira] [Issue Comment Deleted] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-5285: - Comment: was deleted (was: [~davispw] [~xvrl] Thinking about this a bit more, I think we can provide a better heuristic to define the upper and lower bound for window store {{fetch(keyFrom, keyTo, timeFrom, timeTo)}} indeed, which is to just consider the following four combinations: 1. keyFrom + minSuffix 2. keyTo + minSuffix 3. keyFrom + maxSuffix 4. keyTo + maxSuffix where minSuffix = timeFrom + seq0, and maxSuffix = timeTo + seqMAX. And then we just pick the smallest and largest among these four as lower and upper range, to replace the current {{OrderedBytes}} implementations. WDYT?) > Optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté >Priority: Major > Labels: performance > > The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} > {{upperRange}} and {{lowerRange}} does not make any assumptions with respect > to the other key bound (e.g. the upper byte bound does not depends on lower > key bound). > It should be possible to optimize the byte range somewhat further using the > information provided by the lower bound. > More specifically, by incorporating that information, we should be able to > eliminate the corresponding {{upperRangeFixedSize}} and > {{lowerRangeFixedSize}}, since the result should be the same if we implement > that optimization. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366165#comment-16366165 ] Guozhang Wang commented on KAFKA-5285: -- [~davispw] [~xvrl] Thinking about this a bit more, I think we can provide a better heuristic to define the upper and lower bound for window store {{fetch(keyFrom, keyTo, timeFrom, timeTo)}} indeed, which is to just consider the following four combinations: 1. keyFrom + minSuffix 2. keyTo + minSuffix 3. keyFrom + maxSuffix 4. keyTo + maxSuffix where minSuffix = timeFrom + seq0, and maxSuffix = timeTo + seqMAX. And then we just pick the smallest and largest among these four as lower and upper range, to replace the current {{OrderedBytes}} implementations. WDYT? > Optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté >Priority: Major > Labels: performance > > The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} > {{upperRange}} and {{lowerRange}} does not make any assumptions with respect > to the other key bound (e.g. the upper byte bound does not depends on lower > key bound). > It should be possible to optimize the byte range somewhat further using the > information provided by the lower bound. > More specifically, by incorporating that information, we should be able to > eliminate the corresponding {{upperRangeFixedSize}} and > {{lowerRangeFixedSize}}, since the result should be the same if we implement > that optimization. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
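The four-combination heuristic above can be sketched directly: build all four candidate keys and take the lexicographic minimum and maximum as the scan bounds. The Java below assumes a simplified, fixed-length key layout (key bytes, then an 8-byte big-endian timestamp, then a 4-byte sequence number); the real `WindowKeySchema`/`SessionKeySchema` layouts differ, and variable-length keys are exactly what makes the real bounds computation subtle:

```java
import java.nio.ByteBuffer;

public class RangeBounds {
    // Candidate bound: key bytes followed by an 8-byte big-endian timestamp
    // and a 4-byte sequence number (simplified window-store key layout).
    static byte[] candidate(byte[] key, long ts, int seq) {
        return ByteBuffer.allocate(key.length + 12).put(key).putLong(ts).putInt(seq).array();
    }

    // Unsigned lexicographic byte comparison.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }

    // Heuristic from the comment: consider the four combinations of
    // {keyFrom, keyTo} x {minSuffix, maxSuffix} and take the lexicographic
    // min as the lower bound and max as the upper bound.
    static byte[][] bounds(byte[] keyFrom, byte[] keyTo, long timeFrom, long timeTo) {
        byte[][] candidates = {
            candidate(keyFrom, timeFrom, 0),                  // keyFrom + minSuffix
            candidate(keyTo, timeFrom, 0),                    // keyTo + minSuffix
            candidate(keyFrom, timeTo, Integer.MAX_VALUE),    // keyFrom + maxSuffix
            candidate(keyTo, timeTo, Integer.MAX_VALUE),      // keyTo + maxSuffix
        };
        byte[] lo = candidates[0], hi = candidates[0];
        for (byte[] c : candidates) {
            if (compare(c, lo) < 0) lo = c;
            if (compare(c, hi) > 0) hi = c;
        }
        return new byte[][] { lo, hi };
    }
}
```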
[jira] [Updated] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-5285: - Summary: Optimize upper / lower byte range for key range scan on windowed stores (was: optimize upper / lower byte range for key range scan on windowed stores) > Optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté >Priority: Major > Labels: performance > > The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} > {{upperRange}} and {{lowerRange}} does not make any assumptions with respect > to the other key bound (e.g. the upper byte bound does not depends on lower > key bound). > It should be possible to optimize the byte range somewhat further using the > information provided by the lower bound. > More specifically, by incorporating that information, we should be able to > eliminate the corresponding {{upperRangeFixedSize}} and > {{lowerRangeFixedSize}}, since the result should be the same if we implement > that optimization. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled
[ https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6512. --- Resolution: Fixed Fix Version/s: (was: 1.2.0) 1.1.0 Implemented options 1) and 2) from the description. > Java Producer: Excessive memory usage with compression enabled > -- > > Key: KAFKA-6512 > URL: https://issues.apache.org/jira/browse/KAFKA-6512 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Windows 10 >Reporter: Kyle Tinker >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > Attachments: KafkaSender.java > > > h2. User Story > As a user of the Java producer, I want a predictable memory usage for the > Kafka client so that I can ensure that my system is sized appropriately and > will be stable even under heavy usage. > As a user of the Java producer, I want a smaller memory footprint so that my > systems don't consume as many resources. > h2. Acceptance Criteria > * Enabling Compression in Kafka should not significantly increase the memory > usage of Kafka > * The memory usage of Kafka's Java Producer should be roughly in line with > the buffer size (buffer.memory) and the number of producers declared. > h2. Additional Information > I've observed high memory usage in the producer when enabling compression > (gzip or lz4). I don't observe the behavior with compression off, but with > it on I'll run out of heap (2GB). Using a Java profiler, I see the data is > in the KafkaLZ4BlockOutputStream (or related class for gzip). I see that > MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but > is not successful. I'm most likely network bottlenecked, so I expect the > producer buffers to be full while the job is running and potentially a lot of > unacknowledged records. > I've tried using the default buffer.memory with 20 producers (across 20 > threads) and sending data as quickly as I can. 
I've also tried 1MB of > buffer.memory, which seemed to reduce memory consumption but I could still > run OOM in certain cases. I have max.in.flight.requests.per.connection set > to 1. In short, I should only have ~20 MB (20* 1MB) of data in buffers, but > I can easily exhaust 2000 MB used by Kafka. > In looking at the code more, it looks like the KafkaLZ4BlockOutputStream > doesn't clear the compressedBuffer or buffer when close() is called. In my > heap dump, both of those are ~65k size each, meaning that each batch is > taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 > and messages are 1k each until the batch fills). > Kafka tries to manage memory usage by calling > MemoryRecordsBuilder:closeForRecordAppends(), which as documented as "Release > resources required for record appends (e.g. compression buffers)". However, > this method doesn't actually clear those buffers because > KafkaLZ4BlockOutputStream.close() only writes the block and end mark and > closes the output stream. It doesn't actually clear the buffer and > compressedBuffer in KafkaLZ4BlockOutputStream. Those stay allocated in RAM > until the block is acknowledged by the broker, processed in > Sender:handleProduceResponse(), and the batch is deallocated. This memory > usage therefore increases, possibly without bound. In my test program, the > program died with approximately 345 unprocessed batches per producer (20 > producers), despite having max.in.flight.requests.per.connection=1. > h2. Steps to Reproduce > # Create a topic test with plenty of storage > # Use a connection with a very fast upload pipe and limited download. This > allows the outbound data to go out, but acknowledgements to be delayed > flowing in. > # Download KafkaSender.java (attached to this ticket) > # Set line 17 to reference your Kafka broker > # Run the program with a 1GB Xmx value > h2. 
Possible solutions > There are a few possible optimizations I can think of: > # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as > non-final and null them in the close() method > # We could declare the MemoryRecordsBuilder.appendStream non-final and null > it in the closeForRecordAppends() method > # We could have the ProducerBatch discard the recordsBuilder in > closeForRecordAppends(), however, this is likely a bad idea because the > recordsBuilder contains significant metadata that is likely needed after the > stream is closed. It is also final. > # We could try to limit the number of non-acknowledged batches in flight. > This would bound the maximum memory usage but may negatively impact > performance. > > Fix #1 would only improve the LZ4 algorithm, and not any other
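Option 1 above (declaring the buffers non-final and nulling them in `close()`) can be sketched as follows. This is an illustrative skeleton, not the actual `KafkaLZ4BlockOutputStream`: compression, block writing, and the end mark are elided, and `buffersReleased()` exists only so the effect is observable:

```java
public class BlockOutputStreamSketch extends java.io.OutputStream {
    // Non-final so they can be dropped on close() (option 1 above).
    private byte[] buffer = new byte[64 * 1024];
    private byte[] compressedBuffer = new byte[64 * 1024];
    private int pos = 0;

    @Override
    public void write(int b) {
        buffer[pos++] = (byte) b;  // real impl compresses full blocks; elided
    }

    @Override
    public void close() {
        // The real close() writes the final block and end mark first (elided).
        // Dropping the references makes the ~128 KB per batch collectible
        // immediately, instead of lingering until the batch is acknowledged.
        buffer = null;
        compressedBuffer = null;
    }

    public boolean buffersReleased() {
        return buffer == null && compressedBuffer == null;
    }
}
```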
[jira] [Comment Edited] (KAFKA-4277) creating ephemeral node already exist
[ https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366008#comment-16366008 ] Nico Meyer edited comment on KAFKA-4277 at 2/15/18 5:47 PM: We also just encountered the same problem. Kafka 0.11.0.1 and ZooKeeper 3.4.6. It seems that Zookeeper does not guarantee that the ephemeral nodes are gone after a session expired. At least not if one connects to a different Zookeeper node, which is exactly what happened in our case: {code:java} 2018-02-15 13:43:03,464 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL (unknown error) 2018-02-15 13:43:04,462 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.206/172.20.0.206:2181, initiating session 2018-02-15 13:43:04,549 WARN zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1285)) - Unable to reconnect to ZooKeeper service, session 0x1614ad433b00b17 has expired 2018-02-15 13:43:04,549 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(1154)) - Unable to reconnect to ZooKeeper service, session 0x1614ad433b00b17 has expired, closing socket connection 2018-02-15 13:43:04,549 INFO zkclient.ZkClient (ZkClient.java:processStateChanged(713)) - zookeeper state changed (Expired) 2018-02-15 13:43:04,550 INFO zookeeper.ZooKeeper (ZooKeeper.java:(438)) - Initiating client connection, connectString=172.20.0.215:2181,172.20.0.244:2181,172.20.0.204:2181,172.20.0.206:2181,172.20.0.208:2181/3rdparty/kafka08 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@199bd995 2018-02-15 13:43:04,550 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(519)) - EventThread shut down for session: 0x1614ad433b00b17 2018-02-15 13:43:04,552 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.204/172.20.0.204:2181. 
Will not attempt to authenticate using SASL (unknown error) 2018-02-15 13:43:04,553 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.204/172.20.0.204:2181, initiating session 2018-02-15 13:43:04,553 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(1158)) - Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect 2018-02-15 13:43:04,699 INFO zkclient.ZkClient (ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected 2018-02-15 13:43:04,699 INFO zkclient.ZkClient (ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected 2018-02-15 13:43:05,373 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL (unknown error) 2018-02-15 13:43:05,374 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.206/172.20.0.206:2181, initiating session 2018-02-15 13:43:06,575 WARN zookeeper.ClientCnxn (ClientCnxn.java:run(1108)) - Client session timed out, have not heard from server in 1201ms for sessionid 0x0 2018-02-15 13:43:06,575 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(1156)) - Client session timed out, have not heard from server in 1201ms for sessionid 0x0, closing socket connection and attempting reconnect 2018-02-15 13:43:07,180 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.215/172.20.0.215:2181. 
Will not attempt to authenticate using SASL (unknown error) 2018-02-15 13:43:07,180 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.215/172.20.0.215:2181, initiating session 2018-02-15 13:43:07,182 INFO zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1299)) - Session establishment complete on server 172.20.0.215/172.20.0.215:2181, sessionid = 0x161997d55350158, negotiated timeout = 6000 2018-02-15 13:43:07,182 INFO zkclient.ZkClient (ZkClient.java:processStateChanged(713)) - zookeeper state changed (SyncConnected) 2018-02-15 13:43:07,184 INFO server.KafkaHealthcheck$SessionExpireListener (Logging.scala:info(70)) - re-registering broker info in ZK for broker 11 2018-02-15 13:43:07,203 INFO utils.ZKCheckedEphemeral (Logging.scala:info(70)) - Creating /brokers/ids/11 (is it secure? false) 2018-02-15 13:43:07,246 INFO utils.ZKCheckedEphemeral (Logging.scala:info(70)) - Result of znode creation is: NODEEXISTS 2018-02-15 13:43:07,304 ERROR zkclient.ZkEventThread (ZkEventThread.java:run(78)) - Error handling event ZkEvent[New session event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@bb373b3] java.lang.RuntimeException: A broker is already registered on the
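One way to handle the NODEEXISTS race described above is to inspect the conflicting znode's owner session before giving up. The helper below is purely hypothetical (it is not Kafka's `ZKCheckedEphemeral`, and the session-comparison rule is an assumption): if the node is owned by our current session the create effectively succeeded; if it is owned by our previous, expired session it is stale and can be deleted and retried; otherwise back off and retry, since ZooKeeper may simply not have purged the old ephemeral yet:

```java
public class EphemeralConflict {
    public enum Action { DONE, DELETE_STALE_AND_RETRY, BACKOFF_AND_RETRY }

    // Hypothetical decision helper for a NODEEXISTS result when creating an
    // ephemeral znode after session expiry. ownerSessionId is the
    // ephemeralOwner of the existing node; mySessionId / previousSessionId
    // are the current and expired client session ids.
    public static Action onCreateResult(boolean nodeExists, long ownerSessionId,
                                        long mySessionId, long previousSessionId) {
        if (!nodeExists) return Action.DONE;                 // create succeeded
        if (ownerSessionId == mySessionId) return Action.DONE;  // we already own it
        if (ownerSessionId == previousSessionId)
            return Action.DELETE_STALE_AND_RETRY;  // leftover from our dead session
        // Another broker (or an as-yet-unpurged node): wait and retry rather
        // than failing the whole event handler with a RuntimeException.
        return Action.BACKOFF_AND_RETRY;
    }
}
```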
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366010#comment-16366010 ] Matthias J. Sax commented on KAFKA-6502: If we really change the timestamp handling, and only look at the first record per partition to determine which record to pick next, this issue resolves itself, as we can do lazy deserialization for the first record per partition only. > Kafka streams deserialization handler not committing offsets on error records > - > > Key: KAFKA-6502 > URL: https://issues.apache.org/jira/browse/KAFKA-6502 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Soby Chacko >Priority: Minor > > See this StackOverflow issue: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler] > and this comment: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899] > I am trying to use the LogAndContinueExceptionHandler on deserialization. It > works fine when an error occurs, successfully logging and continuing. > However, on a continuous stream of errors, it seems like these messages are > not committed and on a restart of the application they reappear again. It is > more problematic if I try to send the messages in error to a DLQ. On a > restart, they are sent again to the DLQ. As soon as I have a good record coming > in, it looks like the offset moves further and I do not see the already logged > messages again after a restart. > I reproduced this behavior by running the sample provided here: > [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java] > I changed the incoming value Serde to > {{Serdes.Integer().getClass().getName()}} to force a deserialization error on > input and reduced the commit interval to just 1 second. Also added the > following to the config. 
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, > LogAndContinueExceptionHandler.class);}}. > It looks like when deserialization exceptions occur, this flag is never set > to be true here: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228]. > It only becomes true once processing succeeds. That might be the reason why > commit is not happening even after I manually call processorContext#commit(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
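The commit behavior described above can be modeled without any Kafka dependency. The following is a simplified sketch (class and field names are mine, not the real StreamTask internals, and `tryDeserialize` stands in for the configured Serde): a commit-needed flag is only flipped by successful processing, so a run of records that all fail deserialization never advances the committed offset, and those records replay on restart.

```java
import java.util.List;

// Simplified model of the behavior described in KAFKA-6502: skipped
// (un-deserializable) records never set the commit flag, so the committed
// offset does not move until a good record is processed.
public class DeserializationCommitSketch {
    long committedOffset = -1;   // last committed offset
    long consumedOffset = -1;    // last record handed to the task
    boolean commitNeeded = false;

    void poll(List<String> records) {
        for (String value : records) {
            consumedOffset++;
            Integer parsed = tryDeserialize(value);
            if (parsed == null) {
                // LogAndContinue: log and skip, but commitNeeded stays false
                continue;
            }
            commitNeeded = true; // only successful processing flips the flag
        }
    }

    void maybeCommit() {
        if (commitNeeded) {
            committedOffset = consumedOffset;
            commitNeeded = false;
        }
    }

    static Integer tryDeserialize(String v) {
        try { return Integer.parseInt(v); } catch (NumberFormatException e) { return null; }
    }

    public static void main(String[] args) {
        DeserializationCommitSketch task = new DeserializationCommitSketch();
        task.poll(List.of("oops", "bad"));  // only corrupt records
        task.maybeCommit();
        System.out.println(task.committedOffset); // still -1: these records replay on restart
        task.poll(List.of("42"));           // one good record
        task.maybeCommit();
        System.out.println(task.committedOffset); // 2: the offset finally advances
    }
}
```

This matches the reporter's observation: a good record "unsticks" the offset, and everything before it is replayed (or re-sent to the DLQ) after a restart.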
[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist
[ https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366008#comment-16366008 ] Nico Meyer commented on KAFKA-4277: --- We also just encountered the same problem, with Kafka 0.11.0.1 and ZooKeeper 3.4.6. It seems that ZooKeeper does not guarantee that the ephemeral nodes are gone after a session has expired, at least not if one connects to a different ZooKeeper node, which is exactly what happened in our case:
{code:java}
2018-02-15 13:43:03,464 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL (unknown error)
2018-02-15 13:43:04,462 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.206/172.20.0.206:2181, initiating session
2018-02-15 13:43:04,549 WARN zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1285)) - Unable to reconnect to ZooKeeper service, session 0x1614ad433b00b17 has expired
2018-02-15 13:43:04,549 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(1154)) - Unable to reconnect to ZooKeeper service, session 0x1614ad433b00b17 has expired, closing socket connection
2018-02-15 13:43:04,549 INFO zkclient.ZkClient (ZkClient.java:processStateChanged(713)) - zookeeper state changed (Expired)
2018-02-15 13:43:04,550 INFO zookeeper.ZooKeeper (ZooKeeper.java:(438)) - Initiating client connection, connectString=172.20.0.215:2181,172.20.0.244:2181,172.20.0.204:2181,172.20.0.206:2181,172.20.0.208:2181/3rdparty/kafka08 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@199bd995
2018-02-15 13:43:04,550 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(519)) - EventThread shut down for session: 0x1614ad433b00b17
2018-02-15 13:43:04,552 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.204/172.20.0.204:2181. Will not attempt to authenticate using SASL (unknown error)
2018-02-15 13:43:04,553 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.204/172.20.0.204:2181, initiating session
2018-02-15 13:43:04,553 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(1158)) - Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
2018-02-15 13:43:04,699 INFO zkclient.ZkClient (ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected
2018-02-15 13:43:04,699 INFO zkclient.ZkClient (ZkClient.java:waitForKeeperState(936)) - Waiting for keeper state SyncConnected
2018-02-15 13:43:05,373 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.206/172.20.0.206:2181. Will not attempt to authenticate using SASL (unknown error)
2018-02-15 13:43:05,374 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.206/172.20.0.206:2181, initiating session
2018-02-15 13:43:06,575 WARN zookeeper.ClientCnxn (ClientCnxn.java:run(1108)) - Client session timed out, have not heard from server in 1201ms for sessionid 0x0
2018-02-15 13:43:06,575 INFO zookeeper.ClientCnxn (ClientCnxn.java:run(1156)) - Client session timed out, have not heard from server in 1201ms for sessionid 0x0, closing socket connection and attempting reconnect
2018-02-15 13:43:07,180 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 172.20.0.215/172.20.0.215:2181. Will not attempt to authenticate using SASL (unknown error)
2018-02-15 13:43:07,180 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(876)) - Socket connection established to 172.20.0.215/172.20.0.215:2181, initiating session
2018-02-15 13:43:07,182 INFO zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1299)) - Session establishment complete on server 172.20.0.215/172.20.0.215:2181, sessionid = 0x161997d55350158, negotiated timeout = 6000
2018-02-15 13:43:07,182 INFO zkclient.ZkClient (ZkClient.java:processStateChanged(713)) - zookeeper state changed (SyncConnected)
2018-02-15 13:43:07,184 INFO server.KafkaHealthcheck$SessionExpireListener (Logging.scala:info(70)) - re-registering broker info in ZK for broker 11
2018-02-15 13:43:07,203 INFO utils.ZKCheckedEphemeral (Logging.scala:info(70)) - Creating /brokers/ids/11 (is it secure? false)
2018-02-15 13:43:07,246 INFO utils.ZKCheckedEphemeral (Logging.scala:info(70)) - Result of znode creation is: NODEEXISTS
2018-02-15 13:43:07,304 ERROR zkclient.ZkEventThread (ZkEventThread.java:run(78)) - Error handling event ZkEvent[New session event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@bb373b3] java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/11. This probably indicates that
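The NODEEXISTS failure in the log above happens because the stale ephemeral node from the expired session has not yet been reaped by ZooKeeper. One mitigation the broker could apply is to retry with backoff instead of failing immediately. The following is a toy model only, with an in-memory map standing in for ZooKeeper and all names illustrative (this is not the Kafka or ZkClient API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of retrying ephemeral-node creation: on NODEEXISTS, back off and
// retry, because a node owned by an expired session eventually disappears.
// The "store" map is an in-memory stand-in for ZooKeeper.
public class EphemeralRetrySketch {
    final Map<String, Long> store = new HashMap<>(); // path -> owner session id
    int ticksUntilStaleNodeIsReaped;

    EphemeralRetrySketch(int ticks) {
        ticksUntilStaleNodeIsReaped = ticks;
        store.put("/brokers/ids/11", 0x1614ad433b00b17L); // node left by the expired session
    }

    void tick() { // ZooKeeper eventually reaps nodes of expired sessions
        if (--ticksUntilStaleNodeIsReaped <= 0) store.remove("/brokers/ids/11");
    }

    boolean createEphemeral(String path, long mySession, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            Long owner = store.get(path);
            if (owner == null) { store.put(path, mySession); return true; }
            if (owner == mySession) return true; // we already own it
            tick();                              // back off; the stale node may vanish
        }
        return false; // give up: the node is owned by a live foreign session
    }

    public static void main(String[] args) {
        EphemeralRetrySketch zk = new EphemeralRetrySketch(3);
        boolean ok = zk.createEphemeral("/brokers/ids/11", 0x161997d55350158L, 10);
        System.out.println(ok); // registration succeeds once the stale node is reaped
    }
}
```

The real fix has to distinguish a node from one's own previous (expired) session from a genuinely conflicting live broker, which a bounded retry with backoff does implicitly: the former disappears, the latter does not.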
[jira] [Commented] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled
[ https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365989#comment-16365989 ] ASF GitHub Bot commented on KAFKA-6512: --- rajinisivaram closed pull request #4570: KAFKA-6512: Discard references to buffers used for compression URL: https://github.com/apache/kafka/pull/4570 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance. As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 8cfc37be826..591ab169364 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -34,7 +33,7 @@
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
     public static final int MAGIC = 0x184D2204;
     public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@
     private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
     private int bufferOffset;
     private boolean finished;
@@ -71,7 +71,7 @@
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-        super(out);
+        this.out = out;
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ private void writeBlock() throws IOException {
     private void writeEndMark() throws IOException {
         ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
-        finished = true;
     }
 
     @Override
@@ -259,15 +258,26 @@ private void ensureNotFinished() {
     @Override
     public void close() throws IOException {
-        if (!finished) {
-            // basically flush the buffer writing the last block
-            writeBlock();
-            // write the end block and finish the stream
-            writeEndMark();
-        }
-        if (out != null) {
-            out.close();
-            out = null;
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a9b57ac22df..6f6404fa2d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -23,6 +23,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+            throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
+        }
+    });
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
-    // Used to append records, may compress data on the fly
-    private final
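The patch above combines two ideas: after close(), swap the output stream for a sentinel that rejects further writes, and drop references to the large internal buffers so the GC can reclaim them even while the builder object itself stays referenced. Both can be illustrated with plain java.io; the class below is a simplified sketch with made-up names, not the actual KafkaLZ4BlockOutputStream:

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch of the KAFKA-6512 pattern: a closed instance fails fast on further
// writes and releases its large buffer for garbage collection.
public class ClosableBufferSketch {
    // Sentinel stream installed on close(); any write fails fast.
    private static final OutputStream CLOSED_STREAM = new OutputStream() {
        @Override public void write(int b) {
            throw new IllegalStateException("closed for appends");
        }
    };

    private OutputStream out;
    private byte[] buffer = new byte[64 * 1024]; // stand-in for the compression buffer

    public ClosableBufferSketch(OutputStream out) { this.out = out; }

    public void append(int b) throws IOException { out.write(b); }

    public void close() throws IOException {
        try {
            out.flush();
        } finally {
            out = CLOSED_STREAM; // further appends throw IllegalStateException
            buffer = null;       // let the GC reclaim the large buffer
        }
    }

    public boolean bufferReleased() { return buffer == null; }

    public static void main(String[] args) throws IOException {
        ClosableBufferSketch b = new ClosableBufferSketch(new java.io.ByteArrayOutputStream());
        b.append('x');
        b.close();
        System.out.println(b.bufferReleased()); // true
    }
}
```

The sentinel avoids null checks on every append while still making use-after-close an immediate, descriptive failure rather than a NullPointerException.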
[jira] [Resolved] (KAFKA-2167) ZkUtils updateEphemeralPath JavaDoc (spelling and correctness)
[ https://issues.apache.org/jira/browse/KAFKA-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-2167. -- Resolution: Fixed > ZkUtils updateEphemeralPath JavaDoc (spelling and correctness) > -- > > Key: KAFKA-2167 > URL: https://issues.apache.org/jira/browse/KAFKA-2167 > Project: Kafka > Issue Type: Bug > Reporter: Jon Bringhurst > Priority: Major > Labels: newbie > > I'm not 100% sure on this, but it seems like "persistent" should instead say > "ephemeral" in the JavaDoc. Also, note that "parrent" is misspelled.
> {noformat}
> /**
>  * Update the value of a persistent node with the given path and data.
>  * create parrent directory if necessary. Never throw NodeExistException.
>  */
> def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
>   try {
>     client.writeData(path, data)
>   }
>   catch {
>     case e: ZkNoNodeException => {
>       createParentPath(client, path)
>       client.createEphemeral(path, data)
>     }
>     case e2 => throw e2
>   }
> }
> {noformat}
> should be:
> {noformat}
> /**
>  * Update the value of an ephemeral node with the given path and data.
>  * create parent directory if necessary. Never throw NodeExistException.
>  */
> def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
>   try {
>     client.writeData(path, data)
>   }
>   catch {
>     case e: ZkNoNodeException => {
>       createParentPath(client, path)
>       client.createEphemeral(path, data)
>     }
>     case e2 => throw e2
>   }
> }
> {noformat}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6240) Support dynamic updates of frequently updated broker configs
[ https://issues.apache.org/jira/browse/KAFKA-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6240. --- Resolution: Fixed Fix Version/s: (was: 1.2.0) 1.1.0 > Support dynamic updates of frequently updated broker configs > > > Key: KAFKA-6240 > URL: https://issues.apache.org/jira/browse/KAFKA-6240 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > See > [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > for details. > Implementation will be done under sub-tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6564) Fix broken links in Dockerfile
[ https://issues.apache.org/jira/browse/KAFKA-6564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365510#comment-16365510 ] Manikumar commented on KAFKA-6564: -- This was fixed in -KAFKA-6247- > Fix broken links in Dockerfile > -- > > Key: KAFKA-6564 > URL: https://issues.apache.org/jira/browse/KAFKA-6564 > Project: Kafka > Issue Type: Test > Reporter: Andriy Sorokhtey > Priority: Minor > > https://github.com/apache/kafka/blob/1.0.0/tests/docker/Dockerfile
> {noformat}
> # Install binary test dependencies.
> ENV MIRROR="http://mirrors.ocf.berkeley.edu/apache/"
> RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
> RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
> RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
> RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
> RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
> RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
> {noformat}
> These links seem to be broken, and the automated tests executed on Docker fail with this error:
> {noformat}
> log: /bin/sh -c mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"' returned a non-zero code: 2
> {noformat}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6564) Fix broken links in Dockerfile
Andriy Sorokhtey created KAFKA-6564: --- Summary: Fix broken links in Dockerfile Key: KAFKA-6564 URL: https://issues.apache.org/jira/browse/KAFKA-6564 Project: Kafka Issue Type: Test Reporter: Andriy Sorokhtey https://github.com/apache/kafka/blob/1.0.0/tests/docker/Dockerfile
{noformat}
# Install binary test dependencies.
ENV MIRROR="http://mirrors.ocf.berkeley.edu/apache/"
RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
{noformat}
These links seem to be broken, and the automated tests executed on Docker fail with this error:
{noformat}
log: /bin/sh -c mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"' returned a non-zero code: 2
{noformat}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6536) Streams quickstart pom.xml is missing versions for a bunch of plugins
[ https://issues.apache.org/jira/browse/KAFKA-6536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365411#comment-16365411 ] Yaswanth Kumar commented on KAFKA-6536: --- Great thanks! > Streams quickstart pom.xml is missing versions for a bunch of plugins > - > > Key: KAFKA-6536 > URL: https://issues.apache.org/jira/browse/KAFKA-6536 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0, 0.11.0.2, 1.0.1 >Reporter: Ewen Cheslack-Postava >Assignee: Yaswanth Kumar >Priority: Major > Labels: newbie > Original Estimate: 1h > Remaining Estimate: 1h > > There are a bunch of plugins being used that maven helpfully warns you about > being unversioned: > {code:java} > > [INFO] Scanning for projects... > > [WARNING] > > [WARNING] Some problems were encountered while building the effective model > > for org.apache.kafka:streams-quickstart-java:maven-archetype:1.0.1 > > [WARNING] 'build.plugins.plugin.version' for > > org.apache.maven.plugins:maven-shade-plugin is missing. @ > > org.apache.kafka:streams-quickstart:1.0.1, > > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, > > line 64, column 21 > > [WARNING] 'build.plugins.plugin.version' for > > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ > > org.apache.kafka:streams-quickstart:1.0.1, > > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, > > line 74, column 21 > > [WARNING] > > [WARNING] Some problems were encountered while building the effective model > > for org.apache.kafka:streams-quickstart:pom:1.0.1 > > [WARNING] 'build.plugins.plugin.version' for > > org.apache.maven.plugins:maven-shade-plugin is missing. @ line 64, column 21 > > [WARNING] 'build.plugins.plugin.version' for > > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ line 74, > > column 21 > > [WARNING] > > [WARNING] It is highly recommended to fix these problems because they > > threaten the stability of your build. 
> > [WARNING] > > [WARNING] For this reason, future Maven versions might no longer support > > building such malformed projects.{code} > Unversioned dependencies are dangerous as they make the build > non-reproducible. In fact, a released version may become very difficult to > build as the user would have to track down the working versions of the > plugins. This seems particularly bad for the quickstart as it's likely to be > copy/pasted into people's own projects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
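The warnings above can be silenced by pinning each plugin in the quickstart pom. A sketch of the shape of the fix follows; the version numbers are placeholders and should be replaced with whatever the build currently resolves (e.g. as reported by the versions-maven-plugin goal `mvn versions:display-plugin-updates`):

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <!-- illustrative version: pin to what the build resolves today -->
      <version>3.1.0</version>
    </plugin>
    <plugin>
      <groupId>com.github.siom79.japicmp</groupId>
      <artifactId>japicmp-maven-plugin</artifactId>
      <!-- illustrative version -->
      <version>0.11.0</version>
    </plugin>
  </plugins>
</build>
```

Pinning in a `pluginManagement` section of the parent pom would work equally well and keeps the versions in one place.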
[jira] [Created] (KAFKA-6563) Kafka online backup
Werner Daehn created KAFKA-6563: --- Summary: Kafka online backup Key: KAFKA-6563 URL: https://issues.apache.org/jira/browse/KAFKA-6563 Project: Kafka Issue Type: Improvement Components: core Reporter: Werner Daehn
If you consider Kafka to be a "database" with "transaction logs", you need a way to do backup/recovery just like databases do. The beauty of such a solution would be to enable Kafka for smaller scenarios where you do not want to run a large cluster. You could even use a single-node Kafka. In the worst case you lose all data since the backup, and you have to ask the sources to send that data again; for most that is possible.
Currently you have multiple options, none of which are good:
# Set up Kafka fault-tolerant and with replication factors: needs larger servers and does not prevent many types of problems, like software bugs or deleting a topic by accident.
# Mirror Kafka: very expensive.
# Shut down Kafka, copy the disks, start Kafka up again.
# Add a database in front of Kafka as the primary persistence: very, very expensive, and it forfeits the idea of Kafka.
I wonder what is really needed for an online backup strategy. If I am not mistaken, it is relatively little:
* A command that causes Kafka to switch to new files, so that the files containing all past data no longer change.
* An export of the current ZooKeeper values, unless they can be recreated from the transaction log files anyhow.
* Then you can back up the Kafka files.
* A command that tells Kafka the backup is finished, to clean things up.
* Later, a way to merge the recovered backup instance with the Kafka log written since then, up to a certain point. Example: the backup was taken at midnight and a topic was deleted by accident at 11:00. You start with the backup, apply the logs until 10:59, and then bring Kafka fully online again.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)