[jira] [Commented] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens

2018-12-11 Thread Satish Duggana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718532#comment-16718532
 ] 

Satish Duggana commented on KAFKA-7694:
---

This is about providing pluggable interfaces for delegation token storage and 
master key management, with ZooKeeper-based storage as the default 
implementation.

I am working on a KIP for the above; I will send it out when it is ready.
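As a rough sketch of the direction (all names here are illustrative, not from 
the KIP):
{code:java}
// Illustrative only: a possible shape for a pluggable secret-key manager
// for delegation tokens. Interface and method names are hypothetical.
public interface DelegationTokenSecretManager {

    // the key currently used to sign newly issued tokens
    byte[] currentSecretKey();

    // look up an older key by id so previously issued tokens still verify
    byte[] secretKey(int keyId);

    // generate and switch to a new master key (e.g. on a rotation schedule)
    void rotateSecretKey();

    // release any resources held by the manager (e.g. a ZooKeeper client)
    void close();
}
{code}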

>  Support ZooKeeper based master/secret key management for delegation tokens
> ---
>
> Key: KAFKA-7694
> URL: https://issues.apache.org/jira/browse/KAFKA-7694
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Manikumar
>Assignee: Satish Duggana
>Priority: Major
>
> A master key/secret is used to generate and verify delegation tokens. 
> Currently, the master key/secret is stored as plain text in the 
> server.properties config file, and the same key must be configured across 
> all the brokers. A re-deployment is required when the secret needs to be 
> rotated.
> This JIRA is to explore and implement a ZooKeeper based master/secret key 
> management scheme to automate secret key generation and expiration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7723) Kafka Connect support override worker kafka api configuration with connector configuration that post by rest api

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718427#comment-16718427
 ] 

ASF GitHub Bot commented on KAFKA-7723:
---

sweat123 opened a new pull request #6026: KAFKA-7723: Support override kafka 
connect worker api configuration with rest api
URL: https://github.com/apache/kafka/pull/6026
 
 
   More details https://issues.apache.org/jira/browse/KAFKA-7723
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect support override worker kafka api configuration with connector 
> configuration that post by rest api
> 
>
> Key: KAFKA-7723
> URL: https://issues.apache.org/jira/browse/KAFKA-7723
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: laomei
>Priority: Minor
>
> I'm using a Kafka sink connector; "auto.offset.reset" is set in 
> connect-distributed*.properties.
> That setting applies to every connector in the worker, so all consumers poll 
> records from latest or earliest; I cannot control auto.offset.reset in the 
> connector configs posted via the REST API.
> So I think it is necessary to be able to override worker-level Kafka API 
> configs with connector configs.
> Like this
> {code:java}
>   {
>     "name": "test",
>     "config": {
>         "consumer.auto.offset.reset": "latest",
>         "consumer.xxx": "...",
>         "connector.class": "com.laomei.sis.solr.SolrConnector",
>         "tasks.max": "1",
>         "poll.interval.ms": "100",
>         "connect.timeout.ms": "6",
>         "topics": "test"
>     }
>   }
> {code}
> This way we can override the Kafka consumer auto.offset.reset for a sink 
> connector.
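A minimal sketch of how a worker could apply such overrides (the "consumer." 
prefix convention comes from the example above; the helper itself is 
hypothetical):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: overlay "consumer."-prefixed keys from the connector
// config posted via REST on top of the worker-level consumer config.
static Map<String, Object> effectiveConsumerConfig(Map<String, Object> workerConsumerConfig,
                                                   Map<String, String> connectorConfig) {
    Map<String, Object> merged = new HashMap<>(workerConsumerConfig);
    for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
        if (e.getKey().startsWith("consumer.")) {
            // e.g. "consumer.auto.offset.reset" -> "auto.offset.reset"
            merged.put(e.getKey().substring("consumer.".length()), e.getValue());
        }
    }
    return merged;
}
{code}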



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7707) Some code is not necessary

2018-12-11 Thread huangyiming (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420
 ] 

huangyiming edited comment on KAFKA-7707 at 12/12/18 3:19 AM:
--

Hi [~sliebau], thanks for the review. Inside the locked region no other 
thread can add a Condition to the waiters, so the waiters deque only ever 
holds the one condition we added:
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
{code}
and in the inner finally block we have already removed the Condition:
{code:java}
} finally {
    // When this loop was not able to successfully terminate don't loose available memory
    this.nonPooledAvailableMemory += accumulated;
    this.waiters.remove(moreMemory);
}
{code}
so I think in the last finally we don't need this.waiters.peekFirst() any more:
{code:java}
} finally {
    // signal any additional waiters if there is more memory left over for them
    try {
        if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
            this.waiters.peekFirst().signal();
    } finally {
        // Another finally... otherwise findbugs complains
        lock.unlock();
    }
}
{code}
and it can be modified like this:
{code:java}
} finally {
    lock.unlock();
}
{code}


was (Author: huangyimingha...@163.com):
Hi [~sliebau], thanks for the review. Inside the locked region no other 
thread can add a Condition to the waiters, so the waiters deque only ever 
holds the one condition we added:
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
{code}
and in the inner finally block we have already removed the Condition:
{code:java}
} finally {
    // When this loop was not able to successfully terminate don't loose available memory
    this.nonPooledAvailableMemory += accumulated;
    this.waiters.remove(moreMemory);
}
{code}
so I think the last finally:
{code:java}
} finally {
    // signal any additional waiters if there is more memory left over for them
    try {
        if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
            this.waiters.peekFirst().signal();
    } finally {
        // Another finally... otherwise findbugs complains
        lock.unlock();
    }
}
{code}
can be modified like this:
{code:java}
} finally {
    lock.unlock();
}
{code}

> Some code is not necessary
> --
>
> Key: KAFKA-7707
> URL: https://issues.apache.org/jira/browse/KAFKA-7707
> Project: Kafka
>  Issue Type: Improvement
>Reporter: huangyiming
>Priority: Minor
> Attachments: image-2018-12-05-18-01-46-886.png
>
>
> In the trunk branch in 
> [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174],
>  I think this code can be cleaned up; it is not necessary and will never execute:
> {code:java}
> if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && 
> !this.waiters.isEmpty())
> this.waiters.peekFirst().signal();
> {code}

[jira] [Comment Edited] (KAFKA-7707) Some code is not necessary

2018-12-11 Thread huangyiming (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420
 ] 

huangyiming edited comment on KAFKA-7707 at 12/12/18 3:16 AM:
--

Hi [~sliebau], thanks for the review. Inside the locked region no other 
thread can add a Condition to the waiters, so the waiters deque only ever 
holds the one condition we added:
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
{code}
and in the inner finally block we have already removed the Condition:
{code:java}
} finally {
    // When this loop was not able to successfully terminate don't loose available memory
    this.nonPooledAvailableMemory += accumulated;
    this.waiters.remove(moreMemory);
}
{code}
so I think the last finally:
{code:java}
} finally {
    // signal any additional waiters if there is more memory left over for them
    try {
        if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
            this.waiters.peekFirst().signal();
    } finally {
        // Another finally... otherwise findbugs complains
        lock.unlock();
    }
}
{code}
can be modified like this:
{code:java}
} finally {
    lock.unlock();
}
{code}


was (Author: huangyimingha...@163.com):
Hi [~sliebau], thanks for the review. Inside the locked region no other 
thread can add a Condition to the waiters, so the waiters deque only ever 
holds the one condition we added:
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
{code}
and in the inner finally block we have already removed the Condition:
{code:java}
} finally {
    // When this loop was not able to successfully terminate don't loose available memory
    this.nonPooledAvailableMemory += accumulated;
    this.waiters.remove(moreMemory);
}
{code}
so I think the last finally:
{code:java}
} finally {
    // signal any additional waiters if there is more memory left over for them
    try {
        if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
            this.waiters.peekFirst().signal();
    } finally {
        // Another finally... otherwise findbugs complains
        lock.unlock();
    }
}
{code}
can be modified like this:
{code:java}
} finally {
    lock.unlock();
}
{code}

> Some code is not necessary
> --
>
> Key: KAFKA-7707
> URL: https://issues.apache.org/jira/browse/KAFKA-7707
> Project: Kafka
>  Issue Type: Improvement
>Reporter: huangyiming
>Priority: Minor
> Attachments: image-2018-12-05-18-01-46-886.png
>
>
> In the trunk branch in 
> [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174],
>  I think this code can be cleaned up; it is not necessary and will never execute:
> {code:java}
> if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && 
> !this.waiters.isEmpty())
> this.waiters.peekFirst().signal();
> {code}

[jira] [Commented] (KAFKA-7707) Some code is not necessary

2018-12-11 Thread huangyiming (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420
 ] 

huangyiming commented on KAFKA-7707:


Hi [~sliebau], thanks for the review. Inside the locked region no other 
thread can add a Condition to the waiters, so the waiters deque only ever 
holds the one condition we added:
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
{code}
and in the inner finally block we have already removed the Condition:
{code:java}
} finally {
    // When this loop was not able to successfully terminate don't loose available memory
    this.nonPooledAvailableMemory += accumulated;
    this.waiters.remove(moreMemory);
}
{code}
so I think the last finally:
{code:java}
} finally {
    // signal any additional waiters if there is more memory left over for them
    try {
        if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
            this.waiters.peekFirst().signal();
    } finally {
        // Another finally... otherwise findbugs complains
        lock.unlock();
    }
}
{code}
can be modified like this:
{code:java}
} finally {
    lock.unlock();
}
{code}
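For readers following the thread: a self-contained sketch of the chained 
wake-up pattern that the final `peekFirst().signal()` implements (illustrative 
only, not the actual BufferPool code). Each blocked thread parks on its own 
Condition, so whichever thread makes progress must forward the signal for the 
next queued waiter to re-check:
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative stand-in for the BufferPool waiter queue.
class ChainedWakeup {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available = 0;

    void take(long amount) throws InterruptedException {
        lock.lock();
        try {
            Condition me = lock.newCondition();
            waiters.addLast(me);
            try {
                while (available < amount)
                    me.await(); // only the head waiter is ever signalled
                available -= amount;
            } finally {
                waiters.remove(me);
            }
        } finally {
            try {
                // Forward the wake-up: if capacity remains and someone else
                // is still queued, signal the new head before unlocking.
                if (available > 0 && !waiters.isEmpty())
                    waiters.peekFirst().signal();
            } finally {
                lock.unlock();
            }
        }
    }

    void release(long amount) {
        lock.lock();
        try {
            available += amount;
            if (!waiters.isEmpty())
                waiters.peekFirst().signal();
        } finally {
            lock.unlock();
        }
    }
}
{code}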

> Some code is not necessary
> --
>
> Key: KAFKA-7707
> URL: https://issues.apache.org/jira/browse/KAFKA-7707
> Project: Kafka
>  Issue Type: Improvement
>Reporter: huangyiming
>Priority: Minor
> Attachments: image-2018-12-05-18-01-46-886.png
>
>
> In the trunk branch in 
> [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174],
>  I think this code can be cleaned up; it is not necessary and will never execute:
> {code:java}
> if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && 
> !this.waiters.isEmpty())
> this.waiters.peekFirst().signal();
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7723) Kafka Connect support override worker kafka api configuration with connector configuration that post by rest api

2018-12-11 Thread laomei (JIRA)
laomei created KAFKA-7723:
-

 Summary: Kafka Connect support override worker kafka api 
configuration with connector configuration that post by rest api
 Key: KAFKA-7723
 URL: https://issues.apache.org/jira/browse/KAFKA-7723
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: laomei


I'm using a Kafka sink connector; "auto.offset.reset" is set in 
connect-distributed*.properties.
That setting applies to every connector in the worker, so all consumers poll 
records from latest or earliest; I cannot control auto.offset.reset in the 
connector configs posted via the REST API.

So I think it is necessary to be able to override worker-level Kafka API 
configs with connector configs.

Like this

{code:java}
  {
    "name": "test",
    "config": {
        "consumer.auto.offset.reset": "latest",
        "consumer.xxx": "...",
        "connector.class": "com.laomei.sis.solr.SolrConnector",
        "tasks.max": "1",
        "poll.interval.ms": "100",
        "connect.timeout.ms": "6",
        "topics": "test"
    }
  }
{code}

This way we can override the Kafka consumer auto.offset.reset for a sink 
connector.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7715) Connect should have a parameter to disable WADL output for OPTIONS method

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718352#comment-16718352
 ] 

ASF GitHub Bot commented on KAFKA-7715:
---

avocader opened a new pull request #6025: KAFKA-7715: Added a configuration 
parameter to Connect which disables WADL output for OPTIONS method.
URL: https://github.com/apache/kafka/pull/6025
 
 
   Currently, the Connect REST endpoint replies to `OPTIONS` requests with 
verbose WADL information, which could be used for an attack.
   It's not recommended for a production system to expose that information, 
but for backward-compatibility reasons it remains available by default, with 
the option to turn it off by setting `rest.wadl.enable=false`.
   
   Added unit tests in `RestServerTest`, which assert that `Content-Type` is 
`application/vnd.sun.wadl+xml` when `rest.wadl.enable=true` or unset, and 
`text/plain` otherwise.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect should have a parameter to disable WADL output for OPTIONS method
> -
>
> Key: KAFKA-7715
> URL: https://issues.apache.org/jira/browse/KAFKA-7715
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, security
>Affects Versions: 2.1.0
>Reporter: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.1.1
>
>
> Currently, Connect REST API exposes WADL output on OPTIONS method:
> {code:bash}
> curl -i -X OPTIONS http://localhost:8083/connectors
> HTTP/1.1 200 OK
> Date: Fri, 07 Dec 2018 22:51:53 GMT
> Content-Type: application/vnd.sun.wadl+xml
> Allow: HEAD,POST,GET,OPTIONS
> Last-Modified: Fri, 07 Dec 2018 14:51:53 PST
> Content-Length: 1331
> Server: Jetty(9.4.12.v20180830)
> 
> <application xmlns="http://wadl.dev.java.net/2009/02">
>   [WADL body elided: it enumerates the exposed resources and their
>    HEAD/POST/GET/OPTIONS methods and parameters]
> </application>
> {code}
> This is a potential vulnerability, so it makes sense to have a 
> configuration parameter that disables WADL output.
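For background, Connect's REST layer is built on Jersey, where WADL generation 
can be switched off via a standard server property. A minimal sketch of how 
such a flag might be wired up (the `rest.wadl.enable` name comes from the PR 
above; the config lookup is hypothetical):
{code:java}
import java.util.Collections;
import java.util.Map;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;

// ServerProperties.WADL_FEATURE_DISABLE is a standard Jersey 2 property;
// `workerProps` stands in for the worker configuration.
Map<String, String> workerProps = Collections.singletonMap("rest.wadl.enable", "false");
boolean wadlEnabled = Boolean.parseBoolean(workerProps.getOrDefault("rest.wadl.enable", "true"));
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, !wadlEnabled);
{code}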



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-12-11 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718311#comment-16718311
 ] 

Paul Davidson commented on KAFKA-5061:
--

Thanks for the response [~mageshn] - I will create a KIP.

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.
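A tiny sketch of option 1 for the producer side (the naming scheme is 
illustrative only; the actual format would be defined in the KIP):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical helper: default client.id from the worker group id, connector
// name and task id, unless the user already set one via worker-level overrides.
static void applyDefaultClientId(Map<String, Object> producerProps,
                                 String groupId, String connectorName, int taskId) {
    producerProps.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG,
            String.format("connect-%s-%s-%d", groupId, connectorName, taskId));
}
{code}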



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-12-11 Thread Magesh kumar Nandakumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718276#comment-16718276
 ] 

Magesh kumar Nandakumar commented on KAFKA-5061:


[~pdavidson] I had a quick look at the PR and it appears that we are 
introducing a new configuration to handle this. We would probably require a 
small KIP since it's a public-facing change. 

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-11 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718138#comment-16718138
 ] 

Mayuresh Gharat commented on KAFKA-7681:


Hi [~junrao],

# IIUC, after inspecting the code and your suggestion above, this seems doable 
by adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), like 
we have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request type, which 
approximates its usage of the request handler thread pool.

# I was thinking more along the lines of a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent" 
but per request type, wherein
{code:java}
value = (total sampled local time of a request type) / (total of the sampled 
local times of all requests)
{code}
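A minimal sketch of what option 1 could look like with Kafka's metrics 
library (the metric and sensor names are placeholders):
{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

// Recording local (handler) time in milliseconds into a Rate(SECONDS) yields
// "ms of handler time per second", which approximates the fraction of the
// request handler pool used by this request type.
Metrics metrics = new Metrics();
Sensor produceLocalTime = metrics.sensor("Produce-localTime");
produceLocalTime.add(
        metrics.metricName("local-time-rate", "request-metrics",
                "Rate at which local (handler) time accrues for Produce requests"),
        new Rate(TimeUnit.SECONDS));

// on completion of each Produce request:
double localTimeMs = 3.5; // example measured handler time
produceLocalTime.record(localTimeMs);
{code}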






> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> request type is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but will be more direct. This would require a new KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-11 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718138#comment-16718138
 ] 

Mayuresh Gharat edited comment on KAFKA-7681 at 12/11/18 10:57 PM:
---

Hi [~junrao],

1)
IIUC, after inspecting the code and your suggestion above, this seems doable by 
adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), like we 
have for {code:java} totalProduceRequestRate {code}
This will give us the rate of local time for each request type, which 
approximates its usage of the request handler thread pool.

2)
I was thinking more along the lines of a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent" 
but per request type, wherein
{code:java}
value = (total sampled local time of a request type) / (total of the sampled 
local times of all requests)
{code}

I can start putting up a KIP if we think that 1) would suffice.




was (Author: mgharat):
Hi [~junrao],

1)
IIUC, after inspecting the code and your suggestion above, this seems doable by 
adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), like we 
have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request type, which 
approximates its usage of the request handler thread pool.

2)
I was thinking more along the lines of a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent" 
but per request type, wherein
{code:java}
value = (total sampled local time of a request type) / (total of the sampled 
local times of all requests)
{code}

I can start putting up a KIP if we think that 1) would suffice.



> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> request type is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but will be more direct. This would require a new KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-11 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718138#comment-16718138
 ] 

Mayuresh Gharat edited comment on KAFKA-7681 at 12/11/18 10:56 PM:
---

Hi [~junrao],

1)
IIUC, after inspecting the code and your suggestion above, this seems doable by 
adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), like we 
have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request type, which 
approximates its usage of the request handler thread pool.

2)
I was thinking more along the lines of a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent" 
but per request type, wherein
{code:java}
value = (total sampled local time of a request type) / (total of the sampled 
local times of all requests)
{code}

I can start putting up a KIP if we think that 1) would suffice.




was (Author: mgharat):
Hi [~junrao],

# IIUC, after inspecting the code and your suggestion above, this seems doable 
by adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), like 
we have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request type, which 
approximates its usage of the request handler thread pool.

# I was thinking more along the lines of a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent" 
but per request type, wherein
{code:java}
value = (total sampled local time of a request type) / (total of the sampled 
local times of all requests)
{code}






> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> request type is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but will be more direct. This would require a new KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7722) Increase ProducerPerformance precision by using nanoTime

2018-12-11 Thread Kevin Lu (JIRA)
Kevin Lu created KAFKA-7722:
---

 Summary: Increase ProducerPerformance precision by using nanoTime
 Key: KAFKA-7722
 URL: https://issues.apache.org/jira/browse/KAFKA-7722
 Project: Kafka
  Issue Type: New Feature
  Components: tools
Reporter: Kevin Lu
Assignee: Kevin Lu


https://cwiki.apache.org/confluence/display/KAFKA/KIP-403%3A+Increase+ProducerPerformance+precision+by+using+nanoTime
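The gist of the motivation (a hand-written illustration, not code from the 
KIP): millisecond clocks cannot resolve sub-millisecond sends.
{code:java}
// Timing a fast operation with currentTimeMillis often reads 0 ms,
// while nanoTime still resolves it.
long startMs = System.currentTimeMillis();
long startNs = System.nanoTime();
doSend(); // hypothetical stand-in for producer.send(...)
long elapsedMs = System.currentTimeMillis() - startMs;          // frequently 0
double preciseMs = (System.nanoTime() - startNs) / 1_000_000.0; // e.g. 0.18
{code}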



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6706) NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 broker upgrade

2018-12-11 Thread David van Geest (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718052#comment-16718052
 ] 

David van Geest commented on KAFKA-6706:


FWIW upgrading to Kafka 1.1.1 solved these problems for us.

> NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 
> broker upgrade
> 
>
> Key: KAFKA-6706
> URL: https://issues.apache.org/jira/browse/KAFKA-6706
> Project: Kafka
>  Issue Type: Bug
>  Components: core, network
>Affects Versions: 1.0.0
>Reporter: Di Shang
>Priority: Blocker
>  Labels: mirror-maker
>
> We have 2 clusters A and B with 4 brokers each, we use mirrormaker to 
> replicate topics from A to B. 
>  We recently upgraded our brokers from 0.10.2.0 to 1.0.0, after the upgrade 
> we started seeing the mirrormaker task showing producer errors and 
> intermittently dying. 
>  We tried both the 1.0.0 and 0.10.2.0 mirrormaker; both have the same problem. 
> After downgrading cluster B brokers back to 0.10.2.0 the problem went away, so 
> we think it's a server-side problem.
> There are 2 types of errors: REQUEST_TIMED_OUT and NETWORK_EXCEPTION. For 
> testing, I used a topic *logging* with 20 partitions and 3 replicas (same on 
> cluster A and B), the source topic has 50+ million msg.
> (this is from mirrormaker 1.0 at info level, the 0.10.2.0 log is very similar)
> {noformat}
> 22 Mar 2018 02:16:07.407 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 35122 on 
> topic-partition logging-7, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:17:49.731 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 51572 on 
> topic-partition logging-7, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:18:33.903 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 57785 on 
> topic-partition logging-5, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:21:21.399 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 85406 on 
> topic-partition logging-18, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:25:22.278 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 128047 on 
> topic-partition logging-5, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:26:17.154 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 137049 on 
> topic-partition logging-18, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:27:57.358 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 153976 on 
> topic-partition logging-5, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:27:57.779 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 154077 on 
> topic-partition logging-2, retrying (2147483646 attempts left). Error: 
> NETWORK_EXCEPTION
>  22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 154077 on 
> topic-partition logging-10, retrying (2147483646 attempts left). Error: 
> NETWORK_EXCEPTION
>  22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 154077 on 
> topic-partition logging-18, retrying (2147483646 attempts left). Error: 
> NETWORK_EXCEPTION
>  22 Mar 2018 02:27:57.781 [kafka-producer-network-thread | 

[jira] [Commented] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb

2018-12-11 Thread hitesh gollahalli bachanna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718018#comment-16718018
 ] 

hitesh gollahalli bachanna commented on KAFKA-7640:
---

Hello [~vvcephei]

The REST API returns a 200 status code, but no data.

1. The client is on version 2.0.0 but the server is on 1.1.1.
2. We use a virtual IP (to load balance) as the Kafka broker address in the 
client code.
3. I have 36 partitions in the topic, with 36 consumers on different machines.
4. I have about 360 million messages on the topic.

I see rebalancing happening, because I see a bunch of `revoked at the 
beginning of consumer rebalance` messages.
Things I see in the logs when DEBUG is enabled:
Using older server API v7 to send FETCH
Using older server API v5 to send METADATA \{topics=[],allow_auto_topic_creation=false}
Using older server API v1 to send HEARTBEAT

Is there anything in particular I need to look for in the logs?

> Kafka stream interactive query not returning data when state is backed by 
> rocksdb
> -
>
> Key: KAFKA-7640
> URL: https://issues.apache.org/jira/browse/KAFKA-7640
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: hitesh gollahalli bachanna
>Priority: Major
>
> I have a Kafka Streams app running with 36 instances (one for each 
> partition). The instances come up one after the other, and I am building a 
> REST service on top of the state to access the data.
> Here is some code that I use:
> {code:java}
> // call this to find out which host has the key
> StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
> if (localSelf.host().equals(metadata.host())) {
>     // get the key from the local store
> } else {
>     // call the remote host using restTemplate
> }
> {code}
> The problem is that the returned `metadata` object points to one host/IP 
> while the data is actually on a different node. I was able to see this using 
> some application logs I printed, and it happens every time I start my 
> application.
> The `allMetadata` method in the `KafkaStreams` class says the values will be 
> updated as partitions get reassigned, but that is not happening in this case. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6706) NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 broker upgrade

2018-12-11 Thread Joseph Niemiec (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718049#comment-16718049
 ] 

Joseph Niemiec commented on KAFKA-6706:
---

We have all the same error messages occurring in our production clusters. It 
started after we upgraded from 0.8 to 1.0.0. 

 
{code:java}
2018-12-11 16:40:43 DEBUG NetworkClient:183 - [Producer 
clientId=KafkaExampleProducer] Disconnecting from node 1 due to request timeout.
2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] 
Got error produce response with correlation id 21193 on topic-partition 
debug_dev_r2k-3, retrying (4 attempts left). Error: REQUEST_TIMED_OUT
2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] 
Got error produce response with correlation id 21203 on topic-partition 
debug_dev_r2k-0, retrying (4 attempts left). Error: NETWORK_EXCEPTION
2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] 
Got error produce response with correlation id 21203 on topic-partition 
debug_dev_r2k-3, retrying (4 attempts left). Error: NETWORK_EXCEPTION
2018-12-11 16:40:43 WARN Sender:251 - [Producer clientId=KafkaExampleProducer] 
Got error produce response with correlation id 21203 on topic-partition 
debug_dev_r2k-6, retrying (4 attempts left). Error: NETWORK_EXCEPTION

{code}

> NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 
> broker upgrade
> 
>
> Key: KAFKA-6706
> URL: https://issues.apache.org/jira/browse/KAFKA-6706
> Project: Kafka
>  Issue Type: Bug
>  Components: core, network
>Affects Versions: 1.0.0
>Reporter: Di Shang
>Priority: Blocker
>  Labels: mirror-maker
>
> We have 2 clusters A and B with 4 brokers each, we use mirrormaker to 
> replicate topics from A to B. 
>  We recently upgraded our brokers from 0.10.2.0 to 1.0.0, after the upgrade 
> we started seeing the mirrormaker task showing producer errors and 
> intermittently dying. 
> We tried both the 1.0.0 and 0.10.2.0 mirrormaker; both have the same problem. 
> After downgrading cluster B brokers back to 0.10.2.0 the problem went away, so 
> we think it's a server-side problem.
> There are 2 types of errors: REQUEST_TIMED_OUT and NETWORK_EXCEPTION. For 
> testing, I used a topic *logging* with 20 partitions and 3 replicas (same on 
> cluster A and B), the source topic has 50+ million msg.
> (this is from mirrormaker 1.0 at info level, the 0.10.2.0 log is very similar)
> {noformat}
> 22 Mar 2018 02:16:07.407 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 35122 on 
> topic-partition logging-7, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:17:49.731 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 51572 on 
> topic-partition logging-7, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:18:33.903 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 57785 on 
> topic-partition logging-5, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:21:21.399 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 85406 on 
> topic-partition logging-18, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:25:22.278 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 128047 on 
> topic-partition logging-5, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:26:17.154 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 137049 on 
> topic-partition logging-18, retrying (2147483646 attempts left). Error: 
> REQUEST_TIMED_OUT
>  22 Mar 2018 02:27:57.358 [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
> clientId=producer-1] Got error produce response with correlation id 153976 on 
> topic-partition logging-5, retrying 

[jira] [Commented] (KAFKA-7713) producer io-wait-ratio > 1

2018-12-11 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718002#comment-16718002
 ] 

Jun Rao commented on KAFKA-7713:


io-wait-ratio is calculated as the ratio of accumulated selector waiting time 
over the length of the metric window. As the metric window rolls, accumulated 
waiting time that falls into the rolled window can push the metric above 1. 
I'm not sure what the best way to address this is. One way is to treat the 
Rate/Meter metrics specially: when we record a value, split it across the 
corresponding sample windows and bound each portion by the window size.
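A worked example with made-up numbers of how the rolling window can produce a 
ratio above 1:
{code:java}
// Suppose the window is two 30s samples and the I/O thread blocks in
// select() for 45s; the whole wait is recorded into the current sample when
// the call returns. If the oldest sample has just rolled out, the elapsed
// window used in the denominator can be barely over 30s:
double accumulatedWaitSec = 45.0; // recorded all at once on return
double windowElapsedSec   = 32.0; // shortly after the oldest sample rolled
double ioWaitRatio = accumulatedWaitSec / windowElapsedSec; // ~1.4 > 1
{code}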

> producer io-wait-ratio > 1
> --
>
> Key: KAFKA-7713
> URL: https://issues.apache.org/jira/browse/KAFKA-7713
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: dan norwood
>Priority: Major
>
> I am running a test on a streams application and gathering JMX measurements 
> to determine what is causing some lag. Using `kafka.tools.JmxTool` I was 
> gathering the following attributes: `'io-ratio', 'io-wait-ratio', 
> 'select-rate', 'batch-size-avg', 'compression-rate-avg', 'record-size-avg', 
> 'records-per-request-avg'` on my streams instances' producers. I noticed 
> that I was getting `io-wait-ratio > 1`, but according to the docs it is "The 
> fraction of time the I/O thread spent waiting." 
>  
> some example lines from jmxtool
> |StreamThread-8-producer:batch-size-avg|StreamThread-8-producer:compression-rate-avg|StreamThread-8-producer:io-ratio|*StreamThread-8-producer:io-wait-ratio*|StreamThread-8-producer:record-size-avg|StreamThread-8-producer:records-per-request-avg|StreamThread-8-producer:select-rate|
> |662.2613636|0.3425814926|1.01E-04|*1.178371974*|172.2045455|38.7167|3.855527588|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7331) Kafka does not detect broker loss in the event of a network partition within the cluster

2018-12-11 Thread Kevin Li (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717919#comment-16717919
 ] 

Kevin Li commented on KAFKA-7331:
-

Feel free to close this.

> Kafka does not detect broker loss in the event of a network partition within 
> the cluster
> 
>
> Key: KAFKA-7331
> URL: https://issues.apache.org/jira/browse/KAFKA-7331
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, network
>Affects Versions: 1.0.1
>Reporter: Kevin Li
>Priority: Major
>
> We ran into this issue on our production cluster and had to manually remove 
> the broker and enable unclean leader elections to get the cluster working 
> again. Ideally, Kafka itself could handle network partitions without manual 
> intervention.
> The issue is reproducible with the following cross datacenter Kafka cluster 
> setup:
> DC 1: Kafka brokers + ZK nodes
> DC 2: Kafka brokers + ZK nodes
> DC 3: Kafka brokers + ZK nodes
> Introduce a network partition on a Kafka broker (brokerA) in DC 1 where it 
> cannot reach any hosts (brokers and ZK nodes) in the other 2 datacenters. The 
> cluster goes into a state where partitions that brokerA is a leader for will 
> only contain brokerA in its ISR. Since brokerA is still reachable by ZK nodes 
> in DC 1, it still shows up when querying ZK. The controller thinks brokerA is 
> still up and does not elect new leaders for partitions that brokerA is a 
> leader for. This causes all those partitions to be down until brokerA is back 
> or completely removed from the cluster (in which case unclean leader election 
> can elect new leaders for those partitions).
> A faster recovery scenario could be for a majority of hosts (zk nodes?) to 
> realize that brokerA is unreachable, and mark it as down so elections for 
> partitions it is a leader for could be triggered. This avoids waiting 
> indefinitely for the broker to come back or taking action to remove the 
> broker from the cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2018-12-11 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717882#comment-16717882
 ] 

John Roesler commented on KAFKA-7695:
-

Hi [~zirx],

Just checking in...

It sounds from the discussion above that this ticket should probably be closed:

Either 2.1 will work for you (with or without the TimestampExtractor trick 
above), or you could open a new Jira specifically for the behavior you desire.

Do you agree? I'm just trying to close the loop, as it seems the conversation 
has petered out by now.

Thanks,

-John

> Cannot override StreamsPartitionAssignor in configuration 
> --
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Dmitry Buykin
>Priority: Major
>  Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing property 
> partition.assignment.strategy in KStreams 2.0.1 because the streams are 
> crashing inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer 
> works only with the RangeAssignor, which is configured by default.
> Could be reproduced by setting up
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> For me it looks like a bug.
> Opened a discussion here 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717599#comment-16717599
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

vvcephei opened a new pull request #6024: KAFKA-7223: document suppression 
buffer metrics
URL: https://github.com/apache/kafka/pull/6024
 
 
   Document the new metrics added in #5795 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> --
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7712) Handle exceptions from immediately connected channels in Selector

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717411#comment-16717411
 ] 

ASF GitHub Bot commented on KAFKA-7712:
---

rajinisivaram opened a new pull request #6023: KAFKA-7712: Remove channel from 
Selector before propagating exception
URL: https://github.com/apache/kafka/pull/6023
 
 
   Ensure that channels and selection keys are removed from the `Selector` 
collections before propagating connect exceptions. They are currently cleared 
on the next `poll()`, but we can't ensure that callers (NetworkClient, for 
example) won't try to connect again before the next `poll`, hence we should 
clear the collections before re-throwing exceptions from `connect()`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle exceptions from immediately connected channels in Selector
> -
>
> Key: KAFKA-7712
> URL: https://issues.apache.org/jira/browse/KAFKA-7712
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.2.0
>
>
> We try to handle all possible exceptions in Selector to ensure that channels 
> are always closed and their states kept consistent. For immediately connected 
> channels, we should ensure that any exception during connection results in 
> the channel being closed properly and removed from all maps. This is a very 
> unlikely scenario, but we do already handle the exception. We should clean up 
> properly in the catch block.
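A schematic of the clean-up-before-rethrow pattern described above (method 
and field names are placeholders, not the actual Selector internals):
{code:java}
import java.io.IOException;
import java.nio.channels.SocketChannel;

// Placeholder names throughout; sketch of the intended error handling.
public void connect(String id, SocketChannel channel) throws IOException {
    try {
        registerChannel(id, channel); // may throw for immediately connected channels
    } catch (IOException | RuntimeException e) {
        // Remove the channel and its selection key from all collections
        // *before* propagating, so a caller that retries connect() before
        // the next poll() never observes stale state.
        cleanupChannel(id, channel);
        throw e;
    }
}
{code}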



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-12-11 Thread Nikolay Izhikov (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov reassigned KAFKA-7680:
--

Assignee: Nikolay Izhikov

> fetching a refilled chunk of log can cause log divergence
> -
>
> Key: KAFKA-7680
> URL: https://issues.apache.org/jira/browse/KAFKA-7680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Nikolay Izhikov
>Priority: Major
>
> We use FileRecords.writeTo to send a fetch response for a follower. A log 
> could be truncated and refilled in the middle of the send process (due to 
> leader change). Then it's possible for the follower to append some 
> uncommitted messages followed by committed messages. Those uncommitted 
> messages may never be removed, causing log divergence.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7086) Kafka server process dies after try deleting old log files under Windows 10

2018-12-11 Thread Hans Schuell (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717293#comment-16717293
 ] 

Hans Schuell edited comment on KAFKA-7086 at 12/11/18 2:47 PM:
---

I have a similar issue - see KAFKA-6188 - last comment.

The English translation of the system error above is: *the process cannot 
access the file because it is being used by another process.*

As mentioned there, the log files cannot be deleted because of the different 
behaviour of the Windows file system. Under Windows a file cannot be moved 
(as in the case above) or deleted while there is still an open file handle in 
the running current process or in another process.

This makes a Windows installation of Kafka 2.0.0 currently unusable, because 
sooner or later this issue happens in a running or starting broker.


was (Author: gira1):
I have a similar issue - see KAFKA-6188 comment.

The English translation of the system error above is: *the process cannot 
access the file because it is being used by another process.*

As mentioned there, the log files cannot be deleted because of the different 
behaviour of the Windows file system. Under Windows a file cannot be moved 
(as in the case above) or deleted while there is still an open file handle in 
the running current process or in another process.

This makes a Windows installation of Kafka 2.0.0 currently unusable, because 
sooner or later this issue happens in a running or starting broker.

> Kafka server process dies after try deleting old log files under Windows 10
> ---
>
> Key: KAFKA-7086
> URL: https://issues.apache.org/jira/browse/KAFKA-7086
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: Windows 10, Windows Server 2012 R2
>Reporter: Cezary Wagner
>Priority: Major
>  Labels: windows
>
> Kafka dies with an error every time log.retention.hours is reached.
> {noformat}
> # Log Retention Policy 
> #
> # The following configurations control the disposal of log segments. The 
> policy can
> # be set to delete segments after a period of time, or after a given size has 
> accumulated.
> # A segment will be deleted whenever *either* of these criteria are met. 
> Deletion always happens
> # from the end of the log.
> # The minimum age of a log file to be eligible for deletion due to age
> log.retention.hours=168
> # A size-based retention policy for logs. Segments are pruned from the log 
> unless the remaining
> # segments drop below log.retention.bytes. Functions independently of 
> log.retention.hours.
> #log.retention.bytes=1073741824
> # The maximum size of a log segment file. When this size is reached a new log 
> segment will be created.
> log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be 
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=30{noformat}
> Exception raised:
> {noformat}
> > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.swap:
> >  Proces nie może uzyskać dostępu do pliku, ponieważ jest on używany przez 
> > inny proces.
>     at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
>     at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
>     at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
>     at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
>     at java.nio.file.Files.move(Unknown Source)
>     at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>     at 
> org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>     at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>     at kafka.log.Log.replaceSegments(Log.scala:1644)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>     at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>     at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>     at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>     Suppressed: java.nio.file.FileSystemException: 
> C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.cleaned
>  -> 
> 

[jira] [Comment Edited] (KAFKA-7086) Kafka server process dies after try deleting old log files under Windows 10

2018-12-11 Thread Hans Schuell (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717293#comment-16717293
 ] 

Hans Schuell edited comment on KAFKA-7086 at 12/11/18 2:47 PM:
---

I have a similar issue - see KAFKA-6188 - last comment

The English translation of the system error above is: *the process cannot 
access the file because it is being used by another process.*

As mentioned there, the log files cannot be deleted because of the different 
behaviour of the Windows file system. Under Windows, a file cannot be moved 
(as in the case above) or deleted while there is still an open file handle 
held by the current process or by another process.

This makes a Windows installation of Kafka 2.0.0 currently unusable, because 
sooner or later this issue happens in a running or starting broker.


was (Author: gira1):
I have a similar issue - see KAFKA-6188 - last comment

The English translation of the system error above is: *the process cannot 
access the file because it is being used by another process.*

As mentioned there, the log files cannot be deleted because of the different 
behaviour of the Windows file system. Under Windows, a file cannot be moved 
(as in the case above) or deleted while there is still an open file handle 
held by the current process or by another process.

This makes a Windows installation of Kafka 2.0.0 currently unusable, because 
sooner or later this issue happens in a running or starting broker.

> Kafka server process dies after try deleting old log files under Windows 10
> ---
>
> Key: KAFKA-7086
> URL: https://issues.apache.org/jira/browse/KAFKA-7086
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: Windows 10, Windows Server 2012 R2
>Reporter: Cezary Wagner
>Priority: Major
>  Labels: windows
>
> Kafka dies with an error every time log.retention.hours is reached.
> {noformat}
> # Log Retention Policy 
> #
> # The following configurations control the disposal of log segments. The 
> policy can
> # be set to delete segments after a period of time, or after a given size has 
> accumulated.
> # A segment will be deleted whenever *either* of these criteria are met. 
> Deletion always happens
> # from the end of the log.
> # The minimum age of a log file to be eligible for deletion due to age
> log.retention.hours=168
> # A size-based retention policy for logs. Segments are pruned from the log 
> unless the remaining
> # segments drop below log.retention.bytes. Functions independently of 
> log.retention.hours.
> #log.retention.bytes=1073741824
> # The maximum size of a log segment file. When this size is reached a new log 
> segment will be created.
> log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be 
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=30{noformat}
> Exception raised:
> {noformat}
> > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.swap:
> >  Proces nie może uzyskać dostępu do pliku, ponieważ jest on używany przez 
> > inny proces.
>     at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
>     at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
>     at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
>     at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
>     at java.nio.file.Files.move(Unknown Source)
>     at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>     at 
> org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>     at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>     at kafka.log.Log.replaceSegments(Log.scala:1644)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>     at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>     at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>     at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>     Suppressed: java.nio.file.FileSystemException: 
> C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.cleaned
>  -> 
> 

[jira] [Commented] (KAFKA-7086) Kafka server process dies after try deleting old log files under Windows 10

2018-12-11 Thread Hans Schuell (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717293#comment-16717293
 ] 

Hans Schuell commented on KAFKA-7086:
-

I have a similar issue - see KAFKA-6188 Comment

The English translation of the system error above is: *the process cannot 
access the file because it is being used by another process.*

As mentioned there, the log files cannot be deleted because of the different 
behaviour of the Windows file system. Under Windows, a file cannot be moved 
(as in the case above) or deleted while there is still an open file handle 
held by the current process or by another process.

This makes a Windows installation of Kafka 2.0.0 currently unusable, because 
sooner or later this issue happens in a running or starting broker.
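
A minimal, self-contained demonstration of this behaviour (assumption: run it
on Windows; on Linux the same move simply succeeds despite the open handle):

{code:java}
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class WindowsMoveDemo {
    public static void main(final String[] args) throws IOException {
        final Path src = Files.createTempFile("segment", ".log");
        final Path dst = src.resolveSibling("segment.log.swap");
        try (RandomAccessFile open = new RandomAccessFile(src.toFile(), "rw")) {
            // On Windows this throws java.nio.file.FileSystemException
            // ("being used by another process") while the handle is open;
            // on Linux the rename succeeds.
            Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
            System.out.println("moved to " + dst);
        } catch (FileSystemException e) {
            System.out.println("move refused while a handle is open: " + e.getMessage());
        }
    }
}
{code}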

> Kafka server process dies after try deleting old log files under Windows 10
> ---
>
> Key: KAFKA-7086
> URL: https://issues.apache.org/jira/browse/KAFKA-7086
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: Windows 10, Windows Server 2012 R2
>Reporter: Cezary Wagner
>Priority: Major
>  Labels: windows
>
> Kafka dies with an error every time log.retention.hours is reached.
> {noformat}
> # Log Retention Policy 
> #
> # The following configurations control the disposal of log segments. The 
> policy can
> # be set to delete segments after a period of time, or after a given size has 
> accumulated.
> # A segment will be deleted whenever *either* of these criteria are met. 
> Deletion always happens
> # from the end of the log.
> # The minimum age of a log file to be eligible for deletion due to age
> log.retention.hours=168
> # A size-based retention policy for logs. Segments are pruned from the log 
> unless the remaining
> # segments drop below log.retention.bytes. Functions independently of 
> log.retention.hours.
> #log.retention.bytes=1073741824
> # The maximum size of a log segment file. When this size is reached a new log 
> segment will be created.
> log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be 
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=30{noformat}
> Exception raised:
> {noformat}
> > C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.swap:
> >  Proces nie może uzyskać dostępu do pliku, ponieważ jest on używany przez 
> > inny proces.
>     at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
>     at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
>     at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
>     at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
>     at java.nio.file.Files.move(Unknown Source)
>     at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>     at 
> org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>     at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>     at kafka.log.Log.replaceSegments(Log.scala:1644)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>     at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>     at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>     at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>     Suppressed: java.nio.file.FileSystemException: 
> C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.cleaned
>  -> 
> C:\root\kafka_2.12-1.1.0\data\__consumer_offsets-3\.log.swap:
>  Proces nie może uzyskać dostępu do pliku, ponieważ jest on używany przez 
> inny proces.
>     at sun.nio.fs.WindowsException.translateToIOException(Unknown 
> Source)
>     at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown 
> Source)
>     at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
>     at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
>     at java.nio.file.Files.move(Unknown Source)
>     at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
>     ... 12 more
> [2018-06-21 13:06:34,196] INFO [ReplicaManager broker=0] Stopping serving 
> replicas 

[jira] [Updated] (KAFKA-7721) Connection to zookeeper refused

2018-12-11 Thread Mohammad Etemad (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohammad Etemad updated KAFKA-7721:
---
Component/s: (was: KafkaConnect)

> Connection to zookeeper refused
> ---
>
> Key: KAFKA-7721
> URL: https://issues.apache.org/jira/browse/KAFKA-7721
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
> Environment: Dockerized containers - kubernetes 1.9
>Reporter: Mohammad Etemad
>Priority: Major
>
> Kafka throws an exception when trying to connect to ZooKeeper. This happens 
> when the ZooKeeper connection is lost and later recovered. Kafka seems to be 
> stuck in a loop and cannot renew the connection. Here are the logs:
> 2018-12-11 13:52:52,905] INFO Opening socket connection to server 
> zookeeper-0.zookeeper.logging.svc.cluster.local/10.38.128.12:2181. Will not 
> attempt to authenticate using SASL (unknown error) 
> (org.apache.zookeeper.ClientCnxn)
> [2018-12-11 13:52:52,906] WARN Session 0x1001443ad77000f for server null, 
> unexpected error, closing socket connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> java.net.ConnectException: Connection refused
>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>  at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>  at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> On the ZooKeeper side it can be seen that the Kafka connection is 
> established. Here are the logs:
> 2018-12-11 13:53:44,969 [myid:] - INFO 
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - 
> Accepted socket connection from /10.38.128.8:46066
> 2018-12-11 13:53:44,976 [myid:] - INFO 
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client 
> attempting to establish new session at /10.38.128.8:46066
> 2018-12-11 13:53:45,005 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - 
> Established session 0x10060ff12a58dc0 with negotiated timeout 3 for 
> client /10.38.128.8:46066
> 2018-12-11 13:53:45,071 [myid:] - INFO [ProcessThread(sid:0 
> cport:2181)::PrepRequestProcessor@487] - Processed session termination for 
> sessionid: 0x10060ff12a58dc0
> 2018-12-11 13:53:45,077 [myid:] - INFO 
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed 
> socket connection for client /10.38.128.8:46066 which had sessionid 
> 0x10060ff12a58dc0
> 2018-12-11 13:53:47,119 [myid:] - INFO 
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - 
> Accepted socket connection from /10.36.0.8:48798
> 2018-12-11 13:53:47,124 [myid:] - INFO 
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client 
> attempting to establish new session at /10.36.0.8:48798
> 2018-12-11 13:53:47,134 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - 
> Established session 0x10060ff12a58dc1 with negotiated timeout 3 for 
> client /10.36.0.8:48798
> 2018-12-11 13:53:47,582 [myid:] - INFO [ProcessThread(sid:0 
> cport:2181)::PrepRequestProcessor@487] - Processed session termination for 
> sessionid: 0x10060ff12a58dc1
> 2018-12-11 13:53:47,592 [myid:] - INFO 
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed 
> socket connection for client /10.36.0.8:48798 which had sessionid 
> 0x10060ff12a58dc1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7721) Connection to zookeeper refused

2018-12-11 Thread Mohammad Etemad (JIRA)
Mohammad Etemad created KAFKA-7721:
--

 Summary: Connection to zookeeper refused
 Key: KAFKA-7721
 URL: https://issues.apache.org/jira/browse/KAFKA-7721
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.1.0
 Environment: Dockerized containers - kubernetes 1.9
Reporter: Mohammad Etemad


Kafka throws an exception when trying to connect to ZooKeeper. This happens 
when the ZooKeeper connection is lost and later recovered. Kafka seems to be 
stuck in a loop and cannot renew the connection. Here are the logs:

2018-12-11 13:52:52,905] INFO Opening socket connection to server 
zookeeper-0.zookeeper.logging.svc.cluster.local/10.38.128.12:2181. Will not 
attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-12-11 13:52:52,906] WARN Session 0x1001443ad77000f for server null, 
unexpected error, closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
 at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
 at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

On the ZooKeeper side it can be seen that the Kafka connection is established. 
Here are the logs:

2018-12-11 13:53:44,969 [myid:] - INFO 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted 
socket connection from /10.38.128.8:46066
2018-12-11 13:53:44,976 [myid:] - INFO 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client 
attempting to establish new session at /10.38.128.8:46066
2018-12-11 13:53:45,005 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - 
Established session 0x10060ff12a58dc0 with negotiated timeout 3 for client 
/10.38.128.8:46066
2018-12-11 13:53:45,071 [myid:] - INFO [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@487] - Processed session termination for 
sessionid: 0x10060ff12a58dc0
2018-12-11 13:53:45,077 [myid:] - INFO 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket 
connection for client /10.38.128.8:46066 which had sessionid 0x10060ff12a58dc0
2018-12-11 13:53:47,119 [myid:] - INFO 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted 
socket connection from /10.36.0.8:48798
2018-12-11 13:53:47,124 [myid:] - INFO 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@938] - Client 
attempting to establish new session at /10.36.0.8:48798
2018-12-11 13:53:47,134 [myid:] - INFO [SyncThread:0:ZooKeeperServer@683] - 
Established session 0x10060ff12a58dc1 with negotiated timeout 3 for client 
/10.36.0.8:48798
2018-12-11 13:53:47,582 [myid:] - INFO [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@487] - Processed session termination for 
sessionid: 0x10060ff12a58dc1
2018-12-11 13:53:47,592 [myid:] - INFO 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket 
connection for client /10.36.0.8:48798 which had sessionid 0x10060ff12a58dc1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7719) Improve fairness in SocketServer processors

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716853#comment-16716853
 ] 

ASF GitHub Bot commented on KAFKA-7719:
---

rajinisivaram opened a new pull request #6022: KAFKA-7719: Improve fairness in 
SocketServer processors (KIP-402)
URL: https://github.com/apache/kafka/pull/6022
 
 
   Limit the number of new connections processed in each iteration of each 
`Processor` in `SocketServer`. Block the `Acceptor` if the connection queue is 
full on all Processors. Added a metric to track the accept idle-time 
percentage. See KIP-402 for details.
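
A hedged, simplified sketch of the idea (the limit value, queue sizing, and
names are illustrative assumptions; see KIP-402 and the PR for the real
design):

{code:java}
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProcessorFairnessSketch {
    private static final int MAX_NEW_CONNECTIONS_PER_ITERATION = 20; // assumed limit
    private final BlockingQueue<SocketChannel> newConnections = new ArrayBlockingQueue<>(20);

    // Called by the acceptor thread: put() blocks when this processor's queue
    // is full, which throttles accepts instead of queueing them unboundedly.
    public void enqueue(final SocketChannel channel) throws InterruptedException {
        newConnections.put(channel);
    }

    // Called at the top of each processor iteration: drain at most a fixed
    // number of new connections so existing channels also get processor time.
    void configureNewConnections() {
        int processed = 0;
        SocketChannel channel;
        while (processed < MAX_NEW_CONNECTIONS_PER_ITERATION
                && (channel = newConnections.poll()) != null) {
            register(channel);
            processed++;
        }
    }

    private void register(final SocketChannel channel) {
        // hypothetical: register the channel with this processor's selector
    }
}
{code}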
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve fairness in SocketServer processors
> ---
>
> Key: KAFKA-7719
> URL: https://issues.apache.org/jira/browse/KAFKA-7719
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Affects Versions: 2.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.2.0
>
>
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors
>  for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7720) kafka-configs script should also describe default broker entries

2018-12-11 Thread Edoardo Comar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716782#comment-16716782
 ] 

Edoardo Comar commented on KAFKA-7720:
--

cc [~mimaison]

> kafka-configs script should also describe default broker entries
> 
>
> Key: KAFKA-7720
> URL: https://issues.apache.org/jira/browse/KAFKA-7720
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Edoardo Comar
>Priority: Major
>
> Running the configs tool to describe the broker configs only appears to print 
> dynamically added entries.
> Running
> {{bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe 
> --entity-default --entity-type brokers}} 
> on a broker without previously added configs 
> ({{--alter --add-config 'key=value'}})
> will otherwise print an empty list.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7719) Improve fairness in SocketServer processors

2018-12-11 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7719:
-

 Summary: Improve fairness in SocketServer processors
 Key: KAFKA-7719
 URL: https://issues.apache.org/jira/browse/KAFKA-7719
 Project: Kafka
  Issue Type: Improvement
  Components: network
Affects Versions: 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.2.0


See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors
 for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7720) kafka-configs script should also describe default broker entries

2018-12-11 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-7720:


 Summary: kafka-configs script should also describe default broker 
entries
 Key: KAFKA-7720
 URL: https://issues.apache.org/jira/browse/KAFKA-7720
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Edoardo Comar


Running the configs tool to describe the broker configs only appears to print 
dynamically added entries.
Running
{{bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe 
--entity-default --entity-type brokers}} 
on a broker without previously added configs 
({{--alter --add-config 'key=value'}})
will otherwise print an empty list.
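
As a point of comparison (a hedged sketch, not a fix for the tool), describing
a specific broker via the Java AdminClient does, as far as I can tell, return
entries together with their sources, which makes defaults visible; broker id 0
and a local bootstrap server are assumptions here:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeBrokerConfigs {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Broker id 0 is assumed here; adjust for your cluster.
            final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            final Config config = admin.describeConfigs(Collections.singleton(broker))
                                       .all().get().get(broker);
            // Each entry reports where it came from (default, dynamic, file).
            config.entries().forEach(e ->
                System.out.printf("%s = %s (source: %s)%n", e.name(), e.value(), e.source()));
        }
    }
}
{code}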




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716756#comment-16716756
 ] 

Matthias J. Sax commented on KAFKA-7580:


I agree that it is extremely hard. Unfortunately, I don't have any good advice 
on how to tackle it... :(

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes successfully:
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a readOnly file 
> directory, so the unit test rightly throws an error/exception (which is the 
> intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
> Two approaches for resolving this failing unit test case:
> 1.) Run the unit tests as non-root users (simplest).
> 2.) If running the unit test as the root user, make the temporary file 
> directory immutable in the unit test code and then test for the exception 
> (needs code changes in the unit tests):
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/
> root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/grandparent
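
A hedged, minimal sketch of the permission behaviour behind this (a
hypothetical demo assuming a Unix-like system; run it once as a normal user
and once as root):

{code:java}
import java.io.File;

public class ReadOnlyDirCheck {
    public static void main(final String[] args) {
        final File dir = new File("/tmp/readOnlyDir");
        dir.mkdirs();
        dir.setReadOnly();
        final boolean created = new File(dir, "parent").mkdir();
        // Non-root user: created == false, so a store opening under this
        // directory would fail and ProcessorStateException would be thrown.
        // Root user: created == true, no exception, and the test fails.
        System.out.println("mkdir inside read-only dir succeeded? " + created);
    }
}
{code}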

[jira] [Commented] (KAFKA-7708) [kafka-streams-scala] Invalid signature for KTable join in 2.12

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716720#comment-16716720
 ] 

ASF GitHub Bot commented on KAFKA-7708:
---

lodamar closed pull request #6019: KAFKA-7708: Fixed KTable tests using KStream 
API in scala tests
URL: https://github.com/apache/kafka/pull/6019
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index dc080f13310..0ef50e383c9 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -34,26 +34,32 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
 val sourceTopic = "source"
 val sinkTopic = "sink"
 
-val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
-table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+val table = builder.table[String, String](sourceTopic)
+table.mapValues(_.length).filter((_, value) => value > 
5).toStream.to(sinkTopic)
 
 val testDriver = createTestDriver(builder)
 
 {
-  testDriver.pipeRecord(sourceTopic, ("1", "value1"))
-  val record = testDriver.readRecord[String, Long](sinkTopic)
+  testDriver.pipeRecord(sourceTopic, ("1", "firstvalue"))
+  val record = testDriver.readRecord[String, Int](sinkTopic)
   record.key shouldBe "1"
-  record.value shouldBe (null: java.lang.Long)
+  record.value shouldBe 10
+}
+{
+  testDriver.pipeRecord(sourceTopic, ("1", "secondvalue"))
+  val record = testDriver.readRecord[String, Int](sinkTopic)
+  record.key shouldBe "1"
+  record.value shouldBe 11
 }
 {
-  testDriver.pipeRecord(sourceTopic, ("1", "value2"))
-  val record = testDriver.readRecord[String, Long](sinkTopic)
+  testDriver.pipeRecord(sourceTopic, ("1", "short"))
+  val record = testDriver.readRecord[String, Int](sinkTopic)
   record.key shouldBe "1"
-  record.value shouldBe 2
+  record.value shouldBe (null: java.lang.Long)
 }
 {
-  testDriver.pipeRecord(sourceTopic, ("2", "value1"))
-  val record = testDriver.readRecord[String, Long](sinkTopic)
+  testDriver.pipeRecord(sourceTopic, ("2", "val3"))
+  val record = testDriver.readRecord[String, Int](sinkTopic)
   record.key shouldBe "2"
   record.value shouldBe (null: java.lang.Long)
 }
@@ -67,30 +73,36 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
 val sourceTopic = "source"
 val sinkTopic = "sink"
 
-val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
-table.filterNot((_, value) => value > 1).toStream.to(sinkTopic)
+val table = builder.table[String, String](sourceTopic)
+table.filterNot((_, value) => 
value.exists(_.isUpper)).toStream.to(sinkTopic)
 
 val testDriver = createTestDriver(builder)
 
 {
-  testDriver.pipeRecord(sourceTopic, ("1", "value1"))
-  val record = testDriver.readRecord[String, Long](sinkTopic)
+  testDriver.pipeRecord(sourceTopic, ("1", "FirstValue"))
+  val record = testDriver.readRecord[String, String](sinkTopic)
   record.key shouldBe "1"
-  record.value shouldBe 1
+  record.value shouldBe (null: java.lang.String)
 }
 {
-  testDriver.pipeRecord(sourceTopic, ("1", "value2"))
-  val record = testDriver.readRecord[String, Long](sinkTopic)
+  testDriver.pipeRecord(sourceTopic, ("1", "secondvalue"))
+  val record = testDriver.readRecord[String, String](sinkTopic)
   record.key shouldBe "1"
-  record.value shouldBe (null: java.lang.Long)
+  record.value shouldBe "secondvalue"
 }
 {
-  testDriver.pipeRecord(sourceTopic, ("2", "value1"))
-  val record = testDriver.readRecord[String, Long](sinkTopic)
+  testDriver.pipeRecord(sourceTopic, ("1", "Short"))
+  val record = testDriver.readRecord[String, String](sinkTopic)
+  record.key shouldBe "1"
+  record.value shouldBe (null: java.lang.String)
+}
+{
+  testDriver.pipeRecord(sourceTopic, ("2", "val"))
+  val record = testDriver.readRecord[String, String](sinkTopic)
   record.key shouldBe "2"
-  record.value shouldBe 1
+  record.value shouldBe "val"
 }
-testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+testDriver.readRecord[String, String](sinkTopic) shouldBe null
 
 testDriver.close()
   }
@@ -101,17 +113,17 

[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716687#comment-16716687
 ] 

ASF GitHub Bot commented on KAFKA-6970:
---

mjsax closed pull request #6016: KAFKA-6970: All standard state stores guarded 
with read only wrapper
URL: https://github.com/apache/kafka/pull/6016
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index ff3ef44894b..99ba0f6ce06 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -50,9 +50,18 @@
 private CachedStateStore cachedStateStore(final StateStore store) {
 if (store instanceof CachedStateStore) {
 return (CachedStateStore) store;
-} else if (store instanceof WrappedStateStore
-&& ((WrappedStateStore) store).wrappedStore() instanceof 
CachedStateStore) {
-return (CachedStateStore) ((WrappedStateStore) 
store).wrappedStore();
+} else if (store instanceof WrappedStateStore) {
+StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+
+while (wrapped instanceof WrappedStateStore && !(wrapped 
instanceof CachedStateStore)) {
+wrapped = ((WrappedStateStore) wrapped).wrappedStore();
+}
+
+if (!(wrapped instanceof CachedStateStore)) {
+return null;
+}
+
+return (CachedStateStore) wrapped;
 }
 return null;
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c79ec35328a..e7dd4dbc42a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
@@ -37,6 +38,7 @@
 
 import java.time.Duration;
 import java.util.List;
+import 
org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
@@ -102,7 +104,16 @@ public StateStore getStateStore(final String name) {
 "please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.;);
 }
 
-return stateManager.getStore(name);
+final StateStore store = stateManager.getStore(name);
+if (store instanceof KeyValueStore) {
+return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+} else if (store instanceof WindowStore) {
+return new WindowStoreReadWriteDecorator((WindowStore) store);
+} else if (store instanceof SessionStore) {
+return new SessionStoreReadWriteDecorator((SessionStore) store);
+}
+
+return store;
 }
 
 @SuppressWarnings("unchecked")
@@ -196,23 +207,16 @@ public long streamTime() {
 return streamTimeSupplier.get();
 }
 
-private abstract static class StateStoreReadOnlyDecorator implements StateStore {
+private abstract static class StateStoreReadOnlyDecorator extends AbstractStateStore {
 static final String ERROR_MESSAGE = "Global store is read only";
 
-final T underlying;
-
-StateStoreReadOnlyDecorator(final T underlying) {
-this.underlying = underlying;
-}
-
-@Override
-public String name() {
-return underlying.name();
+StateStoreReadOnlyDecorator(final T inner) {
+super(inner);
 }
 
-@Override
-public void init(final ProcessorContext context, final StateStore 
root) {
-underlying.init(context, root);
+@SuppressWarnings("unchecked")
+T getInner() {
+return (T) wrappedStore();
 }
 
 @Override
@@ -221,44 +225,39 @@ public void flush() {
 }
 
 @Override
-public void close() {
-underlying.close();
-}
-
-@Override
-public boolean persistent() {
-return underlying.persistent();
+ 

[jira] [Updated] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-12-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7443:
---
Affects Version/s: 1.1.0
   1.1.1
   2.0.1

> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -
>
> Key: KAFKA-7443
> URL: https://issues.apache.org/jira/browse/KAFKA-7443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Major
>  Labels: feather
> Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
> Attachments: KAFKA-7443.url
>
>
> When restoring a local state store from a changelog topic in EOS, Kafka 
> Streams will sometimes throw an OffsetOutOfRangeException such as:
> {quote}Restoring StreamTasks failed. Deleting StreamTasks stores to recreate 
> from scratch.
>  org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
> range with no configured reset policy for partitions:
> {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
>  at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734)
>  
> {quote}
> This scenario occurs when the changelog topic has deleted expired log 
> segments according to retention.ms, but the start offset in the local 
> .checkpoint file is the position at which the task last exited on this 
> instance, which may be smaller than the updated beginning offset of the 
> changelog topic. Restoring the store from the start offset in the checkpoint 
> file will then throw the exception.
> It can be reproduced as below (Kafka Streams runs in EOS):
>  # The task for topic partition test-1 is running on instance A. When the 
> task exits, Kafka Streams writes the last committed offset 100 for test-1 to 
> the checkpoint file.
>  # Task test-1 transfers to instance B.
>  # During this time, the remote changelog topic for test-1 updates its start 
> offset to 120 as the old log segment reaches retention time and is deleted.
>  # After a while, task test-1 exits from instance B and resumes on instance 
> A, and the task restores the local state store on A from checkpoint offset 
> 100, which is smaller than the valid offset 120 of the changelog topic. The 
> exception above is thrown.
> When this exception occurs, Kafka Streams tries to reinitialize the task and 
> intends to restore from the beginning in the catch block below. 
> Unfortunately, this handling does not work and the task keeps throwing 
> OffsetOutOfRangeException in the following restore attempts.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> //handle for OffsetOutOfRangeException in kafka stream
> catch (final InvalidOffsetException recoverableException) {
>  log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to 
> recreate from scratch.", recoverableException);
>  final Set<TopicPartition> partitions = recoverableException.partitions();
>  for (final TopicPartition partition : partitions) {
>final StreamTask task = active.restoringTaskFor(partition);
>log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>needsInitializing.remove(partition);
>needsRestoring.remove(partition);
>
> task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>  }
>  restoreConsumer.seekToBeginning(partitions);
> }{code}
>  
>  Investigating why the handling for this exception does not work, I found 
> the root cause:
>  Kafka Streams registers state restorers in the variable stateRestorers, 
> which is used to read/update the start and end offsets for restoring the 
> local state store.
> {code:java}
> 

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-11 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716588#comment-16716588
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

Reproducing "pure virtual method" issue is extremely hard, since it happens 
intermittently and for any random unit test(not the same unit test will fail 
next time). The ones noted above were some of the failing unit tests observed. 
Note that the status next to the test shows PASSED(is this correct or 
misleading?).

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes successfully:
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a readOnly file 
> directory, so the unit test rightly throws an error/exception (which is the 
> intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
> Two approaches for resolving this failing unit test case:
> 1.) Run the unit tests as non-root users (simplest).
> 2.) If running the unit test as the root user, make the temporary file directory 
> 

[jira] [Updated] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-12-11 Thread linyue li (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

linyue li updated KAFKA-7443:
-
Fix Version/s: 2.1.1

> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -
>
> Key: KAFKA-7443
> URL: https://issues.apache.org/jira/browse/KAFKA-7443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Major
>  Labels: feather
> Fix For: 2.1.1
>
> Attachments: KAFKA-7443.url
>
>
> When restoring a local state store from a changelog topic in EOS, Kafka 
> Streams will sometimes throw an OffsetOutOfRangeException such as:
> {quote}Restoring StreamTasks failed. Deleting StreamTasks stores to recreate 
> from scratch.
>  org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
> range with no configured reset policy for partitions:
> {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
>  at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734)
>  
> {quote}
> This scenario occurs when the changelog topic has deleted expired log 
> segments according to retention.ms, but the start offset in the local 
> .checkpoint file is the position at which the task last exited on this 
> instance, which may be smaller than the updated beginning offset of the 
> changelog topic. Restoring the store from the start offset in the checkpoint 
> file will then throw the exception.
> It can be reproduced as below (Kafka Streams runs in EOS):
>  # The task for topic partition test-1 is running on instance A. When the 
> task exits, Kafka Streams writes the last committed offset 100 for test-1 to 
> the checkpoint file.
>  # Task test-1 transfers to instance B.
>  # During this time, the remote changelog topic for test-1 updates its start 
> offset to 120 as the old log segment reaches retention time and is deleted.
>  # After a while, task test-1 exits from instance B and resumes on instance 
> A, and the task restores the local state store on A from checkpoint offset 
> 100, which is smaller than the valid offset 120 of the changelog topic. The 
> exception above is thrown.
> When this exception occurs, Kafka Streams tries to reinitialize the task and 
> intends to restore from the beginning in the catch block below. 
> Unfortunately, this handling does not work and the task keeps throwing 
> OffsetOutOfRangeException in the following restore attempts.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> //handle for OffsetOutOfRangeException in kafka stream
> catch (final InvalidOffsetException recoverableException) {
>  log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to 
> recreate from scratch.", recoverableException);
>  final Set<TopicPartition> partitions = recoverableException.partitions();
>  for (final TopicPartition partition : partitions) {
>final StreamTask task = active.restoringTaskFor(partition);
>log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>needsInitializing.remove(partition);
>needsRestoring.remove(partition);
>
> task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>  }
>  restoreConsumer.seekToBeginning(partitions);
> }{code}
>  
>  Investigating why the handling for this exception does not work, I found 
> the root cause:
>  Kafka Streams registers state restorers in the variable stateRestorers, 
> which is used to read/update the start and end offsets for restoring the 
> local state store.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> private final Map<TopicPartition, StateRestorer> stateRestorers = new 
> HashMap<>();{code}
> When the OffsetOutOfRangeException occurs, 

[jira] [Commented] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716567#comment-16716567
 ] 

ASF GitHub Bot commented on KAFKA-7443:
---

mjsax closed pull request #5946: KAFKA-7443: OffsetOutOfRangeException in 
restoring state store from changelog topic when start offset of local 
checkpoint is smaller than that of changelog topic
URL: https://github.com/apache/kafka/pull/5946
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index e43c292e6a3..f877f9d13a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -188,6 +188,10 @@ public synchronized void unsubscribe() {
 if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
for (final ConsumerRecord<K, V> rec : recs) {
+if (beginningOffsets.get(entry.getKey()) != null && 
beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
+throw new 
OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), 
subscriptions.position(entry.getKey())));
+}
+
 if (assignment().contains(entry.getKey()) && rec.offset() 
>= subscriptions.position(entry.getKey())) {
 results.computeIfAbsent(entry.getKey(), partition -> 
new ArrayList<>()).add(rec);
 subscriptions.position(entry.getKey(), rec.offset() + 
1);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34e6e5cdb6f..fdd9d6c303c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -107,6 +107,8 @@ public void register(final StateRestorer restorer) {
 needsInitializing.remove(partition);
 needsRestoring.remove(partition);
 
+final StateRestorer restorer = stateRestorers.get(partition);
+restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
 
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
 }
 restoreConsumer.seekToBeginning(partitions);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 34f0a32b88c..d08f0d7360d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -157,6 +157,42 @@ public void 
shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
 assertThat(callback.restored.size(), equalTo(messages));
 }
 
+@Test
+public void 
shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
+final int messages = 10;
+final int startOffset = 5;
+final long expiredCheckpoint = 1L;
+assignPartition(messages, topicPartition);
+
consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) 
startOffset));
+consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 
(long) (messages + startOffset)));
+
+addRecords(messages, topicPartition, startOffset);
+consumer.assign(Collections.emptyList());
+
+final StateRestorer stateRestorer = new StateRestorer(
+topicPartition,
+restoreListener,
+expiredCheckpoint,
+Long.MAX_VALUE,
+true,
+"storeName");
+changelogReader.register(stateRestorer);
+
+
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+EasyMock.replay(active, task);
+
+// first restore call "fails" since OffsetOutOfRangeException but we 
should not die with an exception
+assertEquals(0, changelogReader.restore(active).size());
+//the starting offset for stateRestorer is set to NO_CHECKPOINT
+assertThat(stateRestorer.checkpoint(), equalTo(-1L));
+
+//restore the active task again
+

[jira] [Updated] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-12-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7523:
---
Labels: kip  (was: needs-kip)

> TransformerSupplier/ProcessorSupplier enhancements
> --
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paul Whalen
>Assignee: Paul Whalen
>Priority: Minor
>  Labels: kip
>
> I have found that when writing "low level" {{Processors}} and 
> {{Transformers}} that are stateful, often I want these processors to "own" 
> one or more state stores, the details of which are not important to the 
> business logic of the application.  However, when incorporating these into 
> the topologies defined by the high level API, using {{KStream::transform}} or 
> {{KStream::process}}, I'm forced to specify the stores so the topology is 
> wired up correctly.  This creates an unfortunate pattern where the 
> {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the 
> pattern I've been following) holds the information about the name of the 
> state stores, must be defined above the "high level" "fluent API"-style 
> pipeline, which makes it hard to understand the business logic data flow.
>  
> What I currently have to do:
> {code:java}
> TransformerSupplier transformerSupplier = new TransformerSupplierWithState(topology, val -> businessLogic(val));
> builder.stream("in.topic")
> .transform(transformerSupplier, transformerSupplier.stateStoreNames())
> .to("out.topic");{code}
> I have to both define the {{TransformerSupplier}} above the "fluent block", 
> and pass the topology in so I can call {{topology.addStateStore()}} inside 
> the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what 
> the state store names are for that point in the topology. The lambda {{val -> 
> businessLogic(val)}} is really what I want to see in-line because that's the 
> crux of what is happening, along with the name of some factory method 
> describing what the transformer is doing for me internally. This issue is 
> obviously exacerbated when the "fluent block" is much longer than this 
> example; it gets worse the farther away {{val -> businessLogic(val)}} is 
> from {{KStream::transform}}.
>  
> An improvement:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
> .to("out.topic");{code}
> Which implies the existence of a {{KStream::transform}} that takes a single 
> argument that adheres to this interface:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
>     Transformer<K, V, R> get();
>     String[] stateStoreNames();
> }{code}
> Or better yet, I wouldn't have to pass in the topology, the caller of 
> {{TransformerSupplierWithState}} could also handle the job of "adding" its 
> state stores to the topology:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
>     Transformer<K, V, R> get();
>     Map<String, StoreBuilder> stateStores();
> }{code}
> Which would enable my ideal:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(val -> businessLogic(val)))
> .to("out.topic");{code}
> I think this would be a huge improvement in the usability of low-level 
> processors with the high-level DSL.
> Please let me know if I'm missing something as to why this cannot or should 
> not happen, or if there is a better forum for this suggestion (presumably it 
> would require a KIP?). I'd be happy to build it as well if there is a chance 
> of it being merged; it doesn't seem like a huge challenge to me.
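
A minimal sketch of what the second proposed interface could look like when implemented (the {{StoreBuilder}}-returning variant; the class name, store name, and counting logic below are illustrative, not part of the proposal):

{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical interface from the proposal: the supplier also describes its own stores.
interface TransformerSupplierWithState<K, V, R> extends TransformerSupplier<K, V, R> {
    Map<String, StoreBuilder<?>> stateStores();
}

// Example supplier: the store name stays an internal detail of the supplier.
class CountingTransformerSupplier implements TransformerSupplierWithState<String, String, KeyValue<String, Long>> {
    private static final String STORE = "counts-store";

    @Override
    public Map<String, StoreBuilder<?>> stateStores() {
        return Collections.singletonMap(STORE, Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE), Serdes.String(), Serdes.Long()));
    }

    @Override
    public Transformer<String, String, KeyValue<String, Long>> get() {
        return new Transformer<String, String, KeyValue<String, Long>>() {
            private KeyValueStore<String, Long> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(final ProcessorContext context) {
                store = (KeyValueStore<String, Long>) context.getStateStore(STORE);
            }

            @Override
            public KeyValue<String, Long> transform(final String key, final String value) {
                final Long previous = store.get(key);
                final long count = (previous == null ? 0L : previous) + 1L;
                store.put(key, count);
                return KeyValue.pair(key, count);
            }

            @Override
            public void close() {}
        };
    }
}
{code}

A {{KStream::transform}} overload accepting such a supplier could then register the returned {{StoreBuilder}}s with the topology itself, so the caller never has to name the stores in the fluent pipeline.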



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716433#comment-16716433
 ] 

Matthias J. Sax commented on KAFKA-7580:


"pure virtual method" indicates an issue with RocksDB. Maybe some binaries are 
not compatible on your OS? It could also be a bug in Streams code base itself 
thought. It's pretty hard to debug those. Can you reproduce it reliably? Do you 
know test fails at which point?

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only file 
> directory, so the unit test rightly throws an error/exception (which is the 
> intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
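> To make the root-user behavior concrete, here is a minimal standalone sketch 
> (assuming standard {{java.io}} semantics on Linux; the class and path names are 
> illustrative and not taken from the actual test):
> {code:java}
> import java.io.File;
> import java.nio.file.Files;
>
> public class ReadOnlyDirSketch {
>     public static void main(String[] args) throws Exception {
>         final File dir = Files.createTempDirectory("readOnlyDir").toFile();
>         dir.setReadOnly(); // clears the write permission bits on the directory
>
>         // Non-root: this mkdir fails, so the store init path throws ProcessorStateException.
>         // Root: permission bits are ignored, the mkdir succeeds, and no exception is thrown.
>         final boolean created = new File(dir, "child").mkdir();
>         System.out.println("child created despite read-only parent: " + created);
>     }
> }
> {code}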
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as a non-root user (simplest).
> 2.) If running the unit tests as the root user, make the temporary file 
> directory immutable in the unit test code and then