[jira] [Commented] (KAFKA-4029) SSL support for Connect REST API

2017-10-10 Thread Jakub Scholz (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198318#comment-16198318
 ] 

Jakub Scholz commented on KAFKA-4029:
-

Authorization / ACLs would of course be another valuable feature. But I would 
prefer to keep it out of this KIP to keep the scope under control. I updated 
the KIP to make this clear. Thanks for the suggestion.
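For context, securing the Connect REST endpoint would presumably look something 
like the following in the worker configuration. These property names are 
illustrative assumptions only, not the final names from the KIP:

```properties
# Hypothetical sketch -- property names may differ from the final KIP
listeners=https://myhost:8443
ssl.keystore.location=/var/private/ssl/connect.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/var/private/ssl/connect.truststore.jks
ssl.truststore.password=changeit
```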

> SSL support for Connect REST API
> 
>
> Key: KAFKA-4029
> URL: https://issues.apache.org/jira/browse/KAFKA-4029
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Jakub Scholz
>
> Currently the Connect REST API only supports http. We should also add SSL 
> support so access to the REST API can be secured.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6040) Expose topic size for monitoring

2017-10-10 Thread Michael Andre Pearce (JIRA)
Michael Andre Pearce created KAFKA-6040:
---

 Summary: Expose topic size for monitoring
 Key: KAFKA-6040
 URL: https://issues.apache.org/jira/browse/KAFKA-6040
 Project: Kafka
  Issue Type: New Feature
Reporter: Michael Andre Pearce


Currently there seems to be no monitoring endpoint for topic size, even though 
the cluster must calculate such an estimate to maintain retention.size.

This is useful operationally to get an understanding of actual topic usage, 
based on what the broker believes.

As an interim, some scripts exist to get this, but it would be ideal to get 
the value the broker itself believes/calculates.

https://gist.github.com/kapkaev/ef633348dbecfedc0166
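As a sketch of what the aggregation could look like: the per-partition sizes 
(which the broker already tracks for retention) just need to be summed per 
topic. This is illustrative logic in Python, not broker code; the 
"topic-partition" naming convention for the input keys is an assumption:

```python
from collections import defaultdict

def topic_sizes(partition_sizes):
    """Sum per-partition log sizes (keyed by 'topic-partition') per topic."""
    totals = defaultdict(int)
    for name, size in partition_sizes.items():
        topic, _, _ = name.rpartition("-")  # strip the partition suffix
        totals[topic] += size
    return dict(totals)

# Made-up example values standing in for per-partition size metrics
sizes = {"events-0": 1048576, "events-1": 2097152, "audit-0": 512}
print(topic_sizes(sizes))  # {'events': 3145728, 'audit': 512}
```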





[jira] [Commented] (KAFKA-6022) mirror maker stores offset in zookeeper

2017-10-10 Thread Ronald van de Kuil (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198377#comment-16198377
 ] 

Ronald van de Kuil commented on KAFKA-6022:
---

Hi Manikumar,

I believe I have found a small improvement option, then.

I am on the latest version, and the help states that the new consumer is the 
default.

pi@pi5:/opt/kafka_2.11-0.11.0.0 $ bin/kafka-mirror-maker.sh --help

Option   Description
--   ---
..
--new.consumer   Use new consumer in mirror maker (this 
is the default).

The current documentation on the Kafka site does not mention the --new.consumer 
flag. That is quite understandable if --new.consumer is the default.

From what I have seen in the Kafka tool, the --new.consumer is then not the 
default somewhere deep down in the kafka.tools.MirrorMaker class. That could 
be improved.

A second option would be to update the documentation.

Switching from zookeeper to kafka to keep track of the mirror maker offsets ... 
how would such an upgrade take place? I bet it would be advisable to rebuild 
the mirror cluster from scratch to prevent duplicates, am I right?




> mirror maker stores offset in zookeeper
> ---
>
> Key: KAFKA-6022
> URL: https://issues.apache.org/jira/browse/KAFKA-6022
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ronald van de Kuil
>Priority: Minor
>
> I happened to notice that the mirror maker stores its offset in zookeeper. 
> I do not see it using:
> bin/kafka-consumer-groups.sh --bootstrap-server pi1:9092 --new-consumer --list
> I do however see consumers that store their offset in kafka.
> I would guess that storing the offset in zookeeper is old style?
> Would it be an idea to update the mirror maker to the new consumer style?





[jira] [Commented] (KAFKA-6021) run kafka-mirror-maker.sh in daemon mode

2017-10-10 Thread Ronald van de Kuil (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198384#comment-16198384
 ] 

Ronald van de Kuil commented on KAFKA-6021:
---

Like a noob, I now run it in screen ;)

> run kafka-mirror-maker.sh in daemon mode
> 
>
> Key: KAFKA-6021
> URL: https://issues.apache.org/jira/browse/KAFKA-6021
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ronald van de Kuil
>Priority: Minor
>
> I run kafka_2.11-0.11.0.0. 
> It would be nice if the mirror maker could run in daemon mode like the broker 
> and the zookeeper.
> pi@pi5:/opt/kafka_2.11-0.11.0.0 $ bin/kafka-mirror-maker.sh -daemon 
> --consumer.config config/DC1.properties --num.streams 1 --producer.config 
> config/DC2.properties --whitelist=".*"
> Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.
> [2017-10-08 08:36:10,627] ERROR Exception when starting mirror maker. 
> (kafka.tools.MirrorMaker$)
> joptsimple.UnrecognizedOptionException: d is not a recognized option
>  





[jira] [Created] (KAFKA-6041) "--producer.config" option doesn't work

2017-10-10 Thread Phoenix_Dad (JIRA)
Phoenix_Dad created KAFKA-6041:
--

 Summary: "--producer.config" option doesn't work
 Key: KAFKA-6041
 URL: https://issues.apache.org/jira/browse/KAFKA-6041
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.0
Reporter: Phoenix_Dad
Priority: Minor


The configured parameters in the config/producer.properties file will be 
overwritten by the parameter default values.

For example:

// modify the config/producer.properties file:
bootstrap.servers=kafkahost:9092
linger.ms=2000
batch.size=1024

// execute bin/kafka-console-producer.sh:
bin/kafka-console-producer.sh --topic test --producer.config 
config/producer.properties

// while executing the ConsoleProducer class, batch.size is set to 200, 
linger.ms is set to 1000, and bootstrap.servers is set to localhost:9092.
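The behaviour described above is a precedence problem. A minimal sketch of the 
two merge orders, with plain Python dicts standing in for the tool's option 
handling (this is not the actual ConsoleProducer code):

```python
# Tool defaults vs. properties read from --producer.config.
# Values mirror the example in the report.
defaults = {"bootstrap.servers": "localhost:9092",
            "linger.ms": "1000",
            "batch.size": "200"}
from_file = {"bootstrap.servers": "kafkahost:9092",
             "linger.ms": "2000",
             "batch.size": "1024"}

# Buggy order: defaults applied last clobber the file's values.
buggy = {**from_file, **defaults}
# Intended order: the file's values override the defaults.
fixed = {**defaults, **from_file}

print(buggy["batch.size"])  # 200 (the reported misbehaviour)
print(fixed["batch.size"])  # 1024
```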





[jira] [Commented] (KAFKA-6041) "--producer.config" option doesn't work

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198392#comment-16198392
 ] 

ASF GitHub Bot commented on KAFKA-6041:
---

GitHub user wujianping10043419 opened a pull request:

https://github.com/apache/kafka/pull/4050

KAFKA-6041: "--producer.config" option doesn't work

The configured parameters in the config/producer.properties file will be 
overwritten by the parameter default values.

For example:

// modify the config/producer.properties file:
bootstrap.servers=kafkahost:9092
linger.ms=2000
batch.size=1024

// execute bin/kafka-console-producer.sh:
bin/kafka-console-producer.sh --topic test --producer.config 
config/producer.properties

// while executing the ConsoleProducer class, batch.size is set to 200, 
linger.ms is set to 1000, and bootstrap.servers is set to localhost:9092.


The correct way is to overwrite the default parameter values with the 
configured values.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wujianping10043419/kafka wjp-1010

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4050.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4050


commit 03348f4541365ec5b4e1dd69cd711a8245c84703
Author: 10043419 
Date:   2017-10-10T07:26:08Z

"--producer.config" option doesn't work




> "--producer.config" option doesn't work
> ---
>
> Key: KAFKA-6041
> URL: https://issues.apache.org/jira/browse/KAFKA-6041
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Phoenix_Dad
>Priority: Minor
>
> The configured parameters in the config/producer.properties file will be 
> overwritten by the parameter default values.
> For example:
> // modify the config/producer.properties file:
> bootstrap.servers=kafkahost:9092
> linger.ms=2000
> batch.size=1024
> // execute bin/kafka-console-producer.sh:
> bin/kafka-console-producer.sh --topic test --producer.config 
> config/producer.properties
> // while executing the ConsoleProducer class, batch.size is set to 200, 
> linger.ms is set to 1000, and bootstrap.servers is set to localhost:9092.





[jira] [Commented] (KAFKA-6040) Expose topic size for monitoring

2017-10-10 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198397#comment-16198397
 ] 

Ismael Juma commented on KAFKA-6040:


Retention is done per partition, not per topic, so the broker doesn't need to 
maintain a topic size at the moment. The partition size is exposed via a Log 
Size metric.

> Expose topic size for monitoring
> 
>
> Key: KAFKA-6040
> URL: https://issues.apache.org/jira/browse/KAFKA-6040
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Michael Andre Pearce
>
> Currently there seems to be no monitoring endpoint for topic size, even though 
> the cluster must calculate such an estimate to maintain retention.size.
> This is useful operationally to get an understanding of actual topic usage, 
> based on what the broker believes.
> As an interim, some scripts exist to get this, but it would be ideal to get 
> the value the broker itself believes/calculates.
> https://gist.github.com/kapkaev/ef633348dbecfedc0166





[jira] [Commented] (KAFKA-6022) mirror maker stores offset in zookeeper

2017-10-10 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198416#comment-16198416
 ] 

Manikumar commented on KAFKA-6022:
--

Since 0.10.1.0, there is no need to pass the --new-consumer option. You just 
need to pass the broker connect string (bootstrap.servers) in the 
consumer.config file.
Check here: http://kafka.apache.org/documentation.html#upgrade_1010_notable

Check for the below question in the FAQ section:
How do we migrate to committing offsets to Kafka (rather than Zookeeper) in 
0.8.2?
https://cwiki.apache.org/confluence/display/KAFKA/FAQ
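For reference, the dual-commit migration described in that FAQ uses the old 
consumer's offset-storage settings, roughly along these lines (property names 
are from the 0.8.2-era consumer configuration):

```properties
# Phase 1: commit offsets to both Zookeeper and Kafka while migrating
offsets.storage=kafka
dual.commit.enabled=true
# Phase 2: once all instances have migrated, commit to Kafka only
# dual.commit.enabled=false
```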

Yes, if you have the option, it is better to rebuild the mirror cluster from 
scratch using the new consumer.

Please close the JIRA if it solves your issue.



> mirror maker stores offset in zookeeper
> ---
>
> Key: KAFKA-6022
> URL: https://issues.apache.org/jira/browse/KAFKA-6022
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ronald van de Kuil
>Priority: Minor
>
> I happened to notice that the mirror maker stores its offset in zookeeper. 
> I do not see it using:
> bin/kafka-consumer-groups.sh --bootstrap-server pi1:9092 --new-consumer --list
> I do however see consumers that store their offset in kafka.
> I would guess that storing the offset in zookeeper is old style?
> Would it be an idea to update the mirror maker to the new consumer style?





[jira] [Created] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2017-10-10 Thread Ben Corlett (JIRA)
Ben Corlett created KAFKA-6042:
--

 Summary: Kafka Request Handler deadlocks and brings down the 
cluster.
 Key: KAFKA-6042
 URL: https://issues.apache.org/jira/browse/KAFKA-6042
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0, 0.11.0.1
 Environment: kafka version: 0.11.0.1
client versions: 0.8.2.1-0.10.2.1
platform: aws (eu-west-1a)
nodes: 36 x r4.xlarge
disk storage: 2.5 tb per node (~73% usage per node)
topics: 250
number of partitions: 48k (approx)
os: ubuntu 14.04
jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
64-Bit Server VM (build 25.131-b11, mixed mode)
Reporter: Ben Corlett
Priority: Critical
 Attachments: thread_dump.txt.gz

We have been experiencing a deadlock that happens on one consistent server 
within our cluster. This currently happens multiple times a week. It first 
started happening when we upgraded to 0.11.0.0. Sadly, 0.11.0.1 failed to 
resolve the issue.

Sequence of events:

At a seemingly random time, broker 125 goes into a deadlock. As soon as it is 
deadlocked it will remove all the ISRs for any partition it is the leader for.

[2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)



The other nodes fail to connect to node 125:

[2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch to 
broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, minBytes=1, 
maxBytes=10485760, fetchData={XX-94=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-22=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-58=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-11=(offset=78932482, logStartOffset=50881481, 
maxBytes=1048576), XX-55=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-19=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-91=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-5=(offset=903857106, logStartOffset=0, 
maxBytes=1048576), XX-80=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-88=(offset=0, logStartOffset=0, 
maxBytes=1048576), XX-34=(offset=308, logStartOffset=308, 
maxBytes=1048576), XX-7=(offset=369990, logStartOffset=369990, 
maxBytes=1048576), XX-0=(offset=57965795, logStartOffset=0, 
maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 125 was disconnected before the response was 
read
at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)

As node 125 removed all the ISRs as it was locking up, a failover for any 
partition is not possible without an unclean leader election. This breaks any 
partition that this node was leader for. As we spread all topics across all 
servers, this essentially brings down the entire cluster.


Recovery:

Unfortunately, with broker 125 in its deadlocked state, a clean shutdown does 
not work. A kill -9 is necessary. After an unclean shutdown the indexes must be 
rebuilt and start-up time is around 10 minutes. After node 125 finally starts 
the cluster reco

[jira] [Commented] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2017-10-10 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198453#comment-16198453
 ] 

Manikumar commented on KAFKA-6042:
--

This may be related to KAFKA-5970.

> Kafka Request Handler deadlocks and brings down the cluster.
> 
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
> Environment: kafka version: 0.11.0.1
> client versions: 0.8.2.1-0.10.2.1
> platform: aws (eu-west-1a)
> nodes: 36 x r4.xlarge
> disk storage: 2.5 tb per node (~73% usage per node)
> topics: 250
> number of partitions: 48k (approx)
> os: ubuntu 14.04
> jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
> 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Ben Corlett
>Priority: Critical
> Attachments: thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that happens on a consistent server 
> within our cluster. This happens multiple times a week currently. It first 
> started happening when we upgraded to 0.11.0.0. Sadly 0.11.0.1 failed to 
> resolve the issue.
> Sequence of events:
> At a seemingly random time, broker 125 goes into a deadlock. As soon as it is 
> deadlocked it will remove all the ISRs for any partition it is the leader 
> for.
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> 
> The other nodes fail to connect to the node 125 
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch 
> to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, 
> minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, 
> logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, 
> logStartOffset=0, maxBytes=1048576), XX-80=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, 
> logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, 
> logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, 
> logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> As node 125 removed all the ISRs as it was locking up, a failover for any 
> partition without an unclean leader election is not possible. This breaks 

[jira] [Commented] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2017-10-10 Thread Ben Corlett (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198483#comment-16198483
 ] 

Ben Corlett commented on KAFKA-6042:


The second stack trace in that issue looks very familiar. You may well be 
right. I should have searched a bit harder.

Do you know when 0.11.0.2 will be released? I am wondering if I should create a 
build with that pull request.

> Kafka Request Handler deadlocks and brings down the cluster.
> 
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
> Environment: kafka version: 0.11.0.1
> client versions: 0.8.2.1-0.10.2.1
> platform: aws (eu-west-1a)
> nodes: 36 x r4.xlarge
> disk storage: 2.5 tb per node (~73% usage per node)
> topics: 250
> number of partitions: 48k (approx)
> os: ubuntu 14.04
> jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
> 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Ben Corlett
>Priority: Critical
> Attachments: thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that happens on a consistent server 
> within our cluster. This happens multiple times a week currently. It first 
> started happening when we upgraded to 0.11.0.0. Sadly 0.11.0.1 failed to 
> resolve the issue.
> Sequence of events:
> At a seemingly random time, broker 125 goes into a deadlock. As soon as it is 
> deadlocked it will remove all the ISRs for any partition it is the leader 
> for.
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> 
> The other nodes fail to connect to the node 125 
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch 
> to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, 
> minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, 
> logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, 
> logStartOffset=0, maxBytes=1048576), XX-80=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, 
> logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, 
> logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, 
> logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.Shutdo

[jira] [Commented] (KAFKA-6013) Controller getting stuck

2017-10-10 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198489#comment-16198489
 ] 

Ismael Juma commented on KAFKA-6013:


cc [~onurkaraman] [~junrao]

> Controller getting stuck
> 
>
> Key: KAFKA-6013
> URL: https://issues.apache.org/jira/browse/KAFKA-6013
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Ivan Babrou
>  Labels: reliability
>
> It looks like a new issue in 0.11.0.0, and 0.11.0.1 still has it.
> We upgraded one of the clusters from 0.11.0.0 to 0.11.0.1 by shutting down 28 
> machines at once (single rack). When nodes came up none of them progressed 
> after these log lines:
> {noformat}
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka version : 0.11.0.1 
> (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka commitId : 
> c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO [Kafka Server 10014], started 
> (kafka.server.KafkaServer)
> {noformat}
> There was no indication in controller node logs that it picked up rebooted 
> nodes. This happened multiple times during the upgrade: once per rack plus 
> some on top of that.
> Reboot took ~20m, all nodes in a single rack rebooted in parallel.
> The fix was to restart the controller node, but that did not go cleanly either:
> {noformat}
> ivan@mybroker26:~$ sudo journalctl --since 01:00 -u kafka | fgrep 'Error 
> during controlled shutdown' -A1
> Oct 05 01:57:41 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:57:46 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> --
> Oct 05 01:58:16 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:18 mybroker26 kafka[37409]: INFO Rolled new log segment for 
> 'requests-40' in 3 ms. (kafka.log.Log)
> --
> Oct 05 01:58:51 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:56 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> {noformat}
> I'm unable to reproduce the issue by just restarting or even rebooting one 
> broker, controller picks it up:
> {noformat}
> Oct 05 03:18:18 mybroker83 kafka[37402]: INFO [Controller 10083]: Newly added 
> brokers: 10001, deleted brokers: , all live brokers: ...
> {noformat}
> KAFKA-5028 happened in 0.11.0.0, so it's likely related.
> cc [~ijuma]





[jira] [Updated] (KAFKA-6013) Controller getting stuck

2017-10-10 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6013:
---
Labels: reliability  (was: )

> Controller getting stuck
> 
>
> Key: KAFKA-6013
> URL: https://issues.apache.org/jira/browse/KAFKA-6013
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Ivan Babrou
>  Labels: reliability
>
> It looks like a new issue in 0.11.0.0, and 0.11.0.1 still has it.
> We upgraded one of the clusters from 0.11.0.0 to 0.11.0.1 by shutting down 28 
> machines at once (single rack). When nodes came up none of them progressed 
> after these log lines:
> {noformat}
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka version : 0.11.0.1 
> (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka commitId : 
> c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO [Kafka Server 10014], started 
> (kafka.server.KafkaServer)
> {noformat}
> There was no indication in controller node logs that it picked up rebooted 
> nodes. This happened multiple times during the upgrade: once per rack plus 
> some on top of that.
> Reboot took ~20m, all nodes in a single rack rebooted in parallel.
> The fix was to restart the controller node, but that did not go cleanly either:
> {noformat}
> ivan@mybroker26:~$ sudo journalctl --since 01:00 -u kafka | fgrep 'Error 
> during controlled shutdown' -A1
> Oct 05 01:57:41 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:57:46 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> --
> Oct 05 01:58:16 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:18 mybroker26 kafka[37409]: INFO Rolled new log segment for 
> 'requests-40' in 3 ms. (kafka.log.Log)
> --
> Oct 05 01:58:51 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:56 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> {noformat}
> I'm unable to reproduce the issue by just restarting or even rebooting one 
> broker; the controller picks it up:
> {noformat}
> Oct 05 03:18:18 mybroker83 kafka[37402]: INFO [Controller 10083]: Newly added 
> brokers: 10001, deleted brokers: , all live brokers: ...
> {noformat}
> KAFKA-5028 happened in 0.11.0.0, so it's likely related.
> cc [~ijuma]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-10 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198493#comment-16198493
 ] 

Ismael Juma commented on KAFKA-3359:


[~bobrik], it shouldn't read all data from the partition. You may have run into 
https://github.com/apache/kafka/commit/91517e8fbd7767ba6d7f43b517f5a26b6f870585

> Parallel log-recovery of un-flushed segments on startup
> ---
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1
>Reporter: Vamsi Subhash Achanta
>Assignee: Jay Kreps
>
> On startup, the log segments within a logDir are currently loaded 
> sequentially when there is an unclean shutdown. Loading the segments takes a 
> long time because logSegment.recover(..) is called for every segment, and for 
> brokers with many partitions the time taken is very high (we have noticed 
> ~40 mins for 2k partitions).
> https://github.com/apache/kafka/pull/1035
> This pull request will make the log-segment load parallel with two 
> configurable properties "log.recovery.threads" and 
> "log.recovery.max.interval.ms".
> Logic:
> 1. Define a thread pool of fixed size (log.recovery.threads).
> 2. Submit each logSegment recovery as a job to the thread pool and add the 
> returned future to a job list.
> 3. Wait until all the jobs are done within the configured time 
> (log.recovery.max.interval.ms - default set to Long.Max).
> 4. If all jobs complete within that time (every future returns without 
> error), recovery is considered done.
> 5. If any of the recovery jobs failed, it is logged and 
> LogRecoveryFailedException is thrown.
> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential implementation, 
> as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy 
> to address code review changes.
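The recovery flow described above can be sketched in plain Java (a hypothetical, simplified sketch: `recoverAll`, `LogRecoveryFailedException`, and the no-op segment jobs stand in for the real Scala code in the pull request; the two property names come from the proposal):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ParallelLogRecovery {

    // Steps 5/6: any failure or timeout surfaces as this exception.
    static class LogRecoveryFailedException extends RuntimeException {
        LogRecoveryFailedException(String msg) { super(msg); }
        LogRecoveryFailedException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Recover all segments with a fixed pool of `threads` workers (step 1),
    // waiting at most `maxIntervalMs` for the whole batch (step 3).
    static void recoverAll(List<Runnable> segmentJobs, int threads, long maxIntervalMs) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable job : segmentJobs)
                futures.add(pool.submit(job));                       // step 2
            long deadline = System.currentTimeMillis() + maxIntervalMs;
            for (Future<?> f : futures) {
                long remaining = deadline - System.currentTimeMillis();
                try {
                    f.get(Math.max(remaining, 0), TimeUnit.MILLISECONDS);
                } catch (ExecutionException e) {                     // step 5
                    throw new LogRecoveryFailedException("segment recovery failed", e.getCause());
                } catch (TimeoutException e) {                       // step 6
                    throw new LogRecoveryFailedException("log recovery timed out");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new LogRecoveryFailedException("log recovery interrupted", e);
                }
            }
            // Step 4: every future completed normally, so recovery is done.
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Runnable> segments = new ArrayList<>();
        for (int i = 0; i < 8; i++)
            segments.add(() -> { /* a real job would call logSegment.recover(..) */ });
        // threads = 1 reproduces the current sequential behavior.
        recoverAll(segments, 4, 60_000);
        System.out.println("recovered " + segments.size() + " segments");
    }
}
```

With `threads = 1` this degenerates to the current sequential scan, which is what makes the proposal backward compatible.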



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2017-10-10 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198497#comment-16198497
 ] 

Manikumar commented on KAFKA-6042:
--

It would be great if you could apply KAFKA-5970 and validate it in your cluster.

The Kafka community has not yet started discussing a 0.11.0.2 release.




> Kafka Request Handler deadlocks and brings down the cluster.
> 
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
> Environment: kafka version: 0.11.0.1
> client versions: 0.8.2.1-0.10.2.1
> platform: aws (eu-west-1a)
> nodes: 36 x r4.xlarge
> disk storage: 2.5 tb per node (~73% usage per node)
> topics: 250
> number of partitions: 48k (approx)
> os: ubuntu 14.04
> jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
> 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Ben Corlett
>Priority: Critical
> Attachments: thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that happens on a consistent server 
> within our cluster. This happens multiple times a week currently. It first 
> started happening when we upgraded to 0.11.0.0. Sadly 0.11.0.1 failed to 
> resolve the issue.
> Sequence of events:
> At a seemingly random time, broker 125 goes into a deadlock. As soon as it 
> is deadlocked, it removes all the other replicas from the ISR of any 
> partition it is the leader for.
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> 
> The other nodes then fail to connect to node 125:
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch 
> to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, 
> minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, 
> logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, 
> logStartOffset=0, maxBytes=1048576), XX-80=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, 
> logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, 
> logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, 
> logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> As node 125 removed all the ISRs a

[jira] [Assigned] (KAFKA-6027) System test failure: LogDirFailureTest

2017-10-10 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-6027:
--

Assignee: Dong Lin

https://github.com/apache/kafka/pull/4045

> System test failure: LogDirFailureTest
> --
>
> Key: KAFKA-6027
> URL: https://issues.apache.org/jira/browse/KAFKA-6027
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 1.0.0
>
>
> LogDirFailureTest started failing a few days ago:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-04--001.1507121832--apache--trunk--cbef33f/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-06--001.1507295254--apache--trunk--196bcfc/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-07--001.1507382831--apache--trunk--e2e8d4a/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-08--001.1507468285--apache--trunk--a1ea536/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-09--001.1507555111--apache--trunk--a1ea536/report.html
> Last good build:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-02--001.1506949758--apache--trunk--4f4f995/report.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2017-10-10 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198505#comment-16198505
 ] 

Ismael Juma commented on KAFKA-6042:


[~corlettb], you could create a build from the 0.11.0 branch as you probably 
don't want to wait until the new release is out. Also, it would be good to have 
confirmation that it does indeed fix the issue. It should do, but until we have 
someone who has confirmed it in their environment, we can't be 100% sure.

> Kafka Request Handler deadlocks and brings down the cluster.
> 
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
> Environment: kafka version: 0.11.0.1
> client versions: 0.8.2.1-0.10.2.1
> platform: aws (eu-west-1a)
> nodes: 36 x r4.xlarge
> disk storage: 2.5 tb per node (~73% usage per node)
> topics: 250
> number of partitions: 48k (approx)
> os: ubuntu 14.04
> jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
> 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Ben Corlett
>Priority: Critical
> Attachments: thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that happens on a consistent server 
> within our cluster. This happens multiple times a week currently. It first 
> started happening when we upgraded to 0.11.0.0. Sadly 0.11.0.1 failed to 
> resolve the issue.
> Sequence of events:
> At a seemingly random time, broker 125 goes into a deadlock. As soon as it 
> is deadlocked, it removes all the other replicas from the ISR of any 
> partition it is the leader for.
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> 
> The other nodes then fail to connect to node 125:
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch 
> to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, 
> minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, 
> logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, 
> logStartOffset=0, maxBytes=1048576), XX-80=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, 
> logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, 
> logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, 
> logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at 
> kafka.server.Abstr

[jira] [Commented] (KAFKA-6027) System test failure: LogDirFailureTest

2017-10-10 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198508#comment-16198508
 ] 

Dong Lin commented on KAFKA-6027:
-

[~ijuma] Yeah, it is really convenient to set up system tests using Docker. It 
is much better than my previous experience using VirtualBox. Thanks for the 
suggestion.

> System test failure: LogDirFailureTest
> --
>
> Key: KAFKA-6027
> URL: https://issues.apache.org/jira/browse/KAFKA-6027
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 1.0.0
>
>
> LogDirFailureTest started failing a few days ago:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-04--001.1507121832--apache--trunk--cbef33f/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-06--001.1507295254--apache--trunk--196bcfc/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-07--001.1507382831--apache--trunk--e2e8d4a/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-08--001.1507468285--apache--trunk--a1ea536/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-09--001.1507555111--apache--trunk--a1ea536/report.html
> Last good build:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-02--001.1506949758--apache--trunk--4f4f995/report.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ramkumar (JIRA)
Ramkumar created KAFKA-6043:
---

 Summary: Kafka 8.1.1 - 
.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
blocked
 Key: KAFKA-6043
 URL: https://issues.apache.org/jira/browse/KAFKA-6043
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.1.1
Reporter: Ramkumar


We are using a 3-node Kafka cluster. Around this cluster we have a RESTful 
service which provides HTTP APIs for clients. This service maintains the 
consumer connections in a cache, and the cache is set to expire in 60 minutes, 
after which the consumer connection is disconnected.
However, we see that this ZooKeeper connection thread is blocked and the 
consumer object is still hanging in the JVM. Can you please let me know if any 
solution has been identified for this?
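The cache the report describes can be modeled with a small TTL map (a hypothetical sketch: `ExpiringConsumerCache` and its `Connection.close()` hook are illustrative, not the reporter's actual service code); the close-on-expiry hook is where the blocked `ZookeeperConsumerConnector.shutdown()` call would be made:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal time-based cache: entries older than ttlMillis are dropped on access,
// and the eviction path closes the cached consumer connection.
public class ExpiringConsumerCache<K> {
    interface Connection { void close(); }

    private static class Entry {
        final Connection conn;
        final long createdMs;
        Entry(Connection conn, long createdMs) { this.conn = conn; this.createdMs = createdMs; }
    }

    private final Map<K, Entry> cache = new ConcurrentHashMap<>();
    private final long ttlMillis;

    ExpiringConsumerCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void put(K key, Connection conn) {
        cache.put(key, new Entry(conn, System.currentTimeMillis()));
    }

    // Returns the live connection, or closes and drops it once the TTL elapsed.
    Connection get(K key) {
        Entry e = cache.get(key);
        if (e == null) return null;
        if (System.currentTimeMillis() - e.createdMs > ttlMillis) {
            cache.remove(key);
            e.conn.close();   // the shutdown() call that hangs in the report
            return null;
        }
        return e.conn;
    }

    public static void main(String[] args) throws InterruptedException {
        ExpiringConsumerCache<String> cache = new ExpiringConsumerCache<>(50);
        cache.put("client-1", () -> System.out.println("closing consumer connection"));
        System.out.println("cached: " + (cache.get("client-1") != null));
        Thread.sleep(100);  // let the TTL elapse
        System.out.println("after ttl: " + (cache.get("client-1") != null));
    }
}
```

If `close()` blocks, as in the thread dump below, the expired entry's connection (and its consumer object) stays alive in the JVM, which matches the observed behavior.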

Below is the relevant output from a thread dump taken when this occurred:
pool-25100-thread-1" prio=10 tid=0x7f711804e820 nid=0x6726 waiting for 
monitor entry [0x7f6cd999b000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
- waiting to lock <0x00076a922c40> (a java.lang.Object)
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
at 
com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
at 
com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)

T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x7f7180198030 
nid=0x55a2 waiting on condition [0x7f6c9f4fa000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076b2e1508> (a 
java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
at 
kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at 
kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
- locked <0x00076a922340> (a java.lang.Object)
at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
:524)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
- locked <0x00076a922c40> (a java.lang.Object)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadP

[jira] [Created] (KAFKA-6044) Unable to start broker after ungraceful shutdown on windows

2017-10-10 Thread Niels Hoogeveen (JIRA)
Niels Hoogeveen created KAFKA-6044:
--

 Summary: Unable to start broker after ungraceful shutdown on 
windows
 Key: KAFKA-6044
 URL: https://issues.apache.org/jira/browse/KAFKA-6044
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.0
 Environment: Windows
Reporter: Niels Hoogeveen


When a broker is ungracefully shut down on Windows, it won't restart, and shows 
the following message:

[2017-10-10 14:41:20,196] INFO Loading logs. (kafka.log.LogManager)
[2017-10-10 14:41:20,244] WARN Found a corrupted index file due to requirement 
failed: Corrupt index found, index file (
c:\tmp\kafka-logs\test-0\.index) has non-zero size but the 
last offset is 0 which is no larger than
the base offset 0.}. deleting 
c:\tmp\kafka-logs\test-0\.timeindex, 
c:\tmp\kafka-logs\test-0\
.index, and c:\tmp\kafka-logs\test-0\.txnindex 
and rebuilding index... (kafka.log.Log)
[2017-10-10 14:41:20,245] ERROR There was an error in one of the threads during 
logs loading: java.nio.file.FileSystemEx
ception: c:\tmp\kafka-logs\test-0\.timeindex: The process 
cannot access the file because it is being
 used by another process.
 (kafka.log.LogManager)
[2017-10-10 14:41:20,246] FATAL [Kafka Server 0], Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.ser
ver.KafkaServer)
java.nio.file.FileSystemException: 
c:\tmp\kafka-logs\test-0\.timeindex: The process cannot 
access the file because it is being used by another process.

at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
at 
sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
at 
sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108)
at java.nio.file.Files.deleteIfExists(Files.java:1165)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:321)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:279)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegmentFiles(Log.scala:279)
at kafka.log.Log.loadSegments(Log.scala:386)
at kafka.log.Log.(Log.scala:186)
at kafka.log.Log$.apply(Log.scala:1612)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$5$$anonfun$apply$12$$anonfun$apply$1.apply$mcV$sp(LogManage
r.scala:172)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

In kafka.log.Log.scala the offending code seems to be:

329  Files.deleteIfExists(timeIndexFile.toPath)
328  Files.delete(indexFile.toPath)

The files are still memory mapped and can therefore not be deleted.

When replacing these two lines with the following, recovery continues and 
everything seems to work well:

if (timeIndexFileExists)
  timeIndexFile.delete()
indexFile.delete()
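The difference the patch exploits can be shown in plain Java (an illustrative sketch, not the actual Log.scala code): `java.io.File.delete()` reports failure through a boolean return value, while the `java.nio.file.Files` methods throw; on Windows the NIO delete of a still-memory-mapped file fails with a `FileSystemException` like the one above, which aborts log loading.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DeleteSemantics {
    public static void main(String[] args) throws IOException {
        // A path that does not exist stands in for the failure case here;
        // on Windows, a mapped file fails with FileSystemException instead.
        Path index = Paths.get("kafka-6044-demo.index");

        // java.io.File.delete(): failure is a boolean, never an exception.
        System.out.println("File.delete -> " + index.toFile().delete());

        // java.nio.file.Files.delete(): failure is always an exception.
        try {
            Files.delete(index);
        } catch (NoSuchFileException e) {
            System.out.println("Files.delete threw " + e.getClass().getSimpleName());
        }

        // Files.deleteIfExists(): tolerates a missing file, but still throws
        // when the OS refuses the delete (the memory-mapped case on Windows).
        System.out.println("deleteIfExists -> " + Files.deleteIfExists(index));
    }
}
```

So the proposed switch to `File.delete()` trades an exception for a silently ignored failure, which lets startup recovery proceed.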

Kind regards,
Niels Hoogeveen




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2017-10-10 Thread Ben Corlett (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198640#comment-16198640
 ] 

Ben Corlett commented on KAFKA-6042:


I've deployed a build of 0.11.0.1 with the commits of pull request 3956 to node 
125. Will let you know how I get on.

> Kafka Request Handler deadlocks and brings down the cluster.
> 
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
> Environment: kafka version: 0.11.0.1
> client versions: 0.8.2.1-0.10.2.1
> platform: aws (eu-west-1a)
> nodes: 36 x r4.xlarge
> disk storage: 2.5 tb per node (~73% usage per node)
> topics: 250
> number of partitions: 48k (approx)
> os: ubuntu 14.04
> jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
> 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Ben Corlett
>Priority: Critical
> Attachments: thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that happens on a consistent server 
> within our cluster. This happens multiple times a week currently. It first 
> started happening when we upgraded to 0.11.0.0. Sadly 0.11.0.1 failed to 
> resolve the issue.
> Sequence of events:
> At a seemingly random time, broker 125 goes into a deadlock. As soon as it 
> is deadlocked, it removes all the other replicas from the ISR of any 
> partition it is the leader for.
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> 
> The other nodes then fail to connect to node 125:
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch 
> to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, 
> minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, 
> logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, 
> logStartOffset=0, maxBytes=1048576), XX-80=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, 
> logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, 
> logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, 
> logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> As node 125 removed all the ISRs as it was locking up, a fai

[jira] [Created] (KAFKA-6045) All access to log should fail if log is closed

2017-10-10 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-6045:
---

 Summary: All access to log should fail if log is closed
 Key: KAFKA-6045
 URL: https://issues.apache.org/jira/browse/KAFKA-6045
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin


After log.close() or log.closeHandlers() is called for a given log, all uses of 
the Log's API should fail with a proper exception. For example, 
log.appendAsLeader() should throw KafkaStorageException. APIs such as 
Log.activeProducersWithLastSequence() should also fail, but not necessarily with 
KafkaStorageException, since KafkaStorageException indicates a failure to 
access disk.
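A minimal sketch of the guard this asks for (hypothetical names: `GuardedLog` and `StorageException` only stand in for Kafka's `Log` and `KafkaStorageException`): a closed flag is checked at the top of every API method, so calls made after `close()` fail fast with a well-defined exception instead of touching released resources.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedLog {
    // Hypothetical stand-in for Kafka's KafkaStorageException.
    static class StorageException extends RuntimeException {
        StorageException(String msg) { super(msg); }
    }

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private long logEndOffset = 0;

    private void checkOpen() {
        if (closed.get())
            throw new StorageException("log is closed");
    }

    // Every API method checks the flag first, so use-after-close fails fast.
    public long appendAsLeader(byte[] record) {
        checkOpen();
        return logEndOffset++;
    }

    public void close() {
        closed.set(true);
    }

    public static void main(String[] args) {
        GuardedLog log = new GuardedLog();
        log.appendAsLeader(new byte[]{1});
        log.close();
        try {
            log.appendAsLeader(new byte[]{2});
        } catch (StorageException e) {
            System.out.println("append after close failed: " + e.getMessage());
        }
    }
}
```

In the real Log the check would distinguish disk-error paths (KafkaStorageException) from merely-closed paths, as the description notes.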



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-10-10 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198776#comment-16198776
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-4905:
---

[~fredriv] What exactly means: "Just needed to scale down old cluster 
completely"? How to do this in production without stopping other services based 
on Kafka? Thanks in advance.

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the same 
> group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException being thrown during rebalance:
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-10-10 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198776#comment-16198776
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-4905 at 10/10/17 2:53 PM:
-

@[~fredriv]: What exactly means: "Just needed to scale down old cluster 
completely"? How to do this in production without stopping other services based 
on Kafka? Thanks in advance.


was (Author: habdank):
[~fredriv] What exactly means: "Just needed to scale down old cluster 
completely"? How to do this in production without stopping other services based 
on Kafka? Thanks in advance.

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the 
> same group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting 
> in an IllegalArgumentException thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.





[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2017-10-10 Thread Ramkumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198786#comment-16198786
 ] 

Ramkumar commented on KAFKA-1716:
-

Has any solution been found for this?

> hang during shutdown of ZookeeperConsumerConnector
> --
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Sean Fay
>Assignee: Neha Narkhede
> Attachments: after-shutdown.log, before-shutdown.log, 
> kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}-- Parking to wait for: 
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at 
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at 
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at 
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}-- Parking to wait for: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at 
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at 
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at 
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at kafka/utils/Utils$.inLock(Utils.scala:538)
> at 
> kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> at 
> kafka/server/AbstractFe

[jira] [Updated] (KAFKA-6045) All access to log should fail if log is closed

2017-10-10 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6045:
---
Fix Version/s: 1.1.0

> All access to log should fail if log is closed
> --
>
> Key: KAFKA-6045
> URL: https://issues.apache.org/jira/browse/KAFKA-6045
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
> Fix For: 1.1.0
>
>
> After log.close() or log.closeHandlers() is called for a given log, all uses 
> of the Log's API should fail with a proper exception. For example, 
> log.appendAsLeader() should throw KafkaStorageException. APIs such as 
> Log.activeProducersWithLastSequence() should also fail, but not necessarily 
> with KafkaStorageException, since KafkaStorageException indicates a failure 
> to access disk.





[jira] [Commented] (KAFKA-6027) System test failure: LogDirFailureTest

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198834#comment-16198834
 ] 

ASF GitHub Bot commented on KAFKA-6027:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4045


> System test failure: LogDirFailureTest
> --
>
> Key: KAFKA-6027
> URL: https://issues.apache.org/jira/browse/KAFKA-6027
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 1.0.0
>
>
> LogDirFailureTest started failing a few days ago:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-04--001.1507121832--apache--trunk--cbef33f/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-06--001.1507295254--apache--trunk--196bcfc/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-07--001.1507382831--apache--trunk--e2e8d4a/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-08--001.1507468285--apache--trunk--a1ea536/report.html
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-09--001.1507555111--apache--trunk--a1ea536/report.html
> Last good build:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-10-02--001.1506949758--apache--trunk--4f4f995/report.html





[jira] [Reopened] (KAFKA-5140) Flaky ResetIntegrationTest

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-5140:
--
  Assignee: Guozhang Wang  (was: Matthias J. Sax)

Looking into this issue.

> Flaky ResetIntegrationTest
> --
>
> Key: KAFKA-5140
> URL: https://issues.apache.org/jira/browse/KAFKA-5140
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642075, 1), KeyValue(2986681642035, 1), 
> KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642115, 1), KeyValue(2986681642075, 1), 
> KeyValue(2986681642075, 2), KeyValue(2986681642095, 2), 
> KeyValue(2986681642115, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642095, 2), KeyValue(2986681642115, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642115, 2), KeyValue(2986681642135, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642175, 1), 
> KeyValue(2986681642135, 2), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 1), KeyValue(2986681642195, 1), 
> KeyValue(2986681642135, 3), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 2), KeyValue(2986681642195, 1), 
> KeyValue(2986681642155, 3), KeyValue(2986681642175, 2), 
> KeyValue(2986681642195, 2), KeyValue(2986681642155, 3), 
> KeyValue(2986681642175, 3), KeyValue(2986681642195, 2), 
> KeyValue(2986681642155, 4), KeyValue(2986681642175, 3), 
> KeyValue(2986681642195, 3)]>
>  but: was <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642075, 1), KeyValue(2986681642035, 1), 
> KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642115, 1), KeyValue(2986681642075, 1), 
> KeyValue(2986681642075, 2), KeyValue(2986681642095, 2), 
> KeyValue(2986681642115, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642095, 2), KeyValue(2986681642115, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642115, 2), KeyValue(2986681642135, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642175, 1), 
> KeyValue(2986681642135, 2), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 1), KeyValue(2986681642195, 1), 
> KeyValue(2986681642135, 3), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 2), KeyValue(2986681642195, 1), 
> KeyValue(2986681642155, 3), KeyValue(2986681642175, 2), 
> KeyValue(2986681642195, 2), KeyValue(2986681642155, 3)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}





[jira] [Created] (KAFKA-6046) DeleteRecordsRequest to a non-leader

2017-10-10 Thread Tom Bentley (JIRA)
Tom Bentley created KAFKA-6046:
--

 Summary: DeleteRecordsRequest to a non-leader
 Key: KAFKA-6046
 URL: https://issues.apache.org/jira/browse/KAFKA-6046
 Project: Kafka
  Issue Type: Bug
Reporter: Tom Bentley
 Fix For: 1.1.0


When a `DeleteRecordsRequest` is sent to a broker that's not the leader for 
the partition, the `DeleteRecordsResponse` returns `UNKNOWN_TOPIC_OR_PARTITION`. 
This is ambiguous (does the topic not exist on any broker, or did we just send 
the request to the wrong broker?) and inconsistent (a `ProduceRequest` would 
return `NOT_LEADER_FOR_PARTITION`).





[jira] [Commented] (KAFKA-6046) DeleteRecordsRequest to a non-leader

2017-10-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198932#comment-16198932
 ] 

Ted Yu commented on KAFKA-6046:
---

Looking at core/src/main/scala/kafka/server/DelayedDeleteRecords.scala :
{code}
  case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition) {
  (false, Errors.KAFKA_STORAGE_ERROR, 
DeleteRecordsResponse.INVALID_LOW_WATERMARK)
} else {
  partition.leaderReplicaIfLocal match {
case Some(_) =>
  val leaderLW = partition.lowWatermarkIfLeader
  (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
case None =>
  (false, Errors.NOT_LEADER_FOR_PARTITION, 
DeleteRecordsResponse.INVALID_LOW_WATERMARK)
  }
}
{code}
It seems NOT_LEADER_FOR_PARTITION is covered.

> DeleteRecordsRequest to a non-leader
> 
>
> Key: KAFKA-6046
> URL: https://issues.apache.org/jira/browse/KAFKA-6046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
> Fix For: 1.1.0
>
>
> When a `DeleteRecordsRequest` is sent to a broker that's not the leader for 
> the partition, the `DeleteRecordsResponse` returns 
> `UNKNOWN_TOPIC_OR_PARTITION`. This is ambiguous (does the topic not exist on 
> any broker, or did we just send the request to the wrong broker?) and 
> inconsistent (a `ProduceRequest` would return `NOT_LEADER_FOR_PARTITION`).





[jira] [Commented] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198944#comment-16198944
 ] 

Ted Yu commented on KAFKA-6043:
---

0.8.1.1 is very old.

Are you able to try a newer release?

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
>
> We are using a 3-node Kafka cluster. Around this cluster we have a RESTful 
> service which provides HTTP APIs for clients. This service maintains the 
> consumer connection in a cache, and this cache is set to expire in 60 
> minutes, after which the consumer connection gets disconnected.
> But we see this ZooKeeper connection thread is blocked and the consumer 
> object is still hanging in the JVM. Can you please let me know if there is 
> any solution identified for this?
> Below is the output from thread dump when this occurred
> pool-25100-thread-1" prio=10 tid=0x7f711804e820 nid=0x6726 waiting for 
> monitor entry [0x7f6cd999b000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00076a922c40> (a java.lang.Object)
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x7f7180198030 
> nid=0x55a2 waiting on condition [0x7f6c9f4fa000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00076b2e1508> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> - locked <0x00076a922340> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
> :524)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>  

[jira] [Commented] (KAFKA-6046) DeleteRecordsRequest to a non-leader

2017-10-10 Thread Tom Bentley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198963#comment-16198963
 ] 

Tom Bentley commented on KAFKA-6046:


It _would be_ covered if execution ever got that far. In fact, by that point 
{{ReplicaManager.deleteRecordsOnLocalLog()}} has already thrown 
{{UnknownTopicOrPartitionException}}.

> DeleteRecordsRequest to a non-leader
> 
>
> Key: KAFKA-6046
> URL: https://issues.apache.org/jira/browse/KAFKA-6046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
> Fix For: 1.1.0
>
>
> When a `DeleteRecordsRequest` is sent to a broker that's not the leader for 
> the partition, the `DeleteRecordsResponse` returns 
> `UNKNOWN_TOPIC_OR_PARTITION`. This is ambiguous (does the topic not exist on 
> any broker, or did we just send the request to the wrong broker?) and 
> inconsistent (a `ProduceRequest` would return `NOT_LEADER_FOR_PARTITION`).





[jira] [Commented] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ramkumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198967#comment-16198967
 ] 

Ramkumar commented on KAFKA-6043:
-

Thank you for your reply. We found this issue in production in a high-profile 
environment, so we are checking whether any configuration fix or patch would 
resolve this, as it is a severe issue.

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
>
> We are using a 3-node Kafka cluster. Around this cluster we have a RESTful 
> service which provides HTTP APIs for clients. This service maintains the 
> consumer connection in a cache, and this cache is set to expire in 60 
> minutes, after which the consumer connection gets disconnected.
> But we see this ZooKeeper connection thread is blocked and the consumer 
> object is still hanging in the JVM. Can you please let me know if there is 
> any solution identified for this?
> Below is the output from thread dump when this occurred
> pool-25100-thread-1" prio=10 tid=0x7f711804e820 nid=0x6726 waiting for 
> monitor entry [0x7f6cd999b000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00076a922c40> (a java.lang.Object)
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x7f7180198030 
> nid=0x55a2 waiting on condition [0x7f6c9f4fa000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00076b2e1508> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> - locked <0x00076a922340> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
> :524)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV

[jira] [Commented] (KAFKA-6046) DeleteRecordsRequest to a non-leader

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199004#comment-16199004
 ] 

ASF GitHub Bot commented on KAFKA-6046:
---

GitHub user tedyu opened a pull request:

https://github.com/apache/kafka/pull/4052

KAFKA-6046 DeleteRecordsRequest to a non-leader should give proper error



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4052.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4052


commit 4f25a3e9ef5cc079ca1ea549787d75f7ca6f8940
Author: tedyu 
Date:   2017-10-10T17:17:13Z

KAFKA-6046 DeleteRecordsRequest to a non-leader should give proper error




> DeleteRecordsRequest to a non-leader
> 
>
> Key: KAFKA-6046
> URL: https://issues.apache.org/jira/browse/KAFKA-6046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
> Fix For: 1.1.0
>
>
> When a `DeleteRecordsRequest` is sent to a broker that's not the leader for 
> the partition, the `DeleteRecordsResponse` returns 
> `UNKNOWN_TOPIC_OR_PARTITION`. This is ambiguous (does the topic not exist on 
> any broker, or did we just send the request to the wrong broker?) and 
> inconsistent (a `ProduceRequest` would return `NOT_LEADER_FOR_PARTITION`).





[jira] [Commented] (KAFKA-6013) Controller getting stuck

2017-10-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199031#comment-16199031
 ] 

Ivan Babrou commented on KAFKA-6013:


I restarted one node again today and the controller failed to pick it up after 
the restart. I had to restart controller nodes twice, because each new 
controller also managed to forget about the previous controller node I 
restarted. On the third node everything finally came back to order. All these 
machines had been up for ~5 days.

One observation that I don't understand: 
kafka.controller:type=KafkaController,name=ControllerState reports state 9 (ISR 
change) all the time. Shouldn't it be 0 (idle) most of the time?

Stuck controllers have this stack for controller thread:

{noformat}
Oct 10 17:05:17 mybroker70 kafka[37433]: "controller-event-thread" #77 prio=5 
os_prio=0 tid=0x7f5cda487800 nid=0x963f in Object.wait() 
[0x7f5aaeced000]
Oct 10 17:05:17 mybroker70 kafka[37433]:java.lang.Thread.State: WAITING (on 
object monitor)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
java.lang.Object.wait(Native Method)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
java.lang.Object.wait(Object.java:502)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1406)
Oct 10 17:05:17 mybroker70 kafka[37433]: - locked <0x0007b2e00540> 
(a org.apache.zookeeper.ClientCnxn$Packet)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1210)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1241)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:125)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:1104)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:1100)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:991)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.utils.ZkUtils.readDataMaybeNull(ZkUtils.scala:660)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification.getTopicAndPartition(KafkaController.scala:1329)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification.$anonfun$process$26(KafkaController.scala:1310)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification$$Lambda$1253/1422719045.apply(Unknown
 Source)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:241)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike$$Lambda$391/1306246648.apply(Unknown Source)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.Iterator.foreach(Iterator.scala:929)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.Iterator.foreach$(Iterator.scala:929)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.IterableLike.foreach(IterableLike.scala:71)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.IterableLike.foreach$(IterableLike.scala:70)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike.flatMap(TraversableLike.scala:241)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike.flatMap$(TraversableLike.scala:238)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification.process(KafkaController.scala:1310)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:50)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.ControllerEventManager$ControllerEventThread$$Lambda$395/1856206530.apply$mcV$sp(Unknown
 Source)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
Oc

[jira] [Updated] (KAFKA-6031) Expose standby replicas endpoints in StreamsMetadata

2017-10-10 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6031:
---
Labels: needs-kip  (was: )

> Expose standby replicas endpoints in StreamsMetadata
> 
>
> Key: KAFKA-6031
> URL: https://issues.apache.org/jira/browse/KAFKA-6031
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Stanislav Chizhov
>  Labels: needs-kip
>
> Currently reads for a key are served by a single replica, which has two 
> drawbacks:
> - if the replica is down, reads for the keys it was responsible for are 
> unavailable until a standby replica takes over
> - in case of semantic partitioning, some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas had endpoints exposed in StreamsMetadata, reads could be 
> served from several replicas, which would mitigate the above drawbacks.
> Due to the lag between replicas, reading from multiple replicas simultaneously 
> gives weaker (eventual) consistency compared to reads from a single replica. 
> This, however, should be an acceptable tradeoff in many cases.
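The proposal could take roughly the following shape (a hypothetical sketch only: StreamsMetadata does not currently expose standby endpoints, and the class and method names below are illustrative, not a proposed API):

```java
import java.util.List;

// Hypothetical per-key metadata: the active host plus the proposed
// standby endpoints, with a trivial round-robin read spreader.
public class KeyMetadataSketch {
    private final String activeHost;
    private final List<String> standbyHosts;
    private int next = 0;

    public KeyMetadataSketch(String activeHost, List<String> standbyHosts) {
        this.activeHost = activeHost;
        this.standbyHosts = standbyHosts;
    }

    // Spread reads across the active replica and its standbys, accepting
    // the weaker (eventual) consistency of standby reads.
    public String pickHost() {
        int n = 1 + standbyHosts.size();
        int i = next++ % n;
        return i == 0 ? activeHost : standbyHosts.get(i - 1);
    }
}
```

A caller that tolerates stale reads would simply route each lookup to `pickHost()` instead of always hitting the active replica.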



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6016) Use the idempotent producer in the reassign_partitions_test

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199049#comment-16199049
 ] 

ASF GitHub Bot commented on KAFKA-6016:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4029


> Use the idempotent producer in the reassign_partitions_test
> ---
>
> Key: KAFKA-6016
> URL: https://issues.apache.org/jira/browse/KAFKA-6016
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
> Fix For: 1.1.0
>
>
> Currently, the reassign partitions test doesn't use the idempotent producer. 
> This means that bugs like KAFKA-6003 have gone unnoticed. We should update 
> the test to use the idempotent producer and recreate that bug on a regular 
> basis so that we are fully testing all code paths.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2017-10-10 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199106#comment-16199106
 ] 

Matthias J. Sax commented on KAFKA-3758:


> I am running the streams instances on the same machines where the Kafka 
> brokers are present

This is not recommended btw.

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Eno Thereska
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had a replication factor of at least 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.jav

[jira] [Commented] (KAFKA-6013) Controller getting stuck

2017-10-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199121#comment-16199121
 ] 

Ivan Babrou commented on KAFKA-6013:


I forced preferred leader election and current controller switched 9 -> 7 -> 0.

> Controller getting stuck
> 
>
> Key: KAFKA-6013
> URL: https://issues.apache.org/jira/browse/KAFKA-6013
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Ivan Babrou
>  Labels: reliability
>
> This looks like a new issue in 0.11.0.0, and 0.11.0.1 still has it.
> We upgraded one of the clusters from 0.11.0.0 to 0.11.0.1 by shutting down 28 
> machines at once (a single rack). When the nodes came up, none of them 
> progressed past these log lines:
> {noformat}
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka version : 0.11.0.1 
> (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka commitId : 
> c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO [Kafka Server 10014], started 
> (kafka.server.KafkaServer)
> {noformat}
> There was no indication in the controller node's logs that it picked up the 
> rebooted nodes. This happened multiple times during the upgrade: once per 
> rack, plus some on top of that.
> Reboot took ~20m; all nodes in a single rack rebooted in parallel.
> The fix was to restart the controller node, but that did not go cleanly either:
> {noformat}
> ivan@mybroker26:~$ sudo journalctl --since 01:00 -u kafka | fgrep 'Error 
> during controlled shutdown' -A1
> Oct 05 01:57:41 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:57:46 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> --
> Oct 05 01:58:16 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:18 mybroker26 kafka[37409]: INFO Rolled new log segment for 
> 'requests-40' in 3 ms. (kafka.log.Log)
> --
> Oct 05 01:58:51 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:56 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> {noformat}
> I'm unable to reproduce the issue by just restarting or even rebooting one 
> broker; the controller picks it up:
> {noformat}
> Oct 05 03:18:18 mybroker83 kafka[37402]: INFO [Controller 10083]: Newly added 
> brokers: 10001, deleted brokers: , all live brokers: ...
> {noformat}
> KAFKA-5028 happened in 0.11.0.0, so it's likely related.
> cc [~ijuma]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6023) ThreadCache#sizeBytes() should check overflow

2017-10-10 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6023:
---
Component/s: streams

> ThreadCache#sizeBytes() should check overflow
> -
>
> Key: KAFKA-6023
> URL: https://issues.apache.org/jira/browse/KAFKA-6023
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> {code}
> long sizeBytes() {
> long sizeInBytes = 0;
> for (final NamedCache namedCache : caches.values()) {
> sizeInBytes += namedCache.sizeInBytes();
> }
> return sizeInBytes;
> }
> {code}
> The summation into sizeInBytes may overflow.
> An overflow check similar to the one in size() should be performed.
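One possible shape for the suggested guard (a sketch only, operating on plain values rather than the real NamedCache objects; `Math.addExact` is one way to detect long overflow):

```java
import java.util.List;

public class SizeBytesSketch {
    // Saturating sum: clamps to Long.MAX_VALUE instead of silently wrapping
    // around on overflow, mirroring the check suggested for sizeBytes().
    public static long sizeBytes(List<Long> cacheSizes) {
        long sizeInBytes = 0;
        for (long size : cacheSizes) {
            try {
                sizeInBytes = Math.addExact(sizeInBytes, size);
            } catch (ArithmeticException overflow) {
                return Long.MAX_VALUE;
            }
        }
        return sizeInBytes;
    }
}
```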



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6007:
-
Component/s: KafkaConnect

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.1
>Reporter: Stephane Maarek
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see, the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6047) Allow retries configuration for InternalTopicManager

2017-10-10 Thread Dmitry Vsekhvalnov (JIRA)
Dmitry Vsekhvalnov created KAFKA-6047:
-

 Summary: Allow retries configuration for InternalTopicManager
 Key: KAFKA-6047
 URL: https://issues.apache.org/jira/browse/KAFKA-6047
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Dmitry Vsekhvalnov
Priority: Minor


There is a hardcoded number of retries when kafka-streams attempts to create 
internal topics:
*InternalTopicManager.MAX_TOPIC_READY_TRY*

This is not resilient in some scenarios. Consider a setup where the replication 
factor for internal streams topics equals the number of brokers (RF=3 and 3 
kafka brokers). When any of the brokers dies, kafka-streams can shut down 
before the broker is resurrected, with a log similar to:

{code}
[WARN ] [org.apache.kafka.streams.processor.internals.InternalTopicManager] 
[Could not create internal topics: Found only 2 brokers,  but replication 
factor is 3. Decrease replication factor for internal topics via StreamsConfig 
parameter "replication.factor" or add more brokers to your cluster. Retry #2]
[WARN ] [org.apache.kafka.streams.processor.internals.InternalTopicManager] 
[Could not create internal topics: Found only 2 brokers,  but replication 
factor is 3. Decrease replication factor for internal topics via StreamsConfig 
parameter "replication.factor" or add more brokers to your cluster. Retry #3]
[WARN ] [org.apache.kafka.streams.processor.internals.InternalTopicManager] 
[Could not create internal topics: Found only 2 brokers,  but replication 
factor is 3. Decrease replication factor for internal topics via StreamsConfig 
parameter "replication.factor" or add more brokers to your cluster. Retry #4]
[INFO ] [org.apache.kafka.streams.processor.internals.StreamThread] 
[stream-thread [Shutting down]
{code}

It would be nice if kafka-streams provided configuration for 
InternalTopicManager retries, and ideally for the retry backoff strategy, to 
allow tuning for resilience. The corresponding producer configuration could 
possibly be re-used. 
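A minimal sketch of the kind of configurable retry loop being asked for (the parameter names are illustrative, not existing StreamsConfig keys):

```java
import java.util.function.BooleanSupplier;

public class RetrySketch {
    // Retry an action up to maxRetries times with exponential backoff,
    // returning true as soon as the action succeeds.
    public static boolean retryWithBackoff(BooleanSupplier action,
                                           int maxRetries,
                                           long initialBackoffMs)
            throws InterruptedException {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (action.getAsBoolean()) {
                return true;
            }
            Thread.sleep(backoffMs);
            backoffMs *= 2; // exponential backoff between attempts
        }
        return false;
    }
}
```

With both knobs exposed, an operator could make topic creation survive a broker restart window instead of giving up after a fixed retry count.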





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-6007:
--
Priority: Blocker  (was: Major)

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.1
>Reporter: Stephane Maarek
>Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-6007:
--
Fix Version/s: 1.0.0
   0.11.0.1

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-6007:
--
Affects Version/s: (was: 0.11.0.1)
   0.11.0.0

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis reassigned KAFKA-6007:
-

Assignee: Konstantine Karantasis

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199252#comment-16199252
 ] 

Konstantine Karantasis commented on KAFKA-6007:
---

My explanation so far is as follows: 

Normally, when a connector's classloader can't find another plugin within its 
path (the classloader's set of urls), it will delegate class loading to its 
parent. 

This works for all the plugin classloaders that are created for plugins 
residing in the {{plugin.path}}. Upon such delegation, the delegating 
classloader, which is the parent of every plugin classloader, is able to 
select the appropriate plugin classloader for the plugin that is nested in the 
connector's configuration. 

However, if the connector resides in the system classpath and not in any of 
the {{plugin.path}} locations, its classloader is the system classloader, 
whose parent cannot be overridden and set to be the delegating classloader. 
Thus, when a connector loaded from the system classpath requires a transform 
or other plugin, that plugin also needs to reside in the system classpath (as 
opposed to the {{plugin.path}}). 

The solution would be to move all interdependent plugins as a whole set, 
either into the {{plugin.path}} or onto the system classpath. 
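The parent-delegation behavior described above can be seen with a toy example (a sketch of the standard JVM delegation model, not Connect's actual DelegatingClassLoader):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class DelegationSketch {
    // A "plugin" classloader with no URLs of its own: any lookup it cannot
    // serve falls through to its parent. The reverse never happens -- the
    // system classloader cannot delegate *down* into child plugin loaders,
    // which is why a connector on the classpath cannot see transforms that
    // live only under plugin.path.
    public static boolean delegatesToParent() throws Exception {
        try (URLClassLoader plugin =
                 new URLClassLoader(new URL[0], ClassLoader.getSystemClassLoader())) {
            return plugin.loadClass("java.lang.String") == String.class;
        }
    }
}
```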

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see, the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199291#comment-16199291
 ] 

Ted Yu commented on KAFKA-6043:
---

bq. locked <0x00076a922c40> (a java.lang.Object)

This is the rebalanceLock

bq. locked <0x00076a922340> (a java.lang.Object)

This is the mapLock in AbstractFetcherManager

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
>
> We are using a 3-node Kafka cluster. Around this Kafka cluster we have a 
> RESTful service which provides HTTP APIs for clients. This service maintains 
> the consumer connections in a cache, and this cache is set to expire in 60 
> minutes, after which the consumer connection gets disconnected.
> But we see this ZooKeeper connection thread is blocked and the consumer object 
> is still hanging in the JVM. Can you please let me know if there is any solution 
> identified for this?
> Below is the output from thread dump when this occurred
> pool-25100-thread-1" prio=10 tid=0x7f711804e820 nid=0x6726 waiting for 
> monitor entry [0x7f6cd999b000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00076a922c40> (a java.lang.Object)
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x7f7180198030 
> nid=0x55a2 waiting on condition [0x7f6c9f4fa000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00076b2e1508> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> - locked <0x00076a922340> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
> :524)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Z

[jira] [Commented] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199292#comment-16199292
 ] 

Ted Yu commented on KAFKA-6043:
---

Did you see who is holding 0x00076b2e1508?
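To answer a question like this from a thread dump, one can scan for the thread section that contains {{- locked <ADDR>}} (the holder), as opposed to {{- waiting to lock <ADDR>}} or {{- parking to wait for <ADDR>}}. A small illustrative helper (hypothetical, not a Kafka tool):

```java
// Sketch: given thread-dump text, report which threads hold a monitor
// address. Thread sections start with a quoted thread name; the holder's
// section contains "- locked <ADDR>".
import java.util.ArrayList;
import java.util.List;

public class LockHolderFinder {

    // Returns names of thread sections that hold the given monitor address.
    static List<String> holdersOf(String dump, String addr) {
        List<String> holders = new ArrayList<>();
        String current = null;
        for (String line : dump.split("\n")) {
            if (line.startsWith("\"")) {
                // Thread header, e.g. "pool-25100-thread-1" prio=10 ...
                current = line.split("\"")[1];
            } else if (line.contains("- locked <" + addr + ">") && current != null) {
                holders.add(current);
            }
        }
        return holders;
    }

    public static void main(String[] args) {
        String dump =
            "\"t1\" prio=10\n" +
            "   - waiting to lock <0x00076a922c40>\n" +
            "\"t2\" prio=10\n" +
            "   - locked <0x00076a922c40>\n";
        System.out.println(holdersOf(dump, "0x00076a922c40")); // [t2]
    }
}
```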

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
>

[jira] [Updated] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6043:
--
Attachment: 6043.v1

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
> Attachments: 6043.v1
>
>

[jira] [Commented] (KAFKA-6029) Controller should wait for the leader migration to finish before ack a ControlledShutdownRequest

2017-10-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199314#comment-16199314
 ] 

Jun Rao commented on KAFKA-6029:


One of the issues that can lead to this is that the follower and the leader may 
receive LeaderAndIsrRequests at different times. So, if the leader receives a 
LeaderAndIsrRequest with a reduced ISR due to the controlled shutdown of the 
follower, but the follower continues to fetch (since it hasn't received the 
LeaderAndIsrRequest yet), the follower will be added back to the ISR. Then, when 
the follower shuts down, we have to wait replica.lag.time.max.ms for the 
follower to be dropped out of the ISR. 

Onur and I discussed this a bit. One way to improve this is for the 
LeaderAndIsrRequest to indicate that a replica is about to go down such that 
the leader doesn't add it back to ISR. That indication could be piggy-backed on 
a broker epoch, which is needed in 
https://issues.apache.org/jira/browse/KAFKA-1120.
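The proposed leader-side check can be sketched as follows (illustrative names only, not Kafka's actual ISR code): when deciding whether to re-add a caught-up follower to the ISR, skip replicas whose broker is flagged as shutting down, so a fetch that races the LeaderAndIsrRequest cannot resurrect the replica.

```java
// Sketch: ISR expansion that ignores replicas marked as shutting down.
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class IsrExpansionSketch {

    // Returns the new ISR; 'replica' is added only if it is caught up
    // and its broker is not in the shutting-down set.
    static Set<Integer> maybeExpandIsr(Set<Integer> isr, int replica,
                                       boolean caughtUp, Set<Integer> shuttingDown) {
        Set<Integer> out = new TreeSet<>(isr);
        if (caughtUp && !shuttingDown.contains(replica)) {
            out.add(replica);
        }
        return out;
    }

    public static void main(String[] args) {
        Set<Integer> isr = new TreeSet<>(Arrays.asList(1));
        // Replica 2 is caught up but its broker is shutting down: not re-added.
        System.out.println(maybeExpandIsr(isr, 2, true, new TreeSet<>(Arrays.asList(2)))); // [1]
        // Replica 3 is caught up and healthy: added back.
        System.out.println(maybeExpandIsr(isr, 3, true, new TreeSet<>()));                 // [1, 3]
    }
}
```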

> Controller should wait for the leader migration to finish before ack a 
> ControlledShutdownRequest
> 
>
> Key: KAFKA-6029
> URL: https://issues.apache.org/jira/browse/KAFKA-6029
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
> Fix For: 1.1.0
>
>
> In the controlled shutdown process, the controller will return the 
> ControlledShutdownResponse immediately after the state machine is updated. 
> Because the LeaderAndIsrRequests and UpdateMetadataRequests may not have been 
> successfully processed by the brokers, the leader migration and active ISR 
> shrink may not have completed when the shutting-down broker proceeds to shut down. 
> This will cause some of the leaders to take up to replica.lag.time.max.ms to 
> kick the broker out of the ISR. Meanwhile the produce purgatory size will grow.
> Ideally, the controller should wait until all the LeaderAndIsrRequests and 
> UpdateMetadataRequests have been acked before sending back the 
> ControlledShutdownResponse.





[jira] [Commented] (KAFKA-6028) Improve the quota throttle communication.

2017-10-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199316#comment-16199316
 ] 

Jun Rao commented on KAFKA-6028:


Another way to address this is what's described in 
https://issues.apache.org/jira/browse/KAFKA-5871. We can bound the delay up to 
the length of a single window on the server side.
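Numerically, the alternative above amounts to capping the computed delay at one quota window (the window size below is an assumed example value, not a Kafka default):

```java
// Sketch: bound the broker-side throttle delay by a single quota window,
// so a client is never held longer than the window length.
public class BoundedThrottleSketch {

    static long boundedDelayMs(long computedDelayMs, long windowMs) {
        return Math.min(computedDelayMs, windowMs);
    }

    public static void main(String[] args) {
        long windowMs = 1_000L; // assumed 1-second sample window
        System.out.println(boundedDelayMs(5_000L, windowMs)); // 1000
        System.out.println(boundedDelayMs(300L, windowMs));   // 300
    }
}
```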

> Improve the quota throttle communication.
> -
>
> Key: KAFKA-6028
> URL: https://issues.apache.org/jira/browse/KAFKA-6028
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 1.1.0
>
>
> Currently if a client is throttled due to quota violation, the broker will 
> only send back a response to the client after the throttle time has passed. 
> In this case, the clients don't know how long the response will be throttled 
> and might hit the request timeout before the response is returned. As a result, 
> the clients will retry sending the request, which results in an even longer 
> throttle time.
> The above scenario could happen when a large group of clients sends records to 
> the brokers. We saw this when a MapReduce job pushed data to the Kafka 
> cluster.
> To improve this, the broker can return the response with the throttle time 
> immediately after processing the request. After that, the broker will mute 
> the channel for this client. A correct client implementation should back off 
> for that long before sending the next request. If the client ignores the 
> throttle time and sends the next request immediately, the channel will be 
> muted and the request won't be processed until the throttle time has passed.
> A KIP will follow with more details.
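The client-side back-off contract described above can be sketched as follows (hypothetical names, not the Kafka client's actual API): the broker's response carries a throttle hint, and the client schedules its next send no earlier than now plus that hint.

```java
// Sketch: a client honoring the broker's throttle_time_ms hint instead
// of retrying immediately into a muted channel.
public class ThrottleBackoffSketch {

    // Earliest time the client may send its next request.
    static long nextSendAtMs(long nowMs, long throttleTimeMs) {
        return nowMs + Math.max(0L, throttleTimeMs);
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(nextSendAtMs(now, 250)); // 1250 -> back off 250 ms
        System.out.println(nextSendAtMs(now, 0));   // 1000 -> send immediately
    }
}
```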





[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199387#comment-16199387
 ] 

Stephane Maarek commented on KAFKA-6007:


I agree. You are correct in your assessment.
Overall this seems like a user error, and the error message is misleading. It 
leads one to believe there's a problem with Connect itself, whereas it's a misuse 
of the framework. 

I think if you can change the code so that the error messages are more explicit 
and context-accurate, this would give the user enough feedback to fix the 
issues and place the connectors where appropriate. 

This kind of error message is cool:
“we loaded this connector from the plugins, and your transformer is not in the 
plugins”
“we loaded this connector from the cp, and your transformer is not in the cp”
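Such a context-aware message could be produced along these lines (illustrative method, not Connect's actual API): the validator records which loader resolved the connector and names that same location as the place the missing transformation must live.

```java
// Sketch: error message that tells the user where the transformation
// must be placed, based on where the connector itself was loaded from.
public class PluginErrorMessage {

    static String notFoundMessage(String transform, boolean connectorFromPluginPath) {
        String location = connectorFromPluginPath ? "plugin.path" : "classpath";
        return "Transformation " + transform + " not found: the connector was loaded from the "
                + location + ", so the transformation must also be on the " + location + ".";
    }

    public static void main(String[] args) {
        System.out.println(notFoundMessage("com.mycorp.kafka.transforms.Flatten", false));
    }
}
```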

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
>





[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-6007:
--
Fix Version/s: (was: 0.11.0.1)
   (was: 1.0.0)
   1.0.1
   0.10.0.2

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
> Fix For: 0.10.0.2, 1.0.1
>
>





[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199399#comment-16199399
 ] 

Konstantine Karantasis commented on KAFKA-6007:
---

I agree. 

Thanks [~stephane.maa...@gmail.com] for the feedback!

I'm going to downgrade the severity of the ticket at this point, and will focus 
on better error messaging shortly, targeting one of the next releases instead 
of the forthcoming one. 

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 0.10.0.2, 1.0.1
>
>





[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-6007:
--
Priority: Major  (was: Blocker)

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
> Fix For: 0.10.0.2, 1.0.1
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-6007:
--
Affects Version/s: 1.0.0
   0.11.0.1

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
> Fix For: 0.10.0.2, 1.0.1
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6025) there is something wrong in kafka stream document

2017-10-10 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-6025:
--

Assignee: Bill Bejeck

> there is something wrong in kafka stream document
> -
>
> Key: KAFKA-6025
> URL: https://issues.apache.org/jira/browse/KAFKA-6025
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: laomei
>Assignee: Bill Bejeck
>Priority: Trivial
>  Labels: beginner, newbie
> Attachments: pic.PNG
>
>
> I'm new to Kafka Streams. When I was reading the Kafka Streams documentation, 
> I found something wrong. 
> !pic.PNG|thumbnail!
> In my uploaded picture, the second "builder" should be "source". 
> location: 
> [https://kafka.apache.org/0110/documentation/streams/tutorial#tutorial_code_linesplit]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5212) Consumer ListOffsets request can starve group heartbeats

2017-10-10 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-5212:
-

Assignee: Richard Yu

> Consumer ListOffsets request can starve group heartbeats
> 
>
> Key: KAFKA-5212
> URL: https://issues.apache.org/jira/browse/KAFKA-5212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Richard Yu
> Fix For: 1.1.0, 1.0.1
>
>
> The consumer is not able to send heartbeats while it is awaiting a 
> ListOffsets response. Typically this is not a problem because ListOffsets 
> requests are handled quickly, but in the worst case if the request takes 
> longer than the session timeout, the consumer will fall out of the group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6025) there is something wrong in kafka stream document

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199454#comment-16199454
 ] 

ASF GitHub Bot commented on KAFKA-6025:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4053


> there is something wrong in kafka stream document
> -
>
> Key: KAFKA-6025
> URL: https://issues.apache.org/jira/browse/KAFKA-6025
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: laomei
>Assignee: Bill Bejeck
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 1.0.0
>
> Attachments: pic.PNG
>
>
> I'm new to Kafka Streams. When I was reading the Kafka Streams documentation, 
> I found something wrong. 
> !pic.PNG|thumbnail!
> In my uploaded picture, the second "builder" should be "source". 
> location: 
> [https://kafka.apache.org/0110/documentation/streams/tutorial#tutorial_code_linesplit]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5541) Streams should not re-throw if suspending/closing tasks fails

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199466#comment-16199466
 ] 

ASF GitHub Bot commented on KAFKA-5541:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4046


> Streams should not re-throw if suspending/closing tasks fails
> -
>
> Key: KAFKA-5541
> URL: https://issues.apache.org/jira/browse/KAFKA-5541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 1.1.0
>
>
> Currently, if Stream suspends a task on rebalance or closes a suspended task 
> that got revoked, it re-throws any exception that might occur and the thread 
> dies. However, this in not really necessary as the task was suspended/closed 
> anyway and we can just clean up the task and carry on with the rebalance.
> (cf comments https://github.com/apache/kafka/pull/3449#discussion_r124437816)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6025) There is something wrong in Kafka Streams document

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6025:
-
Summary: There is something wrong in Kafka Streams document  (was: there is 
something wrong in kafka stream document)

> There is something wrong in Kafka Streams document
> --
>
> Key: KAFKA-6025
> URL: https://issues.apache.org/jira/browse/KAFKA-6025
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: laomei
>Assignee: Bill Bejeck
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 1.0.0
>
> Attachments: pic.PNG
>
>
> I'm new to Kafka Streams. When I was reading the Kafka Streams documentation, 
> I found something wrong. 
> !pic.PNG|thumbnail!
> In my uploaded picture, the second "builder" should be "source". 
> location: 
> [https://kafka.apache.org/0110/documentation/streams/tutorial#tutorial_code_linesplit]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6007:
-
Fix Version/s: (was: 0.10.0.2)
   0.11.0.2

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
> Fix For: 0.11.0.2, 1.0.1
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199478#comment-16199478
 ] 

Randall Hauch commented on KAFKA-6007:
--

[~kkonstantine], thanks for the excellent diagnosis and explanation. I agree it 
is not a blocker since there are several ways to work around this. 

However, what if anything can be done to eliminate the problem altogether? For 
example, is there a way to try to resolve any class not found on the current 
classloader by always checking if it is one of the connector, transformation, 
or converter implementations in the Plugins component, and to load it with the 
correct plugin's classloader?
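The fallback idea can be sketched with a small delegating classloader. This is illustrative only — `DelegatingLoader` is an invented name, not the actual Connect `Plugins` implementation:

```java
// Sketch: resolve via the normal parent chain first, and only when that
// fails, fall back to a plugin's classloader. Names here are invented.
public class DelegatingLoader extends ClassLoader {
    private final ClassLoader plugin;

    public DelegatingLoader(ClassLoader parent, ClassLoader plugin) {
        super(parent);
        this.plugin = plugin;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // Reached only after the parent chain failed: ask the plugin's loader.
        return plugin.loadClass(name);
    }

    public static void main(String[] args) throws Exception {
        DelegatingLoader dl =
            new DelegatingLoader(null, ClassLoader.getSystemClassLoader());
        // Core classes resolve through the bootstrap parent; anything else
        // falls through to the plugin loader supplied above.
        System.out.println(dl.loadClass("java.lang.String"));
    }
}
```

A real fix would also need to pick the *right* plugin loader among several, which is where the Plugins component's class discovery would come in.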

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
> Fix For: 0.11.0.2, 1.0.1
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6048) Support negative record timestamps

2017-10-10 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6048:
--

 Summary: Support negative record timestamps
 Key: KAFKA-6048
 URL: https://issues.apache.org/jira/browse/KAFKA-6048
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core, streams
Affects Versions: 1.0.0
Reporter: Matthias J. Sax


Kafka does not support negative record timestamps, and this prevents the 
storage of historical data in Kafka. Negative timestamps are, however, valid 
Unix timestamps: 

From https://en.wikipedia.org/wiki/Unix_time
{quote}
The Unix time number is zero at the Unix epoch, and increases by exactly 86,400 
per day since the epoch. Thus 2004-09-16T00:00:00Z, 12,677 days after the 
epoch, is represented by the Unix time number 12,677 × 86,400 = 1095292800. 
This can be extended backwards from the epoch too, using negative numbers; thus 
1957-10-04T00:00:00Z, 4,472 days before the epoch, is represented by the Unix 
time number −4,472 × 86,400 = −386380800.
{quote}

Allowing for negative timestamps would require multiple changes:

 - While brokers in general do support negative timestamps, they use {{-1}} as 
the default value if a producer uses an old message format. This is not 
compatible with supporting negative timestamps "end-to-end", as {{-1}} could no 
longer mean "unknown". We could introduce a message flag indicating a missing 
timestamp (and let the producer throw an exception if 
{{ConsumerRecord#timestamp()}} is called). Another possible solution is to 
require topics that are used by old producers to be configured with 
{{LogAppendTime}} semantics, and to reject writes with older message formats to 
topics with {{CreateTime}} semantics.
 - {{KafkaProducer}} does not allow sending records with a negative timestamp, 
so this would need to be fixed.
 - The Streams API drops records with negative timestamps (or fails by 
default); also, some internal store implementations for windowed stores assume 
there are no negative timestamps when doing range queries.

There might be other gaps we need to address. This is just a summary of the 
issues that come to mind at the moment.
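The arithmetic in the quoted Wikipedia passage is easy to verify with java.time, which already accepts negative epoch seconds (a standalone sketch, unrelated to Kafka's own APIs):

```java
import java.time.Instant;

public class NegativeUnixTime {
    public static void main(String[] args) {
        // 4,472 days before the epoch, per the quoted example
        long ts = -4_472L * 86_400L;
        System.out.println(ts);                        // -386380800
        System.out.println(Instant.ofEpochSecond(ts)); // 1957-10-04T00:00:00Z
    }
}
```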



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5977) Upgrade RocksDB dependency to legally acceptable version

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5977.
--
Resolution: Fixed

> Upgrade RocksDB dependency to legally acceptable version
> 
>
> Key: KAFKA-5977
> URL: https://issues.apache.org/jira/browse/KAFKA-5977
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Stevo Slavic
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 1.0.0
>
>
> RocksDB 5.5.5+ seems to be legally acceptable. For more info see
> - https://issues.apache.org/jira/browse/LEGAL-303 and
> - https://www.apache.org/legal/resolved.html#category-x
> Even the latest trunk of Apache Kafka depends on an older RocksDB: 
> https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L67
> If I'm not mistaken, this makes all current Apache Kafka 0.10+ releases not 
> legally acceptable Apache products.
> Please consider upgrading the dependency. If possible please include the 
> change in Apache Kafka 1.0.0 release, if not also in patch releases of older 
> still supported 0.x Apache Kafka branches.
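The upgrade itself would be a one-line change in the dependencies file linked above; a sketch only — the exact key name in gradle/dependencies.gradle may differ:

```groovy
// gradle/dependencies.gradle (sketch): bump RocksDB to a 5.5.5+ release
versions += [
  rocksDB: "5.5.5"  // first release considered legally acceptable per LEGAL-303
]
```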



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5977) Upgrade RocksDB dependency to legally acceptable version

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-5977:
--

> Upgrade RocksDB dependency to legally acceptable version
> 
>
> Key: KAFKA-5977
> URL: https://issues.apache.org/jira/browse/KAFKA-5977
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Stevo Slavic
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 1.0.0
>
>
> RocksDB 5.5.5+ seems to be legally acceptable. For more info see
> - https://issues.apache.org/jira/browse/LEGAL-303 and
> - https://www.apache.org/legal/resolved.html#category-x
> Even the latest trunk of Apache Kafka depends on an older RocksDB: 
> https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L67
> If I'm not mistaken, this makes all current Apache Kafka 0.10+ releases not 
> legally acceptable Apache products.
> Please consider upgrading the dependency. If possible please include the 
> change in Apache Kafka 1.0.0 release, if not also in patch releases of older 
> still supported 0.x Apache Kafka branches.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5640) Look into making acks=all the default setting

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5640:
-
Fix Version/s: (was: 1.0.0)

> Look into making acks=all the default setting
> -
>
> Key: KAFKA-5640
> URL: https://issues.apache.org/jira/browse/KAFKA-5640
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>
> KAFKA-5494 proposed dropping the requirement for 
> {{max.inflight.requests.per.connection=1}} for the idempotent producer. 
> That is a stepping stone to enabling the idempotent producer by default 
> without sacrificing performance.
> A further step would be making {{acks=all}} the default setting as well. 
> Then, with {{enable.idempotence=true}}, 
> {{max.inflight.requests.per.connection=5}}, {{acks=all}}, 
> {{retries=MAX_INT}}, we would have exactly once semantics with strong 
> durability guarantees. 
> This particular ticket is about investigating the performance degradation 
> caused by {{acks=all}}. How much does throughput degrade? If it is 
> significant, are there low hanging fruit in terms of code or config changes 
> which would allow us to bridge most of the gap?
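Spelled out as producer configuration, the combination under discussion would look like the following sketch (note the actual config key is max.in.flight.requests.per.connection, and retries is shown at Integer.MAX_VALUE):

```properties
# Proposed producer defaults under discussion (sketch)
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
```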



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5569) Document any changes from this task

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5569:
-
Fix Version/s: (was: 1.0.0)

> Document any changes from this task
> ---
>
> Key: KAFKA-5569
> URL: https://issues.apache.org/jira/browse/KAFKA-5569
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
>
> After fixing the exceptions, document what was done, e.g., KIP-161 at a 
> minimum.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5621:
-
Fix Version/s: (was: 1.0.0)

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Sumant Tambe
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch itself, up to 
> the configured number of retries. This is more intuitive from the user's point 
> of view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5011) Replica fetchers may need to down-convert messages during a selective message format upgrade

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5011:
-
Fix Version/s: (was: 1.0.0)

> Replica fetchers may need to down-convert messages during a selective message 
> format upgrade
> 
>
> Key: KAFKA-5011
> URL: https://issues.apache.org/jira/browse/KAFKA-5011
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Jiangjie Qin
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5499) Double check how we handle exceptions when commits fail

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5499:
-
Fix Version/s: (was: 1.0.0)

> Double check how we handle exceptions when commits fail
> ---
>
> Key: KAFKA-5499
> URL: https://issues.apache.org/jira/browse/KAFKA-5499
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
>
> When a task does a lot of processing between calls to poll(), it might miss a 
> rebalance. It only finds this out once it tries to commit(), since it will get 
> an exception. Double-check what is supposed to happen on such an exception: 
> should the thread fail, or should it continue? 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5573) kafka-clients 0.11.0.0 AdminClient#createTopics() does not set configs

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5573:
-
Fix Version/s: (was: 1.0.0)

> kafka-clients 0.11.0.0 AdminClient#createTopics() does not set configs
> -
>
> Key: KAFKA-5573
> URL: https://issues.apache.org/jira/browse/KAFKA-5573
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.11.0.0
>Reporter: Dmitry Minkovsky
>
> I am creating topics like
> ```
>    private void createTopics(String[] topics, Map<String, String> config) {
> log.info("creating topics: {} with config: {}", topics, config);
> CreateTopicsResult result = admin.createTopics(
>   Arrays
> .stream(topics)
> .map(topic -> new NewTopic(topic, partitions, replication))
> .collect(Collectors.toList())
> );
> for (Map.Entry<String, KafkaFuture<Void>> entry : 
> result.values().entrySet()) {
> try {
> entry.getValue().get();
> log.info("topic {} created", entry.getKey());
> } catch (InterruptedException | ExecutionException e) {
> if (Throwables.getRootCause(e) instanceof 
> TopicExistsException) {
> log.info("topic {} existed", entry.getKey());
> }
> }
> }
> }
> ```
> where I call this function like 
> ```
> Map<String, String> config = new HashMap<>();
> config.put("cleanup.policy", "compact");
> createTopics(new String[]{"topic"}, config);
> ```
> However, when I inspect the topic with 
> ./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics 
> --entity-name topic --describe
> or 
> ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic
> there are no configs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5300) Improve exception handling on producer path

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5300:
-
Fix Version/s: (was: 1.0.0)

> Improve exception handling on producer path
> ---
>
> Key: KAFKA-5300
> URL: https://issues.apache.org/jira/browse/KAFKA-5300
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>
> Fix exception handling along the code path that uses the KafkaProducer, and 
> goes through RecordCollector and further up to SinkNode and StoreChangeLogger.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4563) State transitions error PARTITIONS_REVOKED to NOT_RUNNING

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4563:
-
Fix Version/s: (was: 1.0.0)

> State transitions error PARTITIONS_REVOKED to NOT_RUNNING
> -
>
> Key: KAFKA-4563
> URL: https://issues.apache.org/jira/browse/KAFKA-4563
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>
> When starting and stopping streams quickly, the following exception is thrown:
> java.lang.IllegalStateException: Incorrect state transition from 
> PARTITIONS_REVOKED to NOT_RUNNING
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:164)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:414)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:366)
> A temporary fix is to convert the exception into a warning, since clearly not 
> all state transitions are thought through yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6049:


 Summary: Kafka Streams: Add Cogroup in the DSL
 Key: KAFKA-6049
 URL: https://issues.apache.org/jira/browse/KAFKA-6049
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Guozhang Wang
Assignee: Kyle Winkelman


When multiple streams aggregate together to form a single larger object (eg. A 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer.), it is very difficult to accommodate 
this in the Kafka-Streams DSL. It generally requires you to group and aggregate 
all of the streams to KTables then make multiple outerjoin calls to end up with 
a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.
Creating a cogroup method where you use a single state store will:
 Reduce the number of gets from state stores. With the multiple joins when a 
new value comes into any of the streams a chain reaction happens where 
ValueGetters keep calling ValueGetters until we have accessed all state stores.
Slight performance increase. As described above all ValueGetters are called 
also causing all ValueJoiners to be called forcing a recalculation of the 
current joined value of all other streams, impacting performance.
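To make the cogroup idea concrete, here is a small plain-Java sketch (hypothetical class and method names; not the Streams DSL and not the proposed API): three per-stream aggregators share a single state store keyed by customer, so a new record in any stream updates one store directly instead of triggering a chain of joins across three stores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of co-grouping: one shared state store holds the
// Customer aggregate, and each "stream" applies its own aggregator to it.
class CogroupSketch {
    static class Customer {
        final List<String> cart = new ArrayList<>();
        final List<String> wishList = new ArrayList<>();
        final List<String> purchases = new ArrayList<>();
    }

    // The single shared state store, keyed by customer id.
    private final Map<String, Customer> store = new HashMap<>();

    // One aggregator per input stream, all writing into the same store.
    void applyCart(String key, String item)     { get(key).cart.add(item); }
    void applyWishList(String key, String item) { get(key).wishList.add(item); }
    void applyPurchase(String key, String item) { get(key).purchases.add(item); }

    private Customer get(String key) {
        return store.computeIfAbsent(key, k -> new Customer());
    }

    Customer customer(String key) { return store.get(key); }
}
```

Each update is a single get/put on one store, rather than a ValueJoiner chain recomputing the joined value from every other stream's store.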





[jira] [Commented] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199642#comment-16199642
 ] 

Guozhang Wang commented on KAFKA-6049:
--

WIP PR ready at 
https://github.com/apache/kafka/pull/2975#issuecomment-331275009.

Needs someone to pick it up, address the remaining review comments, rebase on 
trunk, and push a new PR to continue.

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Kyle Winkelman
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (eg. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer.), it is very difficult to 
> accommodate this in the Kafka-Streams DSL. It generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outerjoin 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
>  Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where 
> ValueGetters keep calling ValueGetters until we have accessed all state 
> stores.
> Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Assigned] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6049:


Assignee: (was: Kyle Winkelman)

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (eg. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer.), it is very difficult to 
> accommodate this in the Kafka-Streams DSL. It generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outerjoin 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
>  Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where 
> ValueGetters keep calling ValueGetters until we have accessed all state 
> stores.
> Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Commented] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199643#comment-16199643
 ] 

Guozhang Wang commented on KAFKA-6049:
--

This is primarily contributed by [~winkelman.kyle]. I'm leaving the assignee 
empty for now in the hope that someone interested will help finish it off.

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (eg. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer.), it is very difficult to 
> accommodate this in the Kafka-Streams DSL. It generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outerjoin 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
>  Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where 
> ValueGetters keep calling ValueGetters until we have accessed all state 
> stores.
> Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Comment Edited] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199642#comment-16199642
 ] 

Guozhang Wang edited comment on KAFKA-6049 at 10/11/17 12:59 AM:
-

WIP PR ready at https://github.com/apache/kafka/pull/2975

Needs someone to pick it up, address the remaining review comments, rebase on 
trunk, and push a new PR to continue.


was (Author: guozhang):
WIP PR ready at 
https://github.com/apache/kafka/pull/2975#issuecomment-331275009.

Needs someone to pick it up, address the left comments, rebase on trunk and 
push a new PR to continue.

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (eg. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer.), it is very difficult to 
> accommodate this in the Kafka-Streams DSL. It generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outerjoin 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
>  Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where 
> ValueGetters keep calling ValueGetters until we have accessed all state 
> stores.
> Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Commented] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199645#comment-16199645
 ] 

Ted Yu commented on KAFKA-6049:
---

bq. ValueGetters keep calling ValueGetters until 

Is there a typo above?

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (eg. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer.), it is very difficult to 
> accommodate this in the Kafka-Streams DSL. It generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outerjoin 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
>  Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where 
> ValueGetters keep calling ValueGetters until we have accessed all state 
> stores.
> Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Commented] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199649#comment-16199649
 ] 

Guozhang Wang commented on KAFKA-6049:
--

Thanks [~tedyu], fixing now.

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (eg. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer.), it is very difficult to 
> accommodate this in the Kafka-Streams DSL. It generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outerjoin 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
>  Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where 
> ValueGetters keep calling ValueGetters until we have accessed all state 
> stores.
> Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Updated] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6049:
-
Description: 
When multiple streams aggregate together to form a single larger object (e.g. A 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer), it is very difficult to accommodate 
this in the Kafka-Streams DSL: it generally requires you to group and aggregate 
all of the streams to KTables then make multiple outer join calls to end up 
with a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.

Creating a cogroup method where you use a single state store will:

* Reduce the number of gets from state stores. With the multiple joins when a 
new value comes into any of the streams a chain reaction happens where the join 
processor keep calling ValueGetters until we have accessed all state stores.

* Slight performance increase. As described above all ValueGetters are called 
also causing all ValueJoiners to be called forcing a recalculation of the 
current joined value of all other streams, impacting performance.

  was:
When multiple streams aggregate together to form a single larger object (e.g. A 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer), it is very difficult to accommodate 
this in the Kafka-Streams DSL: it generally requires you to group and aggregate 
all of the streams to KTables then make multiple outer join calls to end up 
with a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.

Creating a cogroup method where you use a single state store will:

* Reduce the number of gets from state stores. With the multiple joins when a 
new value comes into any of the streams a chain reaction happens where 
ValueGetters keep calling ValueGetters until we have accessed all state stores.

* Slight performance increase. As described above all ValueGetters are called 
also causing all ValueJoiners to be called forcing a recalculation of the 
current joined value of all other streams, impacting performance.


> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (e.g. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer), it is very difficult to 
> accommodate this in the Kafka-Streams DSL: it generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outer join 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
> * Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where the 
> join processor keep calling ValueGetters until we have accessed all state 
> stores.
> * Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Updated] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6049:
-
Description: 
When multiple streams aggregate together to form a single larger object (e.g. A 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer), it is very difficult to accommodate 
this in the Kafka-Streams DSL: it generally requires you to group and aggregate 
all of the streams to KTables then make multiple outer join calls to end up 
with a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.

Creating a cogroup method where you use a single state store will:

* Reduce the number of gets from state stores. With the multiple joins when a 
new value comes into any of the streams a chain reaction happens where 
ValueGetters keep calling ValueGetters until we have accessed all state stores.

* Slight performance increase. As described above all ValueGetters are called 
also causing all ValueJoiners to be called forcing a recalculation of the 
current joined value of all other streams, impacting performance.

  was:
When multiple streams aggregate together to form a single larger object (eg. A 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer.), it is very difficult to accommodate 
this in the Kafka-Streams DSL. It generally requires you to group and aggregate 
all of the streams to KTables then make multiple outerjoin calls to end up with 
a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.
Creating a cogroup method where you use a single state store will:
 Reduce the number of gets from state stores. With the multiple joins when a 
new value comes into any of the streams a chain reaction happens where 
ValueGetters keep calling ValueGetters until we have accessed all state stores.
Slight performance increase. As described above all ValueGetters are called 
also causing all ValueJoiners to be called forcing a recalculation of the 
current joined value of all other streams, impacting performance.


> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (e.g. 
> A shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer), it is very difficult to 
> accommodate this in the Kafka-Streams DSL: it generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outer join 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
> * Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where 
> ValueGetters keep calling ValueGetters until we have accessed all state 
> stores.
> * Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Updated] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2017-10-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6049:
-
Description: 
When multiple streams aggregate together to form a single larger object (e.g. a 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer), it is very difficult to accommodate 
this in the Kafka-Streams DSL: it generally requires you to group and aggregate 
all of the streams to KTables then make multiple outer join calls to end up 
with a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.

Creating a cogroup method where you use a single state store will:

* Reduce the number of gets from state stores. With the multiple joins when a 
new value comes into any of the streams a chain reaction happens where the join 
processor keeps calling ValueGetters until we have accessed all state stores.

* Slight performance increase. As described above all ValueGetters are called 
also causing all ValueJoiners to be called forcing a recalculation of the 
current joined value of all other streams, impacting performance.

  was:
When multiple streams aggregate together to form a single larger object (e.g. A 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer), it is very difficult to accommodate 
this in the Kafka-Streams DSL: it generally requires you to group and aggregate 
all of the streams to KTables then make multiple outer join calls to end up 
with a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.

Creating a cogroup method where you use a single state store will:

* Reduce the number of gets from state stores. With the multiple joins when a 
new value comes into any of the streams a chain reaction happens where the join 
processor keep calling ValueGetters until we have accessed all state stores.

* Slight performance increase. As described above all ValueGetters are called 
also causing all ValueJoiners to be called forcing a recalculation of the 
current joined value of all other streams, impacting performance.


> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, needs-kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (e.g. 
> a shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer), it is very difficult to 
> accommodate this in the Kafka-Streams DSL: it generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outer join 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
> * Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where the 
> join processor keeps calling ValueGetters until we have accessed all state 
> stores.
> * Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.





[jira] [Commented] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ramkumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199667#comment-16199667
 ] 

Ramkumar commented on KAFKA-6043:
-

Hi Ted, thanks for your reply. I couldn't get the reference for 
"0x00076b2e1508" to find who holds it. I am attaching the entire thread dump; 
it does seem to occur when the shutdown activity is triggered. Thanks again for 
your help. [^6043.v1]

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
> Attachments: 6043.v1
>
>
> We are using a 3-node Kafka cluster. Around this cluster we have a RESTful 
> service which provides HTTP APIs for clients. This service maintains the 
> consumer connections in a cache, and this cache is set to expire in 60 
> minutes, after which the consumer connection gets disconnected.
> But we see that this ZooKeeper connection thread is blocked and the consumer 
> object is still hanging in the JVM. Can you please let me know if any 
> solution has been identified for this?
> Below is the output from the thread dump when this occurred:
> pool-25100-thread-1" prio=10 tid=0x7f711804e820 nid=0x6726 waiting for 
> monitor entry [0x7f6cd999b000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00076a922c40> (a java.lang.Object)
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x7f7180198030 
> nid=0x55a2 waiting on condition [0x7f6c9f4fa000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00076b2e1508> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> - locked <0x00076a922340> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
> :524)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
> at 
> kafk
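For context on the parked thread above: a ShutdownableThread-style shutdown blocks on a CountDownLatch until the worker loop exits, which is what the "parking to wait for <CountDownLatch$Sync>" frame shows. A self-contained sketch of that handshake (hypothetical class, not Kafka's actual code), using a bounded wait so the caller cannot park forever:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the shutdown handshake: shutdown() parks on a latch
// that the worker counts down when its loop exits. If the worker never exits,
// an unbounded await() parks forever; a timeout bounds the wait.
class ShutdownableWorker implements Runnable {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile boolean running = true;

    @Override
    public void run() {
        try {
            while (running) {
                Thread.yield(); // stand-in for one unit of fetch work
            }
        } finally {
            shutdownLatch.countDown(); // release anyone blocked in shutdown()
        }
    }

    // Returns true if the worker stopped within the timeout.
    boolean shutdown(long timeout, TimeUnit unit) {
        running = false;
        try {
            return shutdownLatch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```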

[jira] [Comment Edited] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ramkumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199667#comment-16199667
 ] 

Ramkumar edited comment on KAFKA-6043 at 10/11/17 1:24 AM:
---

Hi Ted, thanks for your reply. I couldn't get the reference for 
"0x00076b2e1508" to find who holds it. I am attaching the entire thread dump; 
it does seem to occur when the shutdown activity is triggered. Thanks again for 
your help. [^6043.v1]


was (Author: ram_...@yahoo.com):
Hi Ted, thanks for your reply. I couldn't get the reference for 
"0x00076b2e1508" to find who holds.  I am attaching the entire thread dump. 
it does seems to occur when shut down activity is triggered. Thanks again for 
your help[^6043.v1]

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
> Attachments: 6043.v1
>
>
> We are using a 3-node Kafka cluster. Around this cluster we have a RESTful 
> service which provides HTTP APIs for clients. This service maintains the 
> consumer connections in a cache, and this cache is set to expire in 60 
> minutes, after which the consumer connection gets disconnected.
> But we see that this ZooKeeper connection thread is blocked and the consumer 
> object is still hanging in the JVM. Can you please let me know if any 
> solution has been identified for this?
> Below is the output from the thread dump when this occurred:
> pool-25100-thread-1" prio=10 tid=0x7f711804e820 nid=0x6726 waiting for 
> monitor entry [0x7f6cd999b000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00076a922c40> (a java.lang.Object)
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x7f7180198030 
> nid=0x55a2 waiting on condition [0x7f6c9f4fa000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00076b2e1508> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> - locked <0x00076a922340> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
> :524)
> a

[jira] [Updated] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

2017-10-10 Thread Ramkumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ramkumar updated KAFKA-6043:

Attachment: stdout.LRMIID-158150.zip

Attaching the complete thread dump in this file.

> Kafka 8.1.1 - 
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) 
> blocked
> 
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.1.1
>Reporter: Ramkumar
> Attachments: 6043.v1, stdout.LRMIID-158150.zip
>
>
> We are using a 3-node Kafka cluster. Around this cluster we have a RESTful 
> service which provides HTTP APIs for clients. This service maintains the 
> consumer connections in a cache, and the cache is set to expire after 60 
> minutes, at which point the consumer connection gets disconnected.
> But we see that this ZookeeperConsumerConnector thread is blocked and the 
> consumer object is still hanging in the JVM. Can you please let me know if 
> any solution has been identified for this?
> Below is the output from thread dump when this occurred
> pool-25100-thread-1" prio=10 tid=0x7f711804e820 nid=0x6726 waiting for 
> monitor entry [0x7f6cd999b000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00076a922c40> (a java.lang.Object)
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
> at 
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x7f7180198030 
> nid=0x55a2 waiting on condition [0x7f6c9f4fa000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00076b2e1508> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> - locked <0x00076a922340> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
> :524)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
> at scala.collection.immutable.Range

[jira] [Commented] (KAFKA-6029) Controller should wait for the leader migration to finish before ack a ControlledShutdownRequest

2017-10-10 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199706#comment-16199706
 ] 

Jiangjie Qin commented on KAFKA-6029:
-

[~junrao] Good point. That seems more likely to happen. Just to check that I 
understand correctly, are you suggesting the following solution?
1. Let each broker have an epoch which changes on restart.
2. During controlled shutdown, the controller will send a LeaderAndIsrRequest 
with the new ISR plus the shutting-down broker's epoch.
3. Add the broker epoch to the FetchRequest so that each follower will send 
FetchRequests with its broker epoch.
4. If the leader sees a fetch request from a broker whose id and epoch match the 
shutting-down broker, it will not add it back to the ISR.
5. After the broker restarts, the leaders will see a new broker epoch and add 
the restarted broker back to the ISR.



> Controller should wait for the leader migration to finish before ack a 
> ControlledShutdownRequest
> 
>
> Key: KAFKA-6029
> URL: https://issues.apache.org/jira/browse/KAFKA-6029
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
> Fix For: 1.1.0
>
>
> In the controlled shutdown process, the controller will return the 
> ControlledShutdownResponse immediately after the state machine is updated. 
> Because the LeaderAndIsrRequests and UpdateMetadataRequests may not have been 
> successfully processed by the brokers, the leader migration and ISR 
> shrink may not be done when the shutting-down broker proceeds to shut down. 
> This will cause some of the leaders to take up to replica.lag.time.max.ms to 
> kick the broker out of the ISR. Meanwhile, the produce purgatory size will grow.
> Ideally, the controller should wait until all the LeaderAndIsrRequests and 
> UpdateMetadataRequests have been acked before sending back the 
> ControlledShutdownResponse.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6028) Improve the quota throttle communication.

2017-10-10 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199712#comment-16199712
 ] 

Jiangjie Qin commented on KAFKA-6028:
-

[~junrao] Not sure if this is going to solve the problem. The solution in 
KAFKA-5871 seems to assume that the client's request timeout is greater than the 
metric window, which is 30 seconds by default. But if the client's request 
timeout is less than that, it would still time out and reconnect, right?

> Improve the quota throttle communication.
> -
>
> Key: KAFKA-6028
> URL: https://issues.apache.org/jira/browse/KAFKA-6028
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 1.1.0
>
>
> Currently if a client is throttled due to quota violation, the broker will 
> only send back a response to the client after the throttle time has passed. 
> In this case, the client does not know how long the response will be throttled 
> and might hit a request timeout before the response is returned. As a result, 
> the client will retry sending the request, which results in an even longer 
> throttle time.
> The above scenario can happen when a large group of clients is sending records 
> to the brokers. We saw this when a MapReduce job pushed data to the Kafka 
> cluster.
> To improve this, the broker can return the response with the throttle time 
> immediately after processing the request. After that, the broker will mute 
> the channel for this client. A correct client implementation should back off 
> for that long before sending the next request. If the client ignores the 
> throttle time and sends the next request immediately, the channel will be 
> muted and the request won't be processed until the throttle time has passed.
> A KIP will follow with more details.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5212) Consumer ListOffsets request can starve group heartbeats

2017-10-10 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199778#comment-16199778
 ] 

Richard Yu commented on KAFKA-5212:
---

During some study after the failure of my most recent commit, I found the 
following approach worth noting:

1. Due to the class hierarchy, the heartbeat is only accessible from 
ConsumerCoordinator.
2. To pass a variable to Fetcher, ConsumerCoordinator must route the 
necessary information through ConsumerNetworkClient.

One of the easiest ways is to define a variable holding the time of the next 
heartbeat (e.g. long nextHeartbeat = now + remainingMs). The variable is 
declared as a private field of ConsumerNetworkClient but is set by 
ConsumerCoordinator's methods. This way, Fetcher has access to the next 
heartbeat time.

However, this is where we encounter a problem: if the fetcher's poll() has 
concluded (it could not yield, thus we must wait), and time.milliseconds() 
exceeds nextHeartbeat, we must resend a heartbeat. However, the poll() method is 
currently only accessible from ConsumerCoordinator. Is there a way to get around 
this?
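The proposed plumbing could be sketched like this. The stub class names are hypothetical, standing in for ConsumerNetworkClient, ConsumerCoordinator, and Fetcher; the sketch only models the shared deadline field, not the actual consumer internals:

```java
// Hypothetical sketch: the coordinator publishes the next heartbeat deadline
// into a field on the shared network client, and the fetcher consults it.
class NetworkClientStub {
    // volatile because coordinator and fetcher logic may run on different threads
    private volatile long nextHeartbeatMs = Long.MAX_VALUE;

    void setNextHeartbeat(long deadlineMs) { nextHeartbeatMs = deadlineMs; }

    long nextHeartbeat() { return nextHeartbeatMs; }
}

class CoordinatorStub {
    private final NetworkClientStub client;

    CoordinatorStub(NetworkClientStub client) { this.client = client; }

    // nextHeartbeat = now + remainingMs, as described in the comment above
    void scheduleHeartbeat(long nowMs, long remainingMs) {
        client.setNextHeartbeat(nowMs + remainingMs);
    }
}

class FetcherStub {
    private final NetworkClientStub client;

    FetcherStub(NetworkClientStub client) { this.client = client; }

    // The fetcher can now check whether a heartbeat is overdue before blocking.
    boolean heartbeatDue(long nowMs) {
        return nowMs >= client.nextHeartbeat();
    }
}
```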

> Consumer ListOffsets request can starve group heartbeats
> 
>
> Key: KAFKA-5212
> URL: https://issues.apache.org/jira/browse/KAFKA-5212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Richard Yu
> Fix For: 1.1.0, 1.0.1
>
>
> The consumer is not able to send heartbeats while it is awaiting a 
> ListOffsets response. Typically this is not a problem because ListOffsets 
> requests are handled quickly, but in the worst case if the request takes 
> longer than the session timeout, the consumer will fall out of the group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5212) Consumer ListOffsets request can starve group heartbeats

2017-10-10 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199778#comment-16199778
 ] 

Richard Yu edited comment on KAFKA-5212 at 10/11/17 4:06 AM:
-

[~hachikuji] During some study after the failure of my most recent commit, 
there is this approach which one would note:

1. Due to hierarchy design, heartbeat is only accessible by ConsumerCoordinator.
2. In order to pass a variable to Fetcher, ConsumerCoordinator must pass the 
necessary information through ConsumerNetworkClient.

One of the easiest ways is to define a variable with the time of the next 
heartbeat (e.g. long nextHeartbeat = now + remainingMs). The variable is 
defined as a private field of ConsumerNetworkClient but is defined by 
ConsumerCoordinator's methods. This way, Fetcher could have access to the next 
heartbeat time.

However, this is when we encounter a problem: If fetcher's poll() is concluded 
(it could not yield, thus we must wait), and if the time.milliseconds() exceeds 
nextHeartbeat, we must resend a heartbeat. However, the poll() method is 
currently only accessible in ConsumerCoordinator. Is there a way to get around 
this?


was (Author: yohan123):
During some study after the failure of my most recent commit, there is this 
approach which one would note:

1. Due to hierarchy design, heartbeat is only accessible by ConsumerCoordinator.
2. In order to pass a variable to Fetcher, ConsumerCoordinator must pass the 
necessary information through ConsumerNetworkClient.

One of the easiest ways to is to define a variable with the time of the next 
heartbeat (e.g. long nextHeartbeat = now + remainingMs). The variable is 
defined as a private field of ConsumerNetworkClient but is defined by 
ConsumerCoordinator's methods. This way, Fetcher could have access to the next 
heartbeat time.

However, this is when we encounter a problem: If fetcher's poll() is concluded 
(it could not yield, thus we must wait), and if the time.milliseconds() exceeds 
nextHeartbeat, we must resend a heartbeat. However, the poll() method is 
currently only accessible in ConsumerCoordinator. Is there a way to get around 
this?

> Consumer ListOffsets request can starve group heartbeats
> 
>
> Key: KAFKA-5212
> URL: https://issues.apache.org/jira/browse/KAFKA-5212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Richard Yu
> Fix For: 1.1.0, 1.0.1
>
>
> The consumer is not able to send heartbeats while it is awaiting a 
> ListOffsets response. Typically this is not a problem because ListOffsets 
> requests are handled quickly, but in the worst case if the request takes 
> longer than the session timeout, the consumer will fall out of the group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5212) Consumer ListOffsets request can starve group heartbeats

2017-10-10 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199778#comment-16199778
 ] 

Richard Yu edited comment on KAFKA-5212 at 10/11/17 4:08 AM:
-

[~hachikuji] During some study after the failure of my most recent commit, 
there is this approach which one would note:

1. Due to hierarchy design, heartbeat is only accessible by ConsumerCoordinator.
2. In order to pass a variable to Fetcher, ConsumerCoordinator must pass the 
necessary information through ConsumerNetworkClient.

One of the easiest ways is to define a variable with the time of the next 
heartbeat (e.g. long nextHeartbeat = now + remainingMs). The variable is 
defined as a private field of ConsumerNetworkClient but is defined by 
ConsumerCoordinator's methods. This way, Fetcher could have access to the next 
heartbeat time.

However, this is when we encounter a problem: If fetcher's poll() is concluded 
(it could not yield, thus we must wait), and if the time.milliseconds() exceeds 
nextHeartbeat, we must resend a heartbeat. However, the poll() method is 
currently only accessible in ConsumerCoordinator. Is there a way to get around 
this?


was (Author: yohan123):
[~hachikuji] During some study after the failure of my most recent commit, 
there is this approach which one would note:

1. Due to hierarchy design, heartbeat is only accessible by ConsumerCoordinator.
2. In order to pass a variable to Fetcher, ConsumerCoordinator must pass the 
necessary information through ConsumerNetworkClient.

One of the easiest ways to is to define a variable with the time of the next 
heartbeat (e.g. long nextHeartbeat = now + remainingMs). The variable is 
defined as a private field of ConsumerNetworkClient but is defined by 
ConsumerCoordinator's methods. This way, Fetcher could have access to the next 
heartbeat time.

However, this is when we encounter a problem: If fetcher's poll() is concluded 
(it could not yield, thus we must wait), and if the time.milliseconds() exceeds 
nextHeartbeat, we must resend a heartbeat. However, the poll() method is 
currently only accessible in ConsumerCoordinator. Is there a way to get around 
this?

> Consumer ListOffsets request can starve group heartbeats
> 
>
> Key: KAFKA-5212
> URL: https://issues.apache.org/jira/browse/KAFKA-5212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Richard Yu
> Fix For: 1.1.0, 1.0.1
>
>
> The consumer is not able to send heartbeats while it is awaiting a 
> ListOffsets response. Typically this is not a problem because ListOffsets 
> requests are handled quickly, but in the worst case if the request takes 
> longer than the session timeout, the consumer will fall out of the group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5212) Consumer ListOffsets request can starve group heartbeats

2017-10-10 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199778#comment-16199778
 ] 

Richard Yu edited comment on KAFKA-5212 at 10/11/17 4:10 AM:
-

[~hachikuji] During some study after the failure of my most recent commit, 
there is this approach which one would note:

1. Due to hierarchy design, heartbeat is only accessible by ConsumerCoordinator.
2. In order to pass a variable to Fetcher, ConsumerCoordinator must pass the 
necessary information through ConsumerNetworkClient.

One of the easiest ways is to define a variable with the time of the next 
heartbeat (e.g. long nextHeartbeat = now + remainingMs). The variable is 
defined as a private field of ConsumerNetworkClient but could be changed by 
ConsumerCoordinator's methods. This way, Fetcher could have access to the next 
heartbeat time.

However, this is when we encounter a problem: If fetcher's poll() is concluded 
(it could not yield, thus we must wait), and if the time.milliseconds() exceeds 
nextHeartbeat, we must resend a heartbeat. However, the pollHeartbeat() method 
is currently only accessible in ConsumerCoordinator. Is there a way to get 
around this?


was (Author: yohan123):
[~hachikuji] During some study after the failure of my most recent commit, 
there is this approach which one would note:

1. Due to hierarchy design, heartbeat is only accessible by ConsumerCoordinator.
2. In order to pass a variable to Fetcher, ConsumerCoordinator must pass the 
necessary information through ConsumerNetworkClient.

One of the easiest ways is to define a variable with the time of the next 
heartbeat (e.g. long nextHeartbeat = now + remainingMs). The variable is 
defined as a private field of ConsumerNetworkClient but is defined by 
ConsumerCoordinator's methods. This way, Fetcher could have access to the next 
heartbeat time.

However, this is when we encounter a problem: If fetcher's poll() is concluded 
(it could not yield, thus we must wait), and if the time.milliseconds() exceeds 
nextHeartbeat, we must resend a heartbeat. However, the poll() method is 
currently only accessible in ConsumerCoordinator. Is there a way to get around 
this?

> Consumer ListOffsets request can starve group heartbeats
> 
>
> Key: KAFKA-5212
> URL: https://issues.apache.org/jira/browse/KAFKA-5212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Richard Yu
> Fix For: 1.1.0, 1.0.1
>
>
> The consumer is not able to send heartbeats while it is awaiting a 
> ListOffsets response. Typically this is not a problem because ListOffsets 
> requests are handled quickly, but in the worst case if the request takes 
> longer than the session timeout, the consumer will fall out of the group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6028) Improve the quota throttle communication.

2017-10-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199792#comment-16199792
 ] 

Jun Rao commented on KAFKA-6028:


[~becket_qin], the window size is configurable, and we can reduce the default to 
match the client timeout. It's just that right now, the delay for the byte quota 
is not bounded. We could bound it by the window length. One downside of what you 
proposed is that it's possible for a bad client to simply ignore the returned 
delay value and still overwhelm the server.

> Improve the quota throttle communication.
> -
>
> Key: KAFKA-6028
> URL: https://issues.apache.org/jira/browse/KAFKA-6028
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 1.1.0
>
>
> Currently if a client is throttled due to quota violation, the broker will 
> only send back a response to the client after the throttle time has passed. 
> In this case, the client does not know how long the response will be throttled 
> and might hit a request timeout before the response is returned. As a result, 
> the client will retry sending the request, which results in an even longer 
> throttle time.
> The above scenario can happen when a large group of clients is sending records 
> to the brokers. We saw this when a MapReduce job pushed data to the Kafka 
> cluster.
> To improve this, the broker can return the response with the throttle time 
> immediately after processing the request. After that, the broker will mute 
> the channel for this client. A correct client implementation should back off 
> for that long before sending the next request. If the client ignores the 
> throttle time and sends the next request immediately, the channel will be 
> muted and the request won't be processed until the throttle time has passed.
> A KIP will follow with more details.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6029) Controller should wait for the leader migration to finish before ack a ControlledShutdownRequest

2017-10-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199794#comment-16199794
 ] 

Jun Rao commented on KAFKA-6029:


[~becket_qin], yes, that's the rough idea.

> Controller should wait for the leader migration to finish before ack a 
> ControlledShutdownRequest
> 
>
> Key: KAFKA-6029
> URL: https://issues.apache.org/jira/browse/KAFKA-6029
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
> Fix For: 1.1.0
>
>
> In the controlled shutdown process, the controller will return the 
> ControlledShutdownResponse immediately after the state machine is updated. 
> Because the LeaderAndIsrRequests and UpdateMetadataRequests may not have been 
> successfully processed by the brokers, the leader migration and ISR 
> shrink may not be done when the shutting-down broker proceeds to shut down. 
> This will cause some of the leaders to take up to replica.lag.time.max.ms to 
> kick the broker out of the ISR. Meanwhile, the produce purgatory size will grow.
> Ideally, the controller should wait until all the LeaderAndIsrRequests and 
> UpdateMetadataRequests have been acked before sending back the 
> ControlledShutdownResponse.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6022) mirror maker stores offset in zookeeper

2017-10-10 Thread Ronald van de Kuil (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199804#comment-16199804
 ] 

Ronald van de Kuil commented on KAFKA-6022:
---

Hi Manikumar,

I see that I copied the mirror maker's consumer properties file from the 
consumer.properties file that is present in the 2.11-0.11.0.0 installation. 

Would it make sense to change Kafka trunk's consumer.properties to use the 
new consumer, then?

bootstrap.servers=localhost:9092

When I use the consumer API with this setting in a small Java program, I 
see that the offset is indeed stored in Kafka.

I have some time this weekend to mirror the cluster from scratch.

I can keep you posted if you want.

Thank you for your guidance.
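For reference, a minimal new-consumer configuration along those lines might look like the following sketch. The host/port is the local example mentioned above, and the group name is purely a placeholder for illustration:

```properties
# New (Kafka-based) consumer: offsets are committed to Kafka, not ZooKeeper
bootstrap.servers=localhost:9092
# hypothetical group name, chosen for illustration only
group.id=mirror-maker-group
```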

> mirror maker stores offset in zookeeper
> ---
>
> Key: KAFKA-6022
> URL: https://issues.apache.org/jira/browse/KAFKA-6022
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ronald van de Kuil
>Priority: Minor
>
> I happened to notice that the mirror maker stores its offset in zookeeper. 
> I do not see it using:
> bin/kafka-consumer-groups.sh --bootstrap-server pi1:9092 --new-consumer --list
> I do however see consumers that store their offset in kafka.
> I would guess that storing the offset in zookeeper is old style?
> Would it be an idea to update the mirror maker to the new consumer style?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6028) Improve the quota throttle communication.

2017-10-10 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199806#comment-16199806
 ] 

Jiangjie Qin commented on KAFKA-6028:
-

[~junrao] We were thinking that the server will mute the channel for the 
throttle time after returning the response with a throttle time. In that case, 
even if the client sends the next request immediately, the server won't read it 
from the socket until the throttle time has passed. So a bad client won't 
overwhelm the server. The only downside is that the receive socket buffer will 
be used if a bad client does that. I have a KIP wiki ready and will post it 
shortly.

If we shrink the metric window, the quota is essentially violated, right?
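The client side of that contract can be sketched as follows. The class name is hypothetical and this only models the back-off bookkeeping a well-behaved client would do with the returned throttle time, not Kafka's actual network client:

```java
// Hypothetical sketch: a client that honors the throttle time returned by the
// broker, backing off instead of retrying immediately.
class ThrottleAwareSender {
    private long readyAtMs = 0L;

    // Record the throttle time carried by the last response.
    void onResponse(long nowMs, long throttleTimeMs) {
        readyAtMs = nowMs + throttleTimeMs;
    }

    // A correct client does not send again until the throttle has elapsed.
    // (A misbehaving client that sends anyway is handled server-side by the
    // muted channel: the request simply sits unread in the socket buffer.)
    boolean canSend(long nowMs) {
        return nowMs >= readyAtMs;
    }
}
```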

> Improve the quota throttle communication.
> -
>
> Key: KAFKA-6028
> URL: https://issues.apache.org/jira/browse/KAFKA-6028
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 1.1.0
>
>
> Currently if a client is throttled duet to quota violation, the broker will 
> only send back a response to the clients after the throttle time has passed. 
> In this case, the clients don't know how long the response will be throttled 
> and might hit request timeout before the response is returned. As a result 
> the clients will retry sending a request and results a even longer throttle 
> time.
> The above scenario could happen when a large clients group sending records to 
> the brokers. We saw this when a MapReduce job pushes data to the Kafka 
> cluster.
> To improve this, the broker can return the response with throttle time 
> immediately after processing the requests. After that, the broker will mute 
> the channel for this client. A correct client implementation should back off 
> for that long before sending the next request. If the client ignored the 
> throttle time and send the next request immediately, the channel will be 
> muted and the request won't be processed until the throttle time has passed.
> A KIP will follow with more details.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6022) mirror maker stores offset in zookeeper

2017-10-10 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199814#comment-16199814
 ] 

Manikumar commented on KAFKA-6022:
--

I think we missed updating the consumer.properties file. I will raise a minor 
PR to update it.
Thanks for raising this.

> mirror maker stores offset in zookeeper
> ---
>
> Key: KAFKA-6022
> URL: https://issues.apache.org/jira/browse/KAFKA-6022
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ronald van de Kuil
>Priority: Minor
>
> I happened to notice that the mirror maker stores its offset in zookeeper. 
> I do not see it using:
> bin/kafka-consumer-groups.sh --bootstrap-server pi1:9092 --new-consumer --list
> I do however see consumers that store their offset in kafka.
> I would guess that storing the offset in zookeeper is old style?
> Would it be an idea to update the mirror maker to the new consumer style?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6048) Support negative record timestamps

2017-10-10 Thread james chien (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199888#comment-16199888
 ] 

james chien commented on KAFKA-6048:


I am interested in this. :D
Yes, it seems there are a couple of issues to address.


> Support negative record timestamps
> --
>
> Key: KAFKA-6048
> URL: https://issues.apache.org/jira/browse/KAFKA-6048
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>  Labels: needs-kip
>
> Kafka does not support negative record timestamps, and this prevents the 
> storage of historical data in Kafka. In general, negative timestamps are 
> supported by UNIX system time stamps: 
> From https://en.wikipedia.org/wiki/Unix_time
> {quote}
> The Unix time number is zero at the Unix epoch, and increases by exactly 
> 86,400 per day since the epoch. Thus 2004-09-16T00:00:00Z, 12,677 days after 
> the epoch, is represented by the Unix time number 12,677 × 86,400 = 
> 1095292800. This can be extended backwards from the epoch too, using negative 
> numbers; thus 1957-10-04T00:00:00Z, 4,472 days before the epoch, is 
> represented by the Unix time number −4,472 × 86,400 = −386380800.
> {quote}
> Allowing for negative timestamps would require multiple changes:
>  - while brokers in general do support negative timestamps, brokers use {{-1}} 
> as the default value if a producer uses an old message format (this would not be 
> compatible with supporting negative timestamps "end-to-end", as {{-1}} could no 
> longer be used to mean "unknown"): we could introduce a message flag indicating a 
> missing timestamp (and let the producer throw an exception if 
> {{ConsumerRecord#timestamp()}} is called). Another possible solution might be 
> to require topics that are used by old producers to be configured with 
> {{LogAppendTime}} semantics and to reject writes to topics with 
> {{CreateTime}} semantics for older message formats
>  - {{KafkaProducer}} does not allow sending records with a negative timestamp, 
> and thus this would need to be fixed
>  - the Streams API drops records with negative timestamps (or fails by 
> default) -- also, some internal store implementations for windowed stores 
> assume that there are no negative timestamps when doing range queries
> There might be other gaps we need to address. This is just a summary of the 
> issues coming to mind at the moment.
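As a quick sanity check of the arithmetic quoted from the Unix-time article, java.time already represents pre-epoch instants as negative second counts:

```java
import java.time.Instant;

// Dates before 1970-01-01T00:00:00Z map to negative Unix times,
// exactly -days * 86,400 seconds, matching the quoted examples.
public class NegativeUnixTime {
    public static void main(String[] args) {
        long secondsPerDay = 86_400L;
        // 1957-10-04T00:00:00Z is 4,472 days before the epoch
        long before = Instant.parse("1957-10-04T00:00:00Z").getEpochSecond();
        // 2004-09-16T00:00:00Z is 12,677 days after the epoch
        long after = Instant.parse("2004-09-16T00:00:00Z").getEpochSecond();
        System.out.println(before); // -386380800
        System.out.println(after);  // 1095292800
    }
}
```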



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

