[jira] [Commented] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens
[ https://issues.apache.org/jira/browse/KAFKA-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718532#comment-16718532 ]

Satish Duggana commented on KAFKA-7694:
---

This is about providing pluggable interfaces for delegation token storage and master key management, with ZooKeeper-based storage as the default implementation. I am working on a KIP for the above and will send it out when it is ready.

> Support ZooKeeper based master/secret key management for delegation tokens
> ---
>
> Key: KAFKA-7694
> URL: https://issues.apache.org/jira/browse/KAFKA-7694
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Manikumar
> Assignee: Satish Duggana
> Priority: Major
>
> The master/secret key is used to generate and verify delegation tokens.
> Currently, the master key/secret is stored as plain text in the
> server.properties config file, and the same key must be configured across
> all the brokers. We require a re-deployment when the secret needs to be
> rotated.
> This JIRA is to explore and implement ZooKeeper based master/secret key
> management to automate secret key generation and expiration.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
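To make the "pluggable interface" idea concrete, here is a minimal sketch of the kind of master-key abstraction the comment describes, with an in-memory stand-in where the default implementation would talk to ZooKeeper. All names here are illustrative, not from the actual KIP:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical pluggable interface: sign new tokens with the current key,
// keep older key versions around so outstanding tokens still verify.
interface MasterKeyManager {
    byte[] currentKey();          // key used to sign new delegation tokens
    byte[] keyForVersion(int v);  // older keys retained for verification
    int rotate();                 // generate a new key, returns its version
}

// In-memory stand-in for a ZooKeeper-backed implementation.
class InMemoryMasterKeyManager implements MasterKeyManager {
    private final Map<Integer, byte[]> keys = new ConcurrentHashMap<>();
    private final AtomicInteger version = new AtomicInteger();

    InMemoryMasterKeyManager() { rotate(); }

    @Override public byte[] currentKey() { return keys.get(version.get()); }
    @Override public byte[] keyForVersion(int v) { return keys.get(v); }
    @Override public int rotate() {
        int v = version.incrementAndGet();
        keys.put(v, ("key-" + v).getBytes()); // a real impl would use SecureRandom
        return v;
    }
}
```

Rotation then becomes a coordination problem (publish the new key, keep old versions until their tokens expire) rather than a broker re-deployment.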
[jira] [Commented] (KAFKA-7723) Kafka Connect should support overriding worker Kafka API configuration with connector configuration posted via the REST API
[ https://issues.apache.org/jira/browse/KAFKA-7723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718427#comment-16718427 ]

ASF GitHub Bot commented on KAFKA-7723:
---

sweat123 opened a new pull request #6026: KAFKA-7723: Support overriding Kafka Connect worker API configuration via the REST API
URL: https://github.com/apache/kafka/pull/6026

More details: https://issues.apache.org/jira/browse/KAFKA-7723

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Kafka Connect should support overriding worker Kafka API configuration with
> connector configuration posted via the REST API
>
> Key: KAFKA-7723
> URL: https://issues.apache.org/jira/browse/KAFKA-7723
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: laomei
> Priority: Minor
>
> I'm using a Kafka sink connector; "auto.offset.reset" is set in
> connect-distributed*.properties. It applies to every connector in the
> worker, so the consumer polls records from latest or earliest for all of
> them. I cannot control auto.offset.reset in the connector configs posted
> via the REST API, so I think it is necessary to be able to override the
> worker's Kafka API configs with connector configs, like this:
> {code:java}
> {
>   "name": "test",
>   "config": {
>     "consumer.auto.offset.reset": "latest",
>     "consumer.xxx"
>     "connector.class": "com.laomei.sis.solr.SolrConnector",
>     "tasks.max": "1",
>     "poll.interval.ms": "100",
>     "connect.timeout.ms": "6",
>     "topics": "test"
>   }
> }
> {code}
> This way we could override the Kafka consumer's auto.offset.reset in a sink
> connector.
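What the reporter asks for is essentially prefix-stripping: keys beginning with `consumer.` in the connector config would have the prefix removed and override the worker-level consumer defaults for that connector only. A minimal sketch of that merge (illustrative, not Connect's actual code):

```java
import java.util.HashMap;
import java.util.Map;

class ConnectorConfigOverrides {
    // Merge worker-level consumer defaults with "consumer."-prefixed keys
    // from a connector config; connector values win on conflict.
    static Map<String, String> effectiveConsumerConfig(
            Map<String, String> workerDefaults,
            Map<String, String> connectorConfig) {
        Map<String, String> merged = new HashMap<>(workerDefaults);
        for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
            if (e.getKey().startsWith("consumer.")) {
                merged.put(e.getKey().substring("consumer.".length()), e.getValue());
            }
        }
        return merged;
    }
}
```

With this scheme, `"consumer.auto.offset.reset": "latest"` in the connector JSON overrides an `auto.offset.reset=earliest` worker default, while non-prefixed connector keys like `topics` never leak into the consumer config.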
[jira] [Comment Edited] (KAFKA-7707) Some code is not necessary
[ https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718420#comment-16718420 ]

huangyiming edited comment on KAFKA-7707 at 12/12/18 3:19 AM:
---

Hi [~sliebau], thank you for the review. Inside the locked region, no other thread can add a Condition to the waiters, so the waiters deque only ever holds this thread's condition:
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
{code}
and in the inner finally we have already removed the Condition:
{code:java}
            } finally {
                // When this loop was not able to successfully terminate don't loose available memory
                this.nonPooledAvailableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
{code}
so I think in the last finally we don't need this.waiters.peekFirst() any more:
{code:java}
    } finally {
        // signal any additional waiters if there is more memory left
        // over for them
        try {
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            // Another finally... otherwise find bugs complains
            lock.unlock();
        }
    }
{code}
It could be simplified to:
{code:java}
    } finally {
        lock.unlock();
    }
{code}

> Some code is not necessary
> --
>
> Key: KAFKA-7707
> URL: https://issues.apache.org/jira/browse/KAFKA-7707
> Project: Kafka
> Issue Type: Improvement
> Reporter: huangyiming
> Priority: Minor
> Attachments: image-2018-12-05-18-01-46-886.png
>
> In the trunk branch, in
> [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174],
> I think this code can be cleaned up; it is not necessary and will never execute:
> {code:java}
> if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) &&
>     !this.waiters.isEmpty())
>     this.waiters.peekFirst().signal();
> {code}
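For context on why that signal exists at all: BufferPool uses a cascaded wake-up, where a waiter that leaves the pool passes the notification on to the next waiter if memory remains. The toy pool below (an illustration of the pattern, not Kafka's BufferPool) shows two blocked waiters satisfied by a single release; the release only signals the first waiter, so the second depends on the peekFirst().signal() cascade in the finally:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy memory pool illustrating the cascaded-signal pattern.
class ToyPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available;

    ToyPool(long initial) { this.available = initial; }

    long allocate(long size) throws InterruptedException {
        lock.lock();
        try {
            if (available >= size) {
                available -= size;
                return size;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            try {
                while (available < size)
                    moreMemory.await();
                available -= size;
                return size;
            } finally {
                waiters.remove(moreMemory);
                // The contested cascade: if memory is left over, pass the
                // wake-up on to the next waiter in line.
                if (available > 0 && !waiters.isEmpty())
                    waiters.peekFirst().signal();
            }
        } finally {
            lock.unlock();
        }
    }

    void release(long size) {
        lock.lock();
        try {
            available += size;
            // Only the first waiter is signalled here; any others rely on
            // the cascade in allocate()'s finally block.
            if (!waiters.isEmpty())
                waiters.peekFirst().signal();
        } finally {
            lock.unlock();
        }
    }

    int waiting() {
        lock.lock();
        try { return waiters.size(); } finally { lock.unlock(); }
    }
}
```

In this toy version, removing the cascade would leave the second waiter blocked forever after release(20), since it is signalled only once per release.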
[jira] [Created] (KAFKA-7723) Kafka Connect should support overriding worker Kafka API configuration with connector configuration posted via the REST API

laomei created KAFKA-7723:
---

Summary: Kafka Connect should support overriding worker Kafka API configuration with connector configuration posted via the REST API
Key: KAFKA-7723
URL: https://issues.apache.org/jira/browse/KAFKA-7723
Project: Kafka
Issue Type: Improvement
Components: KafkaConnect
Reporter: laomei

I'm using a Kafka sink connector; "auto.offset.reset" is set in connect-distributed*.properties. It applies to every connector in the worker, so the consumer polls records from latest or earliest for all of them. I cannot control auto.offset.reset in the connector configs posted via the REST API, so I think it is necessary to be able to override the worker's Kafka API configs with connector configs, like this:
{code:java}
{
  "name": "test",
  "config": {
    "consumer.auto.offset.reset": "latest",
    "consumer.xxx"
    "connector.class": "com.laomei.sis.solr.SolrConnector",
    "tasks.max": "1",
    "poll.interval.ms": "100",
    "connect.timeout.ms": "6",
    "topics": "test"
  }
}
{code}
This way we could override the Kafka consumer's auto.offset.reset in a sink connector.
[jira] [Commented] (KAFKA-7715) Connect should have a parameter to disable WADL output for OPTIONS method
[ https://issues.apache.org/jira/browse/KAFKA-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718352#comment-16718352 ]

ASF GitHub Bot commented on KAFKA-7715:
---

avocader opened a new pull request #6025: KAFKA-7715: Added a configuration parameter to Connect which disables WADL output for OPTIONS method.
URL: https://github.com/apache/kafka/pull/6025

Currently, the Connect REST endpoint replies to `OPTIONS` requests with verbose WADL information, which could be used for an attack. Exposing that information is not recommended for production systems, but for backward-compatibility reasons it remains available by default, with the option to turn it off by setting `rest.wadl.enable=false`. Added unit tests in `RestServerTest`, which assert that `Content-Type` is `application/vnd.sun.wadl+xml` if `rest.wadl.enable=true` or unset, and `text/plain` otherwise.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Connect should have a parameter to disable WADL output for OPTIONS method
> -
>
> Key: KAFKA-7715
> URL: https://issues.apache.org/jira/browse/KAFKA-7715
> Project: Kafka
> Issue Type: Improvement
> Components: config, security
> Affects Versions: 2.1.0
> Reporter: Oleksandr Diachenko
> Priority: Critical
> Fix For: 2.1.1
>
> Currently, the Connect REST API exposes WADL output on the OPTIONS method:
> {code:bash}
> curl -i -X OPTIONS http://localhost:8083/connectors
> HTTP/1.1 200 OK
> Date: Fri, 07 Dec 2018 22:51:53 GMT
> Content-Type: application/vnd.sun.wadl+xml
> Allow: HEAD,POST,GET,OPTIONS
> Last-Modified: Fri, 07 Dec 2018 14:51:53 PST
> Content-Length: 1331
> Server: Jetty(9.4.12.v20180830)
>
> <!-- WADL XML body omitted (mangled in the original message): an
>      <application> document in the http://wadl.dev.java.net/2009/02
>      namespace, generated by Jersey 2.27, referencing the grammar at
>      http://localhost:8083/application.wadl/xsd0.xsd and describing the
>      resources under http://localhost:8083/ with their methods and a
>      boolean "forward" query parameter -->
> {code}
> This can be a potential vulnerability, so it makes sense to have a
> configuration parameter which disables WADL output.
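Per the PR description, the proposed switch is a single worker setting. Assuming the flag lands under the name given there, disabling WADL output would be a one-line change to the worker config (illustrative sketch, not merged documentation):

```properties
# connect-distributed.properties
# Proposed flag from PR #6025; described as defaulting to true for
# backward compatibility, so existing OPTIONS behavior is unchanged
# unless this is set explicitly.
rest.wadl.enable=false
```

With the flag off, an `OPTIONS /connectors` request would return a plain-text response instead of the WADL service description.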
[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers
[ https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718311#comment-16718311 ]

Paul Davidson commented on KAFKA-5061:
---

Thanks for the response [~mageshn] - I will create a KIP.

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.2.1
> Reporter: Ewen Cheslack-Postava
> Priority: Major
> Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer
> metrics, we need to have the framework disambiguate them. Currently when we
> create producers
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
> and create consumers
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
> the client ID is not being set. You can override it for the entire worker
> via worker-level producer/consumer overrides, but you can't get per-task
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID
> (providing uniqueness for multiple Connect clusters up to the scope of the
> Kafka cluster they are operating on). This seems ideal since it's a good
> default; however, it is a public-facing change and may need a KIP. Normally I
> would be less worried about this, but some folks may be relying on picking up
> metrics without this being set, in which case such a change would break their
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this
> will really be useful or not -- it lets you differentiate between metrics for
> different connectors' tasks, but within a connector, all metrics would go to
> a single client.id. On the other hand, this makes the tasks act as a single
> group from the perspective of broker handling of client IDs.
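Option 1 in the description amounts to deriving a default client.id from identifiers the framework already has. One possible scheme is sketched below; the exact format would be up to the KIP, and these names are purely illustrative:

```java
class ConnectClientIds {
    // Derive a default client.id for a task's producer/consumer from the
    // worker group id, connector name, and task index. Unique within one
    // Kafka cluster as long as Connect group ids are unique.
    static String defaultClientId(String groupId, String connectorName, int taskId) {
        return "connect-" + groupId + "-" + connectorName + "-" + taskId;
    }
}
```

Because the id embeds both the group and the task index, per-task producer/consumer metrics become distinguishable without any per-connector configuration.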
[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers
[ https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718276#comment-16718276 ]

Magesh kumar Nandakumar commented on KAFKA-5061:
---

[~pdavidson] I had a quick look at the PR and it appears that we are introducing a new configuration to handle this. We would probably require a small KIP since it's a public-facing change.

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.2.1
> Reporter: Ewen Cheslack-Postava
> Priority: Major
> Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer
> metrics, we need to have the framework disambiguate them. Currently when we
> create producers
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
> and create consumers
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
> the client ID is not being set. You can override it for the entire worker
> via worker-level producer/consumer overrides, but you can't get per-task
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID
> (providing uniqueness for multiple Connect clusters up to the scope of the
> Kafka cluster they are operating on). This seems ideal since it's a good
> default; however, it is a public-facing change and may need a KIP. Normally I
> would be less worried about this, but some folks may be relying on picking up
> metrics without this being set, in which case such a change would break their
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this
> will really be useful or not -- it lets you differentiate between metrics for
> different connectors' tasks, but within a connector, all metrics would go to
> a single client.id. On the other hand, this makes the tasks act as a single
> group from the perspective of broker handling of client IDs.
[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type
[ https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718138#comment-16718138 ]

Mayuresh Gharat commented on KAFKA-7681:
---

Hi [~junrao],
1) IIUC, after inspecting the code and your suggestion above, this seems doable by adding a new metric like "localTimeRate" or "RequestHandlerThreadpoolUtilizationRate" and registering it as a Meter(), as we do for
{code:java}
totalProduceRequestRate
{code}
This would give us the rate of local time for each request, which is approximately the usage of the request handler thread pool.
2) I was thinking more along the lines of a ratio that would give us an instantaneous value (Gauge), as we have for "NetworkProcessorAvgIdlePercent", but per request type, where
{code:java}
value = (Total sampled local time of a request) / (Total of sampled local times of all requests)
{code}

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Rao
> Assignee: Mayuresh Gharat
> Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which
> request type is using the thread pool the most. It would be useful to add a
> metric that tracks the fraction of request thread pool usage by request type.
> This would be equivalent to (request rate) * (request local time ms) / 1000,
> but will be more direct. This would require a new KIP.
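Both proposals reduce to simple arithmetic over sampled request times: the description's formula, (request rate) * (request local time ms) / 1000, gives thread-seconds used per second, while option 2 is a share of total sampled local time. A sketch of the two calculations (illustrative only, not Kafka's metrics code):

```java
import java.util.Map;

class RequestThreadUtilization {
    // Option 1: fraction of one request-handler thread consumed by a
    // request type = (requests/sec) * (local time in ms per request) / 1000.
    static double utilization(double requestRatePerSec, double avgLocalTimeMs) {
        return requestRatePerSec * avgLocalTimeMs / 1000.0;
    }

    // Option 2: instantaneous share of total sampled local time across
    // all request types (the Gauge-style ratio).
    static double share(String type, Map<String, Double> sampledLocalTimeMs) {
        double total = sampledLocalTimeMs.values().stream()
                .mapToDouble(Double::doubleValue).sum();
        return total == 0 ? 0.0 : sampledLocalTimeMs.get(type) / total;
    }
}
```

For example, 100 Produce requests/sec averaging 5 ms of local time each consume 0.5 of one handler thread; a saturated pool would show the per-type shares summing toward the pool size.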
[jira] [Comment Edited] (KAFKA-7681) new metric for request thread utilization by request type
[ https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718138#comment-16718138 ]

Mayuresh Gharat edited comment on KAFKA-7681 at 12/11/18 10:57 PM:
---

Hi [~junrao],
1) IIUC, after inspecting the code and your suggestion above, this seems doable by adding a new metric like "localTimeRate" or "RequestHandlerThreadpoolUtilizationRate" and registering it as a Meter(), as we do for
{code:java}
totalProduceRequestRate
{code}
This would give us the rate of local time for each request, which is approximately the usage of the request handler thread pool.
2) I was thinking more along the lines of a ratio that would give us an instantaneous value (Gauge), as we have for "NetworkProcessorAvgIdlePercent", but per request type, where
{code:java}
value = (Total sampled local time of a request) / (Total of sampled local times of all requests)
{code}
I can start putting up a KIP, if we think that 1) would suffice.

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Rao
> Assignee: Mayuresh Gharat
> Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which
> request type is using the thread pool the most. It would be useful to add a
> metric that tracks the fraction of request thread pool usage by request type.
> This would be equivalent to (request rate) * (request local time ms) / 1000,
> but will be more direct. This would require a new KIP.
[jira] [Created] (KAFKA-7722) Increase ProducerPerformance precision by using nanoTime
Kevin Lu created KAFKA-7722:
---

Summary: Increase ProducerPerformance precision by using nanoTime
Key: KAFKA-7722
URL: https://issues.apache.org/jira/browse/KAFKA-7722
Project: Kafka
Issue Type: New Feature
Components: tools
Reporter: Kevin Lu
Assignee: Kevin Lu

https://cwiki.apache.org/confluence/display/KAFKA/KIP-403%3A+Increase+ProducerPerformance+precision+by+using+nanoTime
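For background on the KIP's premise: System.currentTimeMillis() has millisecond granularity and can jump with wall-clock adjustments, whereas System.nanoTime() is a monotonic clock intended for measuring elapsed intervals. A minimal illustration of interval timing with nanoTime (not the tool's actual code):

```java
import java.util.concurrent.TimeUnit;

class LatencyTimer {
    // Measure elapsed time of a task using the monotonic nanoTime clock,
    // reported in microseconds (below millisecond granularity).
    static long elapsedMicros(Runnable task) {
        long start = System.nanoTime();            // monotonic, ns resolution
        task.run();
        long elapsedNs = System.nanoTime() - start;
        return TimeUnit.NANOSECONDS.toMicros(elapsedNs);
    }
}
```

Sub-millisecond send latencies, which round to 0 or 1 under currentTimeMillis(), stay distinguishable under this scheme.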
[jira] [Commented] (KAFKA-6706) NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 broker upgrade
[ https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718052#comment-16718052 ]

David van Geest commented on KAFKA-6706:
---

FWIW, upgrading to Kafka 1.1.1 solved these problems for us.

> NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0
> broker upgrade
>
> Key: KAFKA-6706
> URL: https://issues.apache.org/jira/browse/KAFKA-6706
> Project: Kafka
> Issue Type: Bug
> Components: core, network
> Affects Versions: 1.0.0
> Reporter: Di Shang
> Priority: Blocker
> Labels: mirror-maker
>
> We have 2 clusters, A and B, with 4 brokers each, and we use mirrormaker to
> replicate topics from A to B.
> We recently upgraded our brokers from 0.10.2.0 to 1.0.0. After the upgrade
> we started seeing the mirrormaker task report producer errors and
> intermittently die.
> We tried using the 1.0.0 and 0.10.2.0 mirrormaker; both have the same problem.
> Downgrading cluster B's brokers back to 0.10.2.0 made the problem go away, so
> we think it's a server-side problem.
> There are 2 types of errors: REQUEST_TIMED_OUT and NETWORK_EXCEPTION. For
> testing, I used a topic *logging* with 20 partitions and 3 replicas (same on
> cluster A and B); the source topic has 50+ million msgs.
> (This is from mirrormaker 1.0 at info level; the 0.10.2.0 log is very similar.)
> {noformat}
> 22 Mar 2018 02:16:07.407 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 35122 on topic-partition logging-7, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT
> 22 Mar 2018 02:17:49.731 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 51572 on topic-partition logging-7, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT
> 22 Mar 2018 02:18:33.903 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 57785 on topic-partition logging-5, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT
> 22 Mar 2018 02:21:21.399 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 85406 on topic-partition logging-18, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT
> 22 Mar 2018 02:25:22.278 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 128047 on topic-partition logging-5, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT
> 22 Mar 2018 02:26:17.154 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 137049 on topic-partition logging-18, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT
> 22 Mar 2018 02:27:57.358 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 153976 on topic-partition logging-5, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT
> 22 Mar 2018 02:27:57.779 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 154077 on topic-partition logging-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> 22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 154077 on topic-partition logging-10, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> 22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer clientId=producer-1] Got error produce response with correlation id 154077 on topic-partition logging-18, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> 22 Mar 2018 02:27:57.781 [kafka-producer-network-thread |
[jira] [Commented] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb
[ https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718018#comment-16718018 ] hitesh gollahalli bachanna commented on KAFKA-7640: --- Hello [~vvcephei] The REST API returns a 200 status code, but no data. 1. The client is on version 2.0.0 but the server is on 1.1.1. 2. We use a Virtual IP (to load balance) as the Kafka broker address in the client code. 3. I have 36 partitions in the topic, with 36 consumers on different machines. 4. I have about 360 million messages on the topic. I see rebalancing happening, because I see a bunch of `revoked at the beginning of consumer rebalance` messages. Things I see in the logs when DEBUG is enabled: Using older server API v7 to send FETCH; Using older server API v5 to send METADATA {topics=[],allow_auto_topic_creation=false}; Using older server API v1 to send HEARTBEAT. Is there anything in particular I need to look for in the logs? > Kafka stream interactive query not returning data when state is backed by > rocksdb > - > > Key: KAFKA-7640 > URL: https://issues.apache.org/jira/browse/KAFKA-7640 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: hitesh gollahalli bachanna >Priority: Major > > I have a kafka stream app running with 36 different instances (one for each > partition). Each instance comes up one after the other, and I am building a rest > service on top of the state to access the data. > Here is some code that I use: > {code:java} > StreamsMetadata metadata = streams.metadataForKey(store, key, serializer); // find out which host has the key > if (localSelf.host().equals(hostStoreInfo.getHost())) { > // get the key from the local store > } > else { > // call the remote host using restTemplate > }{code} > The problem now is that the `metadata` object returned names one host/IP, but the > data is actually on a different node. I was able to verify this using application logs I > printed. This happens every time I start my application.
> The `allMetadata` method in the `KafkaStreams` class says the value will be > updated when partitions get reassigned, but that is not happening in this > case. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
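The routing pattern quoted above can be made more robust against stale metadata by re-querying until a valid owner comes back. Below is a minimal sketch of such a retry helper in plain Java; `retryUntil`, the toy supplier, and the host strings are all illustrative stand-ins, not Kafka Streams API:

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hedged sketch: a generic retry loop one could wrap around
// streams.metadataForKey(...) so that metadata observed during a rebalance
// is re-fetched instead of being trusted on the first call.
public class RetryUntil {
    public static <T> T retryUntil(Supplier<T> call, Predicate<T> valid,
                                   int attempts, long backoffMs) throws InterruptedException {
        T result = call.get();
        for (int i = 1; i < attempts && !valid.test(result); i++) {
            Thread.sleep(backoffMs);     // let the rebalance settle
            result = call.get();         // e.g. re-query metadataForKey
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // Toy stand-in: the "metadata" only becomes valid on the third call.
        int[] calls = {0};
        String host = retryUntil(
                () -> ++calls[0] < 3 ? "unavailable" : "broker-7:8080",
                h -> !h.equals("unavailable"),
                5, 10);
        System.out.println(host + " after " + calls[0] + " calls");
    }
}
```

In the scenario above, the supplier would wrap the `metadataForKey` call and the predicate would reject an unavailable/sentinel result, so a query issued right after startup waits for reassignment to settle rather than routing to the wrong host.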
[jira] [Commented] (KAFKA-6706) NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 broker upgrade
[ https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718049#comment-16718049 ] Joseph Niemiec commented on KAFKA-6706: --- We have all the same error messages occurring in our production clusters. It started after we upgraded from 0.8 to 1.0.0. {code:java} // 2018-12-11 16:40:43 DEBUG NetworkClient:183 - [Producer clientId=KafkaExampleProducer] Disconnecting from node 1 due to request timeout. 2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] Got error produce response with correlation id 21193 on topic-partition debug_dev_r2k-3, retrying (4 attempts left). Error: REQUEST_TIMED_OUT 2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] Got error produce response with correlation id 21203 on topic-partition debug_dev_r2k-0, retrying (4 attempts left). Error: NETWORK_EXCEPTION 2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] Got error produce response with correlation id 21203 on topic-partition debug_dev_r2k-3, retrying (4 attempts left). Error: NETWORK_EXCEPTION 2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] Got error produce response with correlation id 21203 on topic-partition debug_dev_r2k-6, retrying (4 attempts left). Error: NETWORK_EXCEPTION {code} > NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 > broker upgrade > > > Key: KAFKA-6706 > URL: https://issues.apache.org/jira/browse/KAFKA-6706 > Project: Kafka > Issue Type: Bug > Components: core, network >Affects Versions: 1.0.0 >Reporter: Di Shang >Priority: Blocker > Labels: mirror-maker > > We have 2 clusters A and B with 4 brokers each, we use mirrormaker to > replicate topics from A to B. > We recently upgraded our brokers from 0.10.2.0 to 1.0.0, after the upgrade > we started seeing the mirrormaker task showing producer errors and > intermittently dying. 
[jira] [Commented] (KAFKA-7713) producer io-wait-ratio > 1
[ https://issues.apache.org/jira/browse/KAFKA-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718002#comment-16718002 ] Jun Rao commented on KAFKA-7713: io-wait-ratio is calculated as the ratio of accumulated selector waiting time over the length of the metric window. So as the metric window rolls, the accumulated waiting time falling into the rolled window can cause the metric to go above 1. I'm not sure of the best way to address this. One way is to treat the Rate/Meter metrics specially: when we record a value, we split it into the corresponding sample windows and bound each by the window size. > producer io-wait-ratio > 1 > -- > > Key: KAFKA-7713 > URL: https://issues.apache.org/jira/browse/KAFKA-7713 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: dan norwood >Priority: Major > > I am running a test on a streams application and gathering JMX measurements > to determine what is causing some lag. Using `kafka.tools.JmxTool` I was > gathering the following attributes `'io-ratio', 'io-wait-ratio', > 'select-rate', 'batch-size-avg', 'compression-rate-avg', 'record-size-avg', > 'records-per-request-avg'` on my streams instances' producers. I noticed that > I was getting `io-wait-ratio > 1`, but according to the docs it is "The fraction > of time the I/O thread spent waiting." > > Some example values from JmxTool for StreamThread-8-producer: > |batch-size-avg|662.2613636| > |compression-rate-avg|0.3425814926| > |io-ratio|1.01E-04| > |*io-wait-ratio*|*1.178371974*| > |record-size-avg|172.2045455| > |records-per-request-avg|38.7167| > |select-rate|3.855527588| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
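The window-rolling effect Jun Rao describes can be shown with plain arithmetic: a long blocking select() records its entire accumulated wait into the current sample when it returns, while the elapsed span covered by the retained samples may be shorter. A toy model of this (not the actual org.apache.kafka.common.metrics code; the numbers are made up):

```java
// Simplified model of a windowed Rate metric with rolling samples.
public class WindowedRate {
    // ratio as described in the comment: accumulated value / elapsed window span
    static double measure(double accumulatedWaitMs, double windowSpanMs) {
        return accumulatedWaitMs / windowSpanMs;
    }

    // the proposed fix: bound the recorded value by the window size
    static double measureBounded(double accumulatedWaitMs, double windowSpanMs) {
        return Math.min(accumulatedWaitMs, windowSpanMs) / windowSpanMs;
    }

    public static void main(String[] args) {
        // A select() that blocked 45s records its whole wait at once; if the
        // oldest sample has since rolled out, only a 40s span remains to
        // divide by, so the "fraction of time waiting" exceeds 1.
        System.out.println(measure(45_000, 40_000));        // 1.125, i.e. > 1.0
        System.out.println(measureBounded(45_000, 40_000)); // capped at 1.0
    }
}
```

Splitting the recorded value across the sample windows it actually spans, as suggested above, keeps each sample at or below its window size and the ratio at or below 1.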
[jira] [Commented] (KAFKA-7331) Kafka does not detect broker loss in the event of a network partition within the cluster
[ https://issues.apache.org/jira/browse/KAFKA-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717919#comment-16717919 ] Kevin Li commented on KAFKA-7331: - feel free to close this > Kafka does not detect broker loss in the event of a network partition within > the cluster > > > Key: KAFKA-7331 > URL: https://issues.apache.org/jira/browse/KAFKA-7331 > Project: Kafka > Issue Type: Bug > Components: controller, network >Affects Versions: 1.0.1 >Reporter: Kevin Li >Priority: Major > > We ran into this issue on our production cluster and had to manually remove > the broker and enable unclean leader elections to get the cluster working > again. Ideally, Kafka itself could handle network partitions without manual > intervention. > The issue is reproducible with the following cross datacenter Kafka cluster > setup: > DC 1: Kafka brokers + ZK nodes > DC 2: Kafka brokers + ZK nodes > DC 3: Kafka brokers + ZK nodes > Introduce a network partition on a Kafka broker (brokerA) in DC 1 where it > cannot reach any hosts (brokers and ZK nodes) in the other 2 datacenters. The > cluster goes into a state where partitions that brokerA is a leader for will > only contain brokerA in its ISR. Since brokerA is still reachable by ZK nodes > in DC 1, it still shows up when querying ZK. The controller thinks brokerA is > still up and does not elect new leaders for partitions that brokerA is a > leader for. This causes all those partitions to be down until brokerA is back > or completely removed from the cluster (in which case unclean leader election > can elect new leaders for those partitions). > A faster recovery scenario could be for a majority of hosts (zk nodes?) to > realize that brokerA is unreachable, and mark it as down so elections for > partitions it is a leader for could be triggered. This avoids waiting > indefinitely for the broker to come back or taking action to remove the > broker from the cluster. 
[jira] [Commented] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration
[ https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717882#comment-16717882 ] John Roesler commented on KAFKA-7695: - Hi [~zirx], Just checking in... It sounds from the discussion above that this ticket should probably be closed: either 2.1 will work for you (with or without the TimestampExtractor trick above), or you could open a new Jira specifically for the behavior you desire. Do you agree? I'm just trying to close the loop, as it seems the conversation has petered out by now. Thanks, -John > Cannot override StreamsPartitionAssignor in configuration > -- > > Key: KAFKA-7695 > URL: https://issues.apache.org/jira/browse/KAFKA-7695 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.0.1 >Reporter: Dmitry Buykin >Priority: Major > Labels: configuration > > Cannot override StreamsPartitionAssignor by changing the property > partition.assignment.strategy in KStreams 2.0.1, because the streams crash > inside KafkaClientSupplier.getGlobalConsumer. The global consumer > works only with the RangeAssignor, which is configured by default. > This can be reproduced by setting > `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > StreamsPartitionAssignor.class.getName());` > To me it looks like a bug. > Opened a discussion here > https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression
[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717599#comment-16717599 ] ASF GitHub Bot commented on KAFKA-7223: --- vvcephei opened a new pull request #6024: KAFKA-7223: document suppression buffer metrics URL: https://github.com/apache/kafka/pull/6024 Document the new metrics added in #5795 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KIP-328: Add in-memory Suppression > -- > > Key: KAFKA-7223 > URL: https://issues.apache.org/jira/browse/KAFKA-7223 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > As described in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.] > > This ticket is to implement Suppress, but only for in-memory buffers. > (depends on KAFKA-7222) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7712) Handle exceptions from immediately connected channels in Selector
[ https://issues.apache.org/jira/browse/KAFKA-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717411#comment-16717411 ] ASF GitHub Bot commented on KAFKA-7712: --- rajinisivaram opened a new pull request #6023: KAFKA-7712: Remove channel from Selector before propagating exception URL: https://github.com/apache/kafka/pull/6023 Ensure that channels and selection keys are removed from `Selector` collections before propagating connect exceptions. They are currently cleared on the next `poll()`, but we can't ensure that callers (NetworkClient, for example) won't try to connect again before the next `poll`, and hence we should clear the collections before re-throwing exceptions from `connect()`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Handle exceptions from immediately connected channels in Selector > - > > Key: KAFKA-7712 > URL: https://issues.apache.org/jira/browse/KAFKA-7712 > Project: Kafka > Issue Type: Bug > Components: network >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.2.0 > > > We try to handle all possible exceptions in Selector to ensure that channels > are always closed and their states kept consistent. For immediately connected > channels, we should ensure that any exception during connection results in > the channel being closed properly and removed from all maps. This is a very > unlikely scenario, but where we do already handle the exception, we should clean up > properly in the catch block. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
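The fix described in the pull request is a cleanup-before-rethrow pattern. A minimal illustrative sketch, with a plain map standing in for the Selector's channel collections (class and method names are hypothetical, not Kafka's internals):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: remove per-connection state from tracking collections *before*
// re-throwing from connect(), so a caller that retries the connection
// immediately (rather than waiting for the next poll()) never sees a
// stale entry for the failed channel.
public class ConnectCleanup {
    private final Map<String, Object> channels = new HashMap<>();

    public void connect(String id, boolean failImmediately) {
        channels.put(id, new Object());        // state registered up front
        try {
            if (failImmediately) {
                // stands in for an exception from an immediately connected channel
                throw new RuntimeException("connect failed for " + id);
            }
        } catch (RuntimeException e) {
            channels.remove(id);               // clean up before propagating
            throw e;
        }
    }

    public boolean isTracked(String id) {
        return channels.containsKey(id);
    }
}
```

The key design point from the PR description: deferring cleanup to the next `poll()` is not safe, because the caller may retry `connect()` first, so the collections must already be consistent when the exception leaves `connect()`.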
[jira] [Assigned] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence
[ https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikolay Izhikov reassigned KAFKA-7680: -- Assignee: Nikolay Izhikov > fetching a refilled chunk of log can cause log divergence > - > > Key: KAFKA-7680 > URL: https://issues.apache.org/jira/browse/KAFKA-7680 > Project: Kafka > Issue Type: Bug >Reporter: Jun Rao >Assignee: Nikolay Izhikov >Priority: Major > > We use FileRecords.writeTo to send a fetch response for a follower. A log > could be truncated and refilled in the middle of the send process (due to > leader change). Then it's possible for the follower to append some > uncommitted messages followed by committed messages. Those uncommitted > messages may never be removed, causing log divergence. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
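The race described above can be modeled without files or threads: a "fetch response" is copied out in two chunks, and the "log" is truncated and refilled between them, so the follower ends up splicing bytes from two different logs. A toy sketch with plain byte arrays (not the actual FileRecords.writeTo code):

```java
// Toy model of log divergence: the send is interrupted by a truncate+refill.
public class RefilledChunk {
    public static void main(String[] args) {
        byte[] leaderLog = "AAAAAAAA".getBytes();  // original (uncommitted) data
        byte[] follower  = new byte[8];

        System.arraycopy(leaderLog, 0, follower, 0, 4);  // first chunk sent

        // leader change: the log is truncated and refilled with new data
        leaderLog = "BBBBBBBB".getBytes();

        System.arraycopy(leaderLog, 4, follower, 4, 4);  // second chunk sent

        // The follower's log now matches neither the old nor the new leader
        // log, and the "AAAA" prefix may never be cleaned up.
        System.out.println(new String(follower));        // AAAABBBB
    }
}
```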
[jira] [Comment Edited] (KAFKA-7086) Kafka server process dies after try deleting old log files under Windows 10
[ https://issues.apache.org/jira/browse/KAFKA-7086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717293#comment-16717293 ] Hans Schuell edited comment on KAFKA-7086 at 12/11/18 2:47 PM: --- I have a similar issue - see KAFKA-6188 - last comment. The English translation of the system error above is: *{color:#33}the process cannot access the file because it is being used by another process{color}.* As mentioned there, the log files cannot be deleted because of the different behaviour of the Windows file system. Under Windows a file cannot be moved (as in the case above) or deleted while there is still an open file handle in the current process or in another process. This makes a Windows installation of Kafka 2.0.0 currently unusable, because sooner or later this issue happens in a running or starting broker. was (Author: gira1): I have a similar issue - see KAFKA-6188 Comment The english translation of the system error above is: *{color:#33}the process cannot access the file because it is being used by another process{color}.* As mention there, the log files cannot be deleted, because of the different behaviour of the Windows OS file system. Under Windows a file cannot be moved (in the case above) or deleted, when there is still an open file handle of the running current process or of another process. This makes a Windows installation of Kafka 2.0.0 currently unusable, because sooner or later this issue happens in a running or starting broker. > Kafka server process dies after try deleting old log files under Windows 10 > --- > > Key: KAFKA-7086 > URL: https://issues.apache.org/jira/browse/KAFKA-7086 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0 > Environment: Windows 10, Windows Server 2012 R2 >Reporter: Cezary Wagner >Priority: Major > Labels: windows > > After reaching log.retention.hours, Kafka dies every time with an error.
> {noformat} > # Log Retention Policy > # > # The following configurations control the disposal of log segments. The > policy can > # be set to delete segments after a period of time, or after a given size has > accumulated. > # A segment will be deleted whenever *either* of these criteria are met. > Deletion always happens > # from the end of the log. > # The minimum age of a log file to be eligible for deletion due to age > log.retention.hours=168 > # A size-based retention policy for logs. Segments are pruned from the log > unless the remaining > # segments drop below log.retention.bytes. Functions independently of > log.retention.hours. > #log.retention.bytes=1073741824 > # The maximum size of a log segment file. When this size is reached a new log > segment will be created. > log.segment.bytes=1073741824 > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30{noformat} > Exception raised: > {noformat} > > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.swap: > > Proces nie może uzyskać dostępu do pliku, ponieważ jest on używany przez > > inny proces.
> at sun.nio.fs.WindowsException.translateToIOException(Unknown Source) > at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) > at sun.nio.fs.WindowsFileCopy.move(Unknown Source) > at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source) > at java.nio.file.Files.move(Unknown Source) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697) > at > org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415) > at kafka.log.Log.replaceSegments(Log.scala:1644) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535) > at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462) > at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:389) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Suppressed: java.nio.file.FileSystemException: > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.cleaned > -> >
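The Windows-specific failure in the trace above comes from renaming a file that still has an open handle. The sketch below mimics the shape of Kafka's `Utils.atomicMoveWithFallback` (atomic move, falling back to a plain replace) and closes the handle before renaming; the file names are illustrative, and the hard-coded paths from the report are not reused:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Sketch of the .cleaned -> .swap rename done by the log cleaner.
public class MoveWhileOpen {
    // Same fallback shape as Kafka's Utils.atomicMoveWithFallback: try an
    // atomic move first, fall back to a plain replacing move.
    static void atomicMoveWithFallback(Path src, Path dst) throws IOException {
        try {
            Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("segment", ".log.cleaned");
        Path dst = Paths.get(src.toString().replace(".cleaned", ".swap"));
        try (FileChannel ch = FileChannel.open(src, StandardOpenOption.READ)) {
            // On Windows, moving src *here* (while ch is open) throws the
            // FileSystemException seen above; on POSIX filesystems the rename
            // succeeds, which is why the bug is Windows-specific.
        }
        atomicMoveWithFallback(src, dst); // handle closed: safe on both platforms
        System.out.println(Files.exists(dst));
    }
}
```

This is only a demonstration of the ordering constraint (close all handles before renaming), not a statement about where Kafka's open handle actually comes from in this bug.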
[jira] [Commented] (KAFKA-7086) Kafka server process dies after try deleting old log files under Windows 10
[ https://issues.apache.org/jira/browse/KAFKA-7086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717293#comment-16717293 ] Hans Schuell commented on KAFKA-7086: - I have a similar issue - see KAFKA-6188 Comment The english translation of the system error above is: *{color:#33}the process cannot access the file because it is being used by another process{color}.* As mention there, the log files cannot be deleted, because of the different behaviour of the Windows OS file system. Under Windows a file cannot be moved (in the case above) or deleted, when there is still an open file handle of the running current process or of another process. This makes a Windows installation of Kafka 2.0.0 currently unusable, because sooner or later this issue happens in a running or starting broker. > Kafka server process dies after try deleting old log files under Windows 10 > --- > > Key: KAFKA-7086 > URL: https://issues.apache.org/jira/browse/KAFKA-7086 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0 > Environment: Windows 10, Windows Server 2012 R2 >Reporter: Cezary Wagner >Priority: Major > Labels: windows > > Kafka after achieving log.retention.hours dies every time with error. > {noformat} > # Log Retention Policy > # > # The following configurations control the disposal of log segments. The > policy can > # be set to delete segments after a period of time, or after a given size has > accumulated. > # A segment will be deleted whenever *either* of these criteria are met. > Deletion always happens > # from the end of the log. > # The minimum age of a log file to be eligible for deletion due to age > log.retention.hours=168 > # A size-based retention policy for logs. Segments are pruned from the log > unless the remaining > # segments drop below log.retention.bytes. Functions independently of > log.retention.hours. > #log.retention.bytes=1073741824 > # The maximum size of a log segment file. 
When this size is reached a new log > segment will be created. > log.segment.bytes=1073741824 > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30{noformat} > Exception raised: > {noformat} > > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.swap: > > Proces nie mo┐e uzyskaŠ dostŕpu do pliku, poniewa┐ jest on u┐ywany przez > > inny proces. > at sun.nio.fs.WindowsException.translateToIOException(Unknown Source) > at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) > at sun.nio.fs.WindowsFileCopy.move(Unknown Source) > at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source) > at java.nio.file.Files.move(Unknown Source) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697) > at > org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415) > at kafka.log.Log.replaceSegments(Log.scala:1644) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535) > at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462) > at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:389) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Suppressed: java.nio.file.FileSystemException: > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.cleaned > -> > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.swap: > Proces nie mo┐e uzyskaŠ dostŕpu do pliku, poniewa┐ jest on u┐ywany przez > inny proces. 
> at sun.nio.fs.WindowsException.translateToIOException(Unknown > Source) > at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown > Source) > at sun.nio.fs.WindowsFileCopy.move(Unknown Source) > at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source) > at java.nio.file.Files.move(Unknown Source) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694) > ... 12 more > [2018-06-21 13:06:34,196] INFO [ReplicaManager broker=0] Stopping serving > replicas
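The rename that fails in the stack trace above is Kafka's two-step move (atomic rename first, plain replace as fallback). A minimal, self-contained sketch of that logic — class and file names here are illustrative, not Kafka's actual code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the two-step move performed by Utils.atomicMoveWithFallback in
// the trace above. On Windows both attempts fail with a FileSystemException
// while any other handle to the file is still open, which is the reported
// crash; on POSIX filesystems the rename succeeds even with open handles.
public class AtomicMoveSketch {
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException inner) {
                inner.addSuppressed(outer); // mirrors the "Suppressed:" entry in the log
                throw inner;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical segment file names, echoing the cleaner's .cleaned -> .swap step.
        Path dir = Files.createTempDirectory("segments");
        Path source = dir.resolve("00000000000000000000.log.cleaned");
        Path target = dir.resolve("00000000000000000000.log.swap");
        Files.write(source, new byte[]{1, 2, 3});
        atomicMoveWithFallback(source, target);
        System.out.println(Files.exists(target) && !Files.exists(source));
    }
}
```

Run on a POSIX filesystem this prints `true`; the Windows failure mode requires a second process holding the file open.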
[jira] [Updated] (KAFKA-7721) Connection to zookeeper refused
[ https://issues.apache.org/jira/browse/KAFKA-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mohammad Etemad updated KAFKA-7721: --- Component/s: (was: KafkaConnect) > Connection to zookeeper refused > --- > > Key: KAFKA-7721 > URL: https://issues.apache.org/jira/browse/KAFKA-7721 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 > Environment: Dockerized containers - kubernetes 1.9 >Reporter: Mohammad Etemad >Priority: Major > > Kafka throws an exception when trying to connect to ZooKeeper. This happens when > the ZooKeeper connection is lost and recovered. Kafka seems to be stuck in a loop > and cannot renew the connection. Here are the logs: > [2018-12-11 13:52:52,905] INFO Opening socket connection to server > zookeeper-0.zookeeper.logging.svc.cluster.local/10.38.128.12:2181. Will not > attempt to authenticate using SASL (unknown error) > (org.apache.zookeeper.ClientCnxn) > [2018-12-11 13:52:52,906] WARN Session 0x1001443ad77000f for server null, > unexpected error, closing socket connection and attempting reconnect > (org.apache.zookeeper.ClientCnxn) > java.net.ConnectException: Connection refused > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) > On the ZooKeeper side it can be seen that the Kafka connection is established. 
> Here are the logs: > 2018-12-11 13:53:44,969 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - > Accepted socket connection from /10.38.128.8:46066 > 2018-12-11 13:53:44,976 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client > attempting to establish new session at /10.38.128.8:46066 > 2018-12-11 13:53:45,005 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - > Established session 0x10060ff12a58dc0 with negotiated timeout 3 for > client /10.38.128.8:46066 > 2018-12-11 13:53:45,071 [myid:] - INFO [ProcessThread(sid:0 > cport:2181)::PrepRequestProcessor@487] - Processed session termination for > sessionid: 0x10060ff12a58dc0 > 2018-12-11 13:53:45,077 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed > socket connection for client /10.38.128.8:46066 which had sessionid > 0x10060ff12a58dc0 > 2018-12-11 13:53:47,119 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - > Accepted socket connection from /10.36.0.8:48798 > 2018-12-11 13:53:47,124 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client > attempting to establish new session at /10.36.0.8:48798 > 2018-12-11 13:53:47,134 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - > Established session 0x10060ff12a58dc1 with negotiated timeout 3 for > client /10.36.0.8:48798 > 2018-12-11 13:53:47,582 [myid:] - INFO [ProcessThread(sid:0 > cport:2181)::PrepRequestProcessor@487] - Processed session termination for > sessionid: 0x10060ff12a58dc1 > 2018-12-11 13:53:47,592 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed > socket connection for client /10.36.0.8:48798 which had sessionid > 0x10060ff12a58dc1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7721) Connection to zookeeper refused
Mohammad Etemad created KAFKA-7721: -- Summary: Connection to zookeeper refused Key: KAFKA-7721 URL: https://issues.apache.org/jira/browse/KAFKA-7721 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.1.0 Environment: Dockerized containers - kubernetes 1.9 Reporter: Mohammad Etemad Kafka throws an exception when trying to connect to ZooKeeper. This happens when the ZooKeeper connection is lost and recovered. Kafka seems to be stuck in a loop and cannot renew the connection. Here are the logs: [2018-12-11 13:52:52,905] INFO Opening socket connection to server zookeeper-0.zookeeper.logging.svc.cluster.local/10.38.128.12:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2018-12-11 13:52:52,906] WARN Session 0x1001443ad77000f for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) On the ZooKeeper side it can be seen that the Kafka connection is established. 
Here are the logs: 2018-12-11 13:53:44,969 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /10.38.128.8:46066 2018-12-11 13:53:44,976 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client attempting to establish new session at /10.38.128.8:46066 2018-12-11 13:53:45,005 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - Established session 0x10060ff12a58dc0 with negotiated timeout 3 for client /10.38.128.8:46066 2018-12-11 13:53:45,071 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x10060ff12a58dc0 2018-12-11 13:53:45,077 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /10.38.128.8:46066 which had sessionid 0x10060ff12a58dc0 2018-12-11 13:53:47,119 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /10.36.0.8:48798 2018-12-11 13:53:47,124 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client attempting to establish new session at /10.36.0.8:48798 2018-12-11 13:53:47,134 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - Established session 0x10060ff12a58dc1 with negotiated timeout 3 for client /10.36.0.8:48798 2018-12-11 13:53:47,582 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x10060ff12a58dc1 2018-12-11 13:53:47,592 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /10.36.0.8:48798 which had sessionid 0x10060ff12a58dc1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7719) Improve fairness in SocketServer processors
[ https://issues.apache.org/jira/browse/KAFKA-7719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716853#comment-16716853 ] ASF GitHub Bot commented on KAFKA-7719: --- rajinisivaram opened a new pull request #6022: KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) URL: https://github.com/apache/kafka/pull/6022 Limit the number of new connections processed in each iteration in `SocketServer` on each `Processor`. Block `Acceptor` if the connection queue is full on all Processors. Added a metric to track accept idle time percent. See KIP-402 for details. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve fairness in SocketServer processors > --- > > Key: KAFKA-7719 > URL: https://issues.apache.org/jira/browse/KAFKA-7719 > Project: Kafka > Issue Type: Improvement > Components: network >Affects Versions: 2.1.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.2.0 > > > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors > for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
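The fairness change described above bounds how many queued new connections each Processor registers per iteration. A toy sketch of that idea — all names are hypothetical (the real change lives in `SocketServer`'s `Processor`, and a real implementation would register channels with a selector rather than just dequeue):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the KIP-402 idea: each Processor handles at most a
// fixed number of queued new connections per iteration, so a burst of new
// connections cannot starve the connections already being served.
public class FairProcessorSketch {
    private final Queue<String> newConnections = new ArrayDeque<>();
    private final int maxConnectionsPerIteration;

    FairProcessorSketch(int maxConnectionsPerIteration) {
        this.maxConnectionsPerIteration = maxConnectionsPerIteration;
    }

    // Acceptor side: in the real server the Acceptor blocks when every
    // Processor's queue is full; here we just enqueue.
    boolean accept(String connection) {
        return newConnections.offer(connection);
    }

    // Processor side: drain at most maxConnectionsPerIteration per poll loop.
    int configureNewConnections() {
        int processed = 0;
        while (processed < maxConnectionsPerIteration && !newConnections.isEmpty()) {
            newConnections.poll(); // would register the channel with the selector here
            processed++;
        }
        return processed; // remaining connections wait for the next iteration
    }

    public static void main(String[] args) {
        FairProcessorSketch p = new FairProcessorSketch(3);
        for (int i = 0; i < 10; i++) p.accept("conn-" + i);
        // 10 queued connections drain as 3 + 3 + 3 + 1 across iterations,
        // leaving time in each iteration for existing connections.
        System.out.println(p.configureNewConnections());
        System.out.println(p.configureNewConnections());
        System.out.println(p.configureNewConnections());
        System.out.println(p.configureNewConnections());
    }
}
```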
[jira] [Commented] (KAFKA-7720) kafka-configs script should also describe default broker entries
[ https://issues.apache.org/jira/browse/KAFKA-7720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716782#comment-16716782 ] Edoardo Comar commented on KAFKA-7720: -- cc [~mimaison] > kafka-configs script should also describe default broker entries > > > Key: KAFKA-7720 > URL: https://issues.apache.org/jira/browse/KAFKA-7720 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Edoardo Comar >Priority: Major > > Running the configs tool to describe the broker configs only appears to print > dynamically added entries. > Running > {{bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe > --entity-default --entity-type brokers}} > on a broker where no config was previously added with > {{--alter --add-config 'key=value'}} > will print an empty list. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7719) Improve fairness in SocketServer processors
Rajini Sivaram created KAFKA-7719: - Summary: Improve fairness in SocketServer processors Key: KAFKA-7719 URL: https://issues.apache.org/jira/browse/KAFKA-7719 Project: Kafka Issue Type: Improvement Components: network Affects Versions: 2.1.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.2.0 See https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7720) kafka-configs script should also describe default broker entries
Edoardo Comar created KAFKA-7720: Summary: kafka-configs script should also describe default broker entries Key: KAFKA-7720 URL: https://issues.apache.org/jira/browse/KAFKA-7720 Project: Kafka Issue Type: Improvement Components: tools Reporter: Edoardo Comar Running the configs tool to describe the broker configs only appears to print dynamically added entries. Running {{bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-default --entity-type brokers}} on a broker where no config was previously added with {{--alter --add-config 'key=value'}} will print an empty list. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716756#comment-16716756 ] Matthias J. Sax commented on KAFKA-7580: I agree that it is extremely hard. Unfortunately, I don't have any good advice on how to tackle it... :( > Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when > run as root user > -- > > Key: KAFKA-7580 > URL: https://issues.apache.org/jira/browse/KAFKA-7580 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2 > Environment: Ubuntu 16.04.3 LTS >Reporter: Sarvesh Tamba >Priority: Minor > > Created a non-root user and ran the following command to execute the failing > unit test:- > ./gradlew streams:unitTest --tests > org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir > For a root user, the test case fails:- > = > > Task :streams:testClasses UP-TO-DATE > > Task :streams:unitTest > org.apache.kafka.streams.state.internals.RocksDBStoreTest > > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED > java.lang.AssertionError: Expected exception: > org.apache.kafka.streams.errors.ProcessorStateException > 1 test completed, 1 failed > > Task :streams:unitTest FAILED > FAILURE: Build failed with an exception. > * What went wrong: > Execution failed for task ':streams:unitTest'. > > There were failing tests. See the report at: > > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html > * Try: > Run with --stacktrace option to get the stack trace. Run with --info or > --debug option to get more log output. Run with --scan to get full insights. 
> * Get more help at https://help.gradle.org > BUILD FAILED in 20s > 26 actionable tasks: 2 executed, 24 up-to-date > = > However, for a non-root user the test case passes as success:- > = > > Task :streams:testClasses > > Task :streams:unitTest > org.apache.kafka.streams.state.internals.RocksDBStoreTest > > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED > BUILD SUCCESSFUL in 45s > 26 actionable tasks: 4 executed, 22 up-to-date > = > The failing unit test - > "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary > file directory and sets it as readOnly. The unit test is intended to throw an > exception - "ProcessorStateException" - when the readOnly temporary file > directory is opened/accessed. > By default, non-root users opening/accessing a readOnly file directory is not > allowed, and it rightly throws up an error/exception in the unit test (which is > the intention of the unit test, and it passes for non-root users). > sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent > mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied > > sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/ > ls: cannot access '/tmp/readOnlyDir/..': Permission denied > ls: cannot access '/tmp/readOnlyDir/.': Permission denied > ls: cannot access '/tmp/readOnlyDir/kid': Permission denied > ls: cannot access '/tmp/readOnlyDir/child': Permission denied > total 0 > d? ? ? ? ? ? ./ > d? ? ? ? ? ? ../ > d? ? ? ? ? ? child/ > d? ? ? ? ? ? 
kid/ > However, by default, a root user can access any file in the system:- > root@p006vm18:/tmp# ll /tmp/readOnlyDir/ > total 112 > dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./ > drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../ > drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/ > drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/ > > root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent > > root@p006vm18:/tmp# ll /tmp/readOnlyDir/ > total 116 > dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./ > drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../ > drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/ > drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/ > drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/ > Hence the unit test does not throw an exception - "ProcessorStateException" - > when the readOnly temporary file directory is opened, and the unit test > rightly fails for a root user. > Two approaches for resolving this failing unit test case:- > 1.) Run the unit tests as non-root users (simplest). > 2.) If running the unit test as a root user, make the temporary file directory > immutable in the unit test code and then test for the exception (needs code > changes in the unit tests):- > root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/ > root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/grandparent
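A third option worth sketching: rather than trusting permission bits (which root bypasses), probe effective writability by actually attempting a create, so the check and the real operation agree for root and non-root users alike. Class and method names below are hypothetical, not Kafka's test code:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Probe-based writability check: create-and-delete a temp file instead of
// reading permission bits. Whatever the probe reports, a subsequent mkdir
// in the same directory behaves the same way, whether run as root or not.
public class EffectiveWritability {
    static boolean effectivelyWritable(Path dir) {
        try {
            Path probe = Files.createTempFile(dir, "probe", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException e) {
            return false; // e.g. EACCES for a non-root user on a read-only dir
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("readOnlyDir");
        dir.toFile().setWritable(false, false); // analogous to the test's setReadOnly
        boolean probed = effectivelyWritable(dir);
        boolean actual = new File(dir.toFile(), "child").mkdir();
        // Both succeed as root, both fail as non-root: the probe tracks
        // effective behavior, which permission bits alone do not.
        System.out.println(probed == actual);
        dir.toFile().setWritable(true, false); // cleanup
    }
}
```

This prints `true` for both root and non-root users, which is exactly the property the permission-bit check lacks.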
[jira] [Commented] (KAFKA-7708) [kafka-streams-scala] Invalid signature for KTable join in 2.12
[ https://issues.apache.org/jira/browse/KAFKA-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716720#comment-16716720 ] ASF GitHub Bot commented on KAFKA-7708: --- lodamar closed pull request #6019: KAFKA-7708: Fixed KTable tests using KStream API in scala tests URL: https://github.com/apache/kafka/pull/6019 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala index dc080f13310..0ef50e383c9 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala @@ -34,26 +34,32 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic = "source" val sinkTopic = "sink" -val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count() -table.filter((_, value) => value > 1).toStream.to(sinkTopic) +val table = builder.table[String, String](sourceTopic) +table.mapValues(_.length).filter((_, value) => value > 5).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) { - testDriver.pipeRecord(sourceTopic, ("1", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "firstvalue")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "1" - record.value shouldBe (null: java.lang.Long) + record.value shouldBe 10 +} +{ + testDriver.pipeRecord(sourceTopic, ("1", "secondvalue")) + val record = testDriver.readRecord[String, Int](sinkTopic) + record.key shouldBe "1" + 
record.value shouldBe 11 } { - testDriver.pipeRecord(sourceTopic, ("1", "value2")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "short")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "1" - record.value shouldBe 2 + record.value shouldBe (null: java.lang.Long) } { - testDriver.pipeRecord(sourceTopic, ("2", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("2", "val3")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "2" record.value shouldBe (null: java.lang.Long) } @@ -67,30 +73,36 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic = "source" val sinkTopic = "sink" -val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count() -table.filterNot((_, value) => value > 1).toStream.to(sinkTopic) +val table = builder.table[String, String](sourceTopic) +table.filterNot((_, value) => value.exists(_.isUpper)).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) { - testDriver.pipeRecord(sourceTopic, ("1", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "FirstValue")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "1" - record.value shouldBe 1 + record.value shouldBe (null: java.lang.String) } { - testDriver.pipeRecord(sourceTopic, ("1", "value2")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "secondvalue")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "1" - record.value shouldBe (null: java.lang.Long) + record.value shouldBe "secondvalue" } { - testDriver.pipeRecord(sourceTopic, ("2", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "Short")) + val 
record = testDriver.readRecord[String, String](sinkTopic) + record.key shouldBe "1" + record.value shouldBe (null: java.lang.String) +} +{ + testDriver.pipeRecord(sourceTopic, ("2", "val")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "2" - record.value shouldBe 1 + record.value shouldBe "val" } -testDriver.readRecord[String, Long](sinkTopic) shouldBe null +testDriver.readRecord[String, String](sinkTopic) shouldBe null testDriver.close() } @@ -101,17 +113,17
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716687#comment-16716687 ] ASF GitHub Bot commented on KAFKA-6970: --- mjsax closed pull request #6016: KAFKA-6970: All standard state stores guarded with read only wrapper URL: https://github.com/apache/kafka/pull/6016 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java index ff3ef44894b..99ba0f6ce06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java @@ -50,9 +50,18 @@ private CachedStateStore cachedStateStore(final StateStore store) { if (store instanceof CachedStateStore) { return (CachedStateStore) store; -} else if (store instanceof WrappedStateStore -&& ((WrappedStateStore) store).wrappedStore() instanceof CachedStateStore) { -return (CachedStateStore) ((WrappedStateStore) store).wrappedStore(); +} else if (store instanceof WrappedStateStore) { +StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + +while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) { +wrapped = ((WrappedStateStore) wrapped).wrappedStore(); +} + +if (!(wrapped instanceof CachedStateStore)) { +return null; +} + +return (CachedStateStore) wrapped; } return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index c79ec35328a..e7dd4dbc42a 100644 
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.ApiUtils; @@ -37,6 +38,7 @@ import java.time.Duration; import java.util.List; +import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; @@ -102,7 +104,16 @@ public StateStore getStateStore(final String name) { "please file a bug report at https://issues.apache.org/jira/projects/KAFKA.;); } -return stateManager.getStore(name); +final StateStore store = stateManager.getStore(name); +if (store instanceof KeyValueStore) { +return new KeyValueStoreReadWriteDecorator((KeyValueStore) store); +} else if (store instanceof WindowStore) { +return new WindowStoreReadWriteDecorator((WindowStore) store); +} else if (store instanceof SessionStore) { +return new SessionStoreReadWriteDecorator((SessionStore) store); +} + +return store; } @SuppressWarnings("unchecked") @@ -196,23 +207,16 @@ public long streamTime() { return streamTimeSupplier.get(); } -private abstract static class StateStoreReadOnlyDecorator implements StateStore { +private abstract static class StateStoreReadOnlyDecorator extends AbstractStateStore { static final String ERROR_MESSAGE = "Global store is read only"; -final T underlying; - -StateStoreReadOnlyDecorator(final T underlying) { -this.underlying = underlying; -} - -@Override -public String name() { -return underlying.name(); +StateStoreReadOnlyDecorator(final T inner) { +super(inner); } -@Override -public void init(final ProcessorContext context, final StateStore root) { 
-underlying.init(context, root); +@SuppressWarnings("unchecked") +T getInner() { +return (T) wrappedStore(); } @Override @@ -221,44 +225,39 @@ public void flush() { } @Override -public void close() { -underlying.close(); -} - -@Override -public boolean persistent() { -return underlying.persistent(); +
[jira] [Updated] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic
[ https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7443: --- Affects Version/s: 1.1.0 1.1.1 2.0.1 > OffsetOutOfRangeException in restoring state store from changelog topic when > start offset of local checkpoint is smaller than that of changelog topic > - > > Key: KAFKA-7443 > URL: https://issues.apache.org/jira/browse/KAFKA-7443 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0 >Reporter: linyue li >Assignee: linyue li >Priority: Major > Labels: feather > Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2 > > Attachments: KAFKA-7443.url > > > When restoring a local state store from a changelog topic in EOS, Kafka Streams > will sometimes throw an OffsetOutOfRangeException such as: > {quote}Restoring StreamTasks failed. Deleting StreamTasks stores to recreate > from scratch. > org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of > range with no configured reset policy for partitions: > {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112} > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765) 
> at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734) > > {quote} > This scenario occurs when the changelog topic has deleted expired log segments > according to retention.ms, but the start offset in the local .checkpoint > file is the position at which the task last exited on this instance, which may > be smaller than the updated beginning offset of the changelog topic. Restoring the > store from the start offset in the checkpoint file then throws this exception. > It can be reproduced as below (Kafka Streams runs in EOS): > # The task for topic partition test-1 is running on instance A. When the task exits, > Kafka Streams writes the last committed offset 100 for test-1 in the checkpoint > file. > # Task test-1 transfers to instance B. > # During this time, the remote changelog topic for test-1 updates its start > offset to 120 as the old log segment reaches its retention time and is deleted. > # After a while, task test-1 exits from instance B and resumes on instance > A, and the task restores the local state store of A from checkpoint offset 100, which > is smaller than the valid offset 120 of the changelog topic. The exception above is > thrown. > When this exception occurs, Kafka Streams tries to reinitialize the task and > intends to restore from the beginning in the catch block below. Unfortunately, this > handling does not work, and the task keeps throwing OffsetOutOfRangeException in the > following restore attempts. > {code:java} > //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java > //handle for OffsetOutOfRangeException in kafka stream > catch (final InvalidOffsetException recoverableException) { > log.warn("Restoring StreamTasks failed. 
Deleting StreamTasks stores to > recreate from scratch.", recoverableException); > final Set partitions = recoverableException.partitions(); > for (final TopicPartition partition : partitions) { >final StreamTask task = active.restoringTaskFor(partition); >log.info("Reinitializing StreamTask {} for changelog {}", task, partition); >needsInitializing.remove(partition); >needsRestoring.remove(partition); > > task.reinitializeStateStoresForPartitions(recoverableException.partitions()); > } > restoreConsumer.seekToBeginning(partitions); > }{code} > > Investigating why the handling of this exception does not work, I found the root > cause: > Kafka Streams registers state restorers in the variable stateRestorers, > which is used to read/update the start and end offsets for restoring the local > state store. > {code:java} >
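The guard the report points toward can be sketched in one line: clamp the checkpointed restore position to the changelog's current log-start offset before seeking, so a checkpoint that predates retention-based deletion no longer triggers OffsetOutOfRangeException on restore. The method name is illustrative, not the actual StoreChangelogReader API:

```java
// Clamp the restore start position to whichever is later: the locally
// checkpointed offset or the changelog topic's current log-start offset.
public class RestoreOffsetSketch {
    static long restoreStartOffset(long checkpointOffset, long logStartOffset) {
        return Math.max(checkpointOffset, logStartOffset);
    }

    public static void main(String[] args) {
        // Checkpoint 100 predates the changelog's start offset 120
        // (segments expired by retention.ms): restore from 120, not 100.
        System.out.println(restoreStartOffset(100L, 120L));
        // A checkpoint inside the retained range is used as-is.
        System.out.println(restoreStartOffset(130L, 120L));
    }
}
```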
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716588#comment-16716588 ] Sarvesh Tamba commented on KAFKA-7580: -- Reproducing the "pure virtual method" issue is extremely hard, since it happens intermittently and with a random unit test (it is not the same unit test that fails each time). The ones noted above were some of the failing unit tests observed. Note that the status next to the test shows PASSED (is this correct or misleading?). > Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when > run as root user > -- > > Key: KAFKA-7580 > URL: https://issues.apache.org/jira/browse/KAFKA-7580 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2 > Environment: Ubuntu 16.04.3 LTS >Reporter: Sarvesh Tamba >Priority: Minor > > Created a non-root user and ran the following command to execute the failing > unit test:- > ./gradlew streams:unitTest --tests > org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir > For a root user, the test case fails:- > = > > Task :streams:testClasses UP-TO-DATE > > Task :streams:unitTest > org.apache.kafka.streams.state.internals.RocksDBStoreTest > > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED > java.lang.AssertionError: Expected exception: > org.apache.kafka.streams.errors.ProcessorStateException > 1 test completed, 1 failed > > Task :streams:unitTest FAILED > FAILURE: Build failed with an exception. > * What went wrong: > Execution failed for task ':streams:unitTest'. > > There were failing tests. See the report at: > > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html > * Try: > Run with --stacktrace option to get the stack trace. Run with --info or > --debug option to get more log output. Run with --scan to get full insights. 
> * Get more help at https://help.gradle.org > BUILD FAILED in 20s > 26 actionable tasks: 2 executed, 24 up-to-date > = > However, for a non-root user the test case passes as success:- > = > > Task :streams:testClasses > > Task :streams:unitTest > org.apache.kafka.streams.state.internals.RocksDBStoreTest > > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED > BUILD SUCCESSFUL in 45s > 26 actionable tasks: 4 executed, 22 up-to-date > = > The failing unit test - > "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary > file directory and sets it as readOnly. The unit test is intended to throw an > exception - "ProcessorStateException" - when the readOnly temporary file > directory is opened/accessed. > By default, non-root users opening/accessing a readOnly file directory is not > allowed, and it rightly throws up an error/exception in the unit test (which is > the intention of the unit test, and it passes for non-root users). > sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent > mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied > > sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/ > ls: cannot access '/tmp/readOnlyDir/..': Permission denied > ls: cannot access '/tmp/readOnlyDir/.': Permission denied > ls: cannot access '/tmp/readOnlyDir/kid': Permission denied > ls: cannot access '/tmp/readOnlyDir/child': Permission denied > total 0 > d? ? ? ? ? ? ./ > d? ? ? ? ? ? ../ > d? ? ? ? ? ? child/ > d? ? ? ? ? ? 
kid/ > However, by default, a root user can access any file in the system:- > root@p006vm18:/tmp# ll /tmp/readOnlyDir/ > total 112 > dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./ > drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../ > drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/ > drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/ > > root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent > > root@p006vm18:/tmp# ll /tmp/readOnlyDir/ > total 116 > dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./ > drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../ > drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/ > drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/ > drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/ > Hence the unit test does not throw the expected "ProcessorStateException" > when the readOnly temporary file directory is opened, and the unit test > rightly fails for a root user. > Two approaches for resolving this failing unit test case:- > 1.) Run the unit tests as non-root users (simplest). > 2.) If running the unit test as root user, make the temporary file directory >
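The permission behavior the test relies on can be sketched outside of Kafka. This is not the actual RocksDBStoreTest code; the class and names below are illustrative:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Sketch of the permission check the unit test depends on: a directory
// stripped of write permission should reject child creation. Root bypasses
// POSIX permission bits, so the outcome depends on who runs the program.
public class ReadOnlyDirDemo {
    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("readOnlyDir").toFile();
        dir.setWritable(false, false);               // analogous to chmod a-w
        boolean created = new File(dir, "child").mkdir();
        boolean isRoot = "root".equals(System.getProperty("user.name"));
        // Non-root: creation fails; root: the read-only bits are ignored,
        // so the two flags should agree either way.
        System.out.println(created == isRoot);
        dir.setWritable(true, false);                // cleanup
    }
}
```

This asymmetry is why approach 2 above suggests making the directory immutable (e.g. via a filesystem immutable flag), since such a flag is enforced even for root.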
[jira] [Updated] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic
[ https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] linyue li updated KAFKA-7443: - Fix Version/s: 2.1.1 > OffsetOutOfRangeException in restoring state store from changelog topic when > start offset of local checkpoint is smaller than that of changelog topic > - > > Key: KAFKA-7443 > URL: https://issues.apache.org/jira/browse/KAFKA-7443 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.1.0 >Reporter: linyue li >Assignee: linyue li >Priority: Major > Labels: feather > Fix For: 2.1.1 > > Attachments: KAFKA-7443.url > > > When restoring a local state store from a changelog topic in EOS, Kafka Streams > will sometimes throw an OffsetOutOfRangeException such as: > {quote}Restoring StreamTasks failed. Deleting StreamTasks stores to recreate > from scratch. > org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of > range with no configured reset policy for partitions: > {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112} > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734) > > {quote} > This scenario occurs when the changelog topic has deleted expired log segments > according to retention.ms, but the start offset in the local .checkpoint > file is the position at which the task last exited on this instance, which may > be smaller than the updated beginning offset of the changelog topic. Restoring the > store from the start offset in the checkpoint file then throws the exception. > It can be reproduced as below (Kafka Streams runs in EOS): > # The task for topic partition test-1 is running on instance A. When the task exits, > Kafka Streams writes its last committed offset, 100, for test-1 in the checkpoint > file. > # Task test-1 is transferred to instance B. > # During this time, the remote changelog topic for test-1 updates its start > offset to 120 as the old log segment reaches its retention time and is deleted. > # After a while, task test-1 exits from instance B and resumes on instance > A, and the task restores the local state store of A from checkpoint offset 100, which > is smaller than the valid start offset 120 of the changelog topic. The exception above > is thrown. > When this exception occurs, Kafka Streams tries to reinitialize the task and > intends to restore from the beginning in the catch block below. Unfortunately, this > handling does not work, and the task keeps throwing OffsetOutOfRangeException in > subsequent restore attempts. > {code:java} > //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java > //handling for OffsetOutOfRangeException in Kafka Streams > catch (final InvalidOffsetException recoverableException) { > log.warn("Restoring StreamTasks failed. 
Deleting StreamTasks stores to > recreate from scratch.", recoverableException); > final Set<TopicPartition> partitions = recoverableException.partitions(); > for (final TopicPartition partition : partitions) { >final StreamTask task = active.restoringTaskFor(partition); >log.info("Reinitializing StreamTask {} for changelog {}", task, partition); >needsInitializing.remove(partition); >needsRestoring.remove(partition); > > task.reinitializeStateStoresForPartitions(recoverableException.partitions()); > } > restoreConsumer.seekToBeginning(partitions); > }{code} > > Investigating why the handling of this exception does not work, I found the root > cause: > Kafka Streams registers state restorers in the variable stateRestorers, > which is used to read/update the start and end offsets for restoring the local > state store. > {code:java} > //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java > private final Map<TopicPartition, StateRestorer> stateRestorers = new > HashMap<>();{code} > When the OffsetOutOfRangeException occurs,
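The offset arithmetic behind the failure can be sketched with a minimal, self-contained example (hypothetical names, not the actual StoreChangelogReader code): a checkpointed offset that has fallen below the changelog's beginning offset must be discarded in favor of the log start, which is what resetting the restorer's checkpoint to NO_CHECKPOINT achieves.

```java
// Minimal sketch of the stale-checkpoint problem: the restorer remembers
// offset 100, but the changelog's earliest retained offset is 120.
public class CheckpointDemo {
    static final long NO_CHECKPOINT = -1L;

    // Pick a safe restore start: fall back to the log start when the
    // checkpoint is absent or has been truncated away by retention.
    static long chooseStartOffset(long checkpoint, long logStartOffset) {
        if (checkpoint == NO_CHECKPOINT || checkpoint < logStartOffset) {
            return logStartOffset;
        }
        return checkpoint;
    }

    public static void main(String[] args) {
        System.out.println(chooseStartOffset(100L, 120L)); // truncated away -> 120
        System.out.println(chooseStartOffset(150L, 120L)); // still valid -> 150
    }
}
```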
[jira] [Commented] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic
[ https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716567#comment-16716567 ] ASF GitHub Bot commented on KAFKA-7443: --- mjsax closed pull request #5946: KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic URL: https://github.com/apache/kafka/pull/5946 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index e43c292e6a3..f877f9d13a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -188,6 +188,10 @@ public synchronized void unsubscribe() { if (!subscriptions.isPaused(entry.getKey())) { final List<ConsumerRecord<K, V>> recs = entry.getValue(); for (final ConsumerRecord<K, V> rec : recs) { +if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) { +throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey()))); +} + if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) { results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec); subscriptions.position(entry.getKey(), rec.offset() + 1); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 34e6e5cdb6f..fdd9d6c303c 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -107,6 +107,8 @@ public void register(final StateRestorer restorer) { needsInitializing.remove(partition); needsRestoring.remove(partition); +final StateRestorer restorer = stateRestorers.get(partition); +restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT); task.reinitializeStateStoresForPartitions(recoverableException.partitions()); } restoreConsumer.seekToBeginning(partitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 34f0a32b88c..d08f0d7360d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -157,6 +157,42 @@ public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() { assertThat(callback.restored.size(), equalTo(messages)); } +@Test +public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() { +final int messages = 10; +final int startOffset = 5; +final long expiredCheckpoint = 1L; +assignPartition(messages, topicPartition); + consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset)); +consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset))); + +addRecords(messages, topicPartition, startOffset); +consumer.assign(Collections.emptyList()); + +final StateRestorer stateRestorer = new StateRestorer( +topicPartition, +restoreListener, +expiredCheckpoint, +Long.MAX_VALUE, +true, +"storeName"); +changelogReader.register(stateRestorer); + + EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); 
+EasyMock.replay(active, task); + +// first restore call "fails" since OffsetOutOfRangeException but we should not die with an exception +assertEquals(0, changelogReader.restore(active).size()); +//the starting offset for stateRestorer is set to NO_CHECKPOINT +assertThat(stateRestorer.checkpoint(), equalTo(-1L)); + +//restore the active task again +
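The MockConsumer change in the diff above boils down to a bounds check before serving records. A standalone sketch of that check, with simplified hypothetical types in place of Kafka's:

```java
import java.util.Collections;
import java.util.Map;

// Standalone sketch of the bounds check: a position older than the
// partition's beginning offset means that data was truncated by retention,
// so the consumer must signal an out-of-range error instead of serving records.
public class OffsetCheckDemo {
    static class OffsetOutOfRangeException extends RuntimeException {
        OffsetOutOfRangeException(Map<String, Long> offsets) {
            super("Offsets out of range with no configured reset policy for partitions: " + offsets);
        }
    }

    static void checkPosition(Map<String, Long> beginningOffsets, String partition, long position) {
        Long start = beginningOffsets.get(partition);
        if (start != null && start > position) {
            throw new OffsetOutOfRangeException(Collections.singletonMap(partition, position));
        }
    }

    public static void main(String[] args) {
        Map<String, Long> begin = Collections.singletonMap("changelog-32", 120L);
        checkPosition(begin, "changelog-32", 150L);     // position ahead of log start: fine
        System.out.println("in range");
        try {
            checkPosition(begin, "changelog-32", 100L); // stale position: must throw
        } catch (OffsetOutOfRangeException e) {
            System.out.println("caught");
        }
    }
}
```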
[jira] [Updated] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements
[ https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7523: --- Labels: kip (was: needs-kip) > TransformerSupplier/ProcessorSupplier enhancements > -- > > Key: KAFKA-7523 > URL: https://issues.apache.org/jira/browse/KAFKA-7523 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paul Whalen >Assignee: Paul Whalen >Priority: Minor > Labels: kip > > I have found that when writing "low level" {{Processors}} and > {{Transformers}} that are stateful, often I want these processors to "own" > one or more state stores, the details of which are not important to the > business logic of the application. However, when incorporating these into > the topologies defined by the high level API, using {{KStream::transform}} or > {{KStream::process}}, I'm forced to specify the stores so the topology is > wired up correctly. This creates an unfortunate pattern where the > {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the > pattern I've been following) holds the information about the names of the > state stores, must be defined above the "high level", "fluent API"-style > pipeline, which makes it hard to understand the business logic data flow. > > What I currently have to do: > {code:java} > TransformerSupplier transformerSupplier = new > TransformerSupplierWithState(topology, val -> businessLogic(val)); > builder.stream("in.topic") > .transform(transformerSupplier, transformerSupplier.stateStoreNames()) > .to("out.topic");{code} > I have to both define the {{TransformerSupplier}} above the "fluent block" > and pass the topology in, so I can call {{topology.addStateStore()}} inside > the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what > the state store names are for that point in the topology. 
The lambda {{val -> > businessLogic(val)}} is really what I want to see in-line, because that's the > crux of what is happening, along with the name of some factory method > describing what the transformer is doing for me internally. This issue is > obviously exacerbated when the "fluent block" is much longer than this > example; it gets worse the farther away {{val -> businessLogic(val)}} is > from {{KStream::transform}}. > > An improvement: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(topology, val -> > businessLogic(val))) > .to("out.topic");{code} > Which implies the existence of a {{KStream::transform}} that takes a single > argument that adheres to this interface: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > String[] stateStoreNames(); > }{code} > Or better yet, I wouldn't have to pass in the topology; the caller of > {{TransformerSupplierWithState}} could also handle the job of "adding" its > state stores to the topology: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > Map stateStores(); > }{code} > Which would enable my ideal: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(val -> businessLogic(val))) > .to("out.topic");{code} > I think this would be a huge improvement in the usability of low-level > processors with the high-level DSL. > Please let me know if I'm missing something as to why this cannot or should > not happen, or if there is a better forum for this suggestion (presumably it > would require a KIP?). I'd be happy to build it as well if there is a chance > of it being merged; it doesn't seem like a huge challenge to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
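The proposed interface can be prototyped without any Kafka dependencies. In the sketch below the names are hypothetical and a plain {{Function}} stands in for Kafka's {{Transformer}}:

```java
import java.util.function.Function;

// Sketch of the proposed pattern: the supplier owns the names of its state
// stores, so callers of transform() need not pass them separately.
interface TransformerSupplierWithState<I, O> {
    Function<I, O> get();          // stand-in for Kafka's Transformer
    String[] stateStoreNames();    // stores this transformer owns

    static <I, O> TransformerSupplierWithState<I, O> of(Function<I, O> logic,
                                                        String... stores) {
        return new TransformerSupplierWithState<I, O>() {
            public Function<I, O> get() { return logic; }
            public String[] stateStoreNames() { return stores; }
        };
    }
}

public class Demo {
    public static void main(String[] args) {
        // Business logic stays in-line; store wiring stays with the supplier.
        TransformerSupplierWithState<String, Integer> s =
            TransformerSupplierWithState.of(String::length, "my-store");
        System.out.println(s.get().apply("kafka"));
        System.out.println(s.stateStoreNames()[0]);
    }
}
```

A factory method like {{of}} keeps the fluent pipeline readable while the store names travel with the supplier, which is the crux of the suggestion.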
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716433#comment-16716433 ] Matthias J. Sax commented on KAFKA-7580: "pure virtual method" indicates an issue with RocksDB. Maybe some binaries are not compatible with your OS? It could also be a bug in the Streams code base itself, though. Those are pretty hard to debug. Can you reproduce it reliably? Do you know at which point the test fails? > Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when > run as root user > -- > > Key: KAFKA-7580 > URL: https://issues.apache.org/jira/browse/KAFKA-7580 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2 > Environment: Ubuntu 16.04.3 LTS >Reporter: Sarvesh Tamba >Priority: Minor