Kafka cannot shut down

2016-10-28 Thread Json Tu
Hi all,
We have a Kafka cluster with 11 nodes, and we found that for some 
partitions the ISR size is not equal to the replica count. Because our data 
traffic is small, we expected the ISR to eventually grow back to the full 
replica set, but it never recovered, so we tried to shut down a broker that 
hosts a follower partition which had not caught up with the leader.
Before we shut down the broker, we found that its id was missing from the 
children of ZooKeeper's /brokers/ids path, so we think it had been 
disconnected from ZooKeeper again by some network issue, although the 
process was still alive. We have seen this phenomenon several times; we 
suspect a ZooKeeper callback was missed, so zkclient could not register the 
broker again.
But that is not my main point. My point is: when we stop Kafka, it cannot 
exit normally, because it hangs in KafkaController.shutdown() for a very 
long time, and we could not stop the broker until we used kill -9.


To investigate, we ran jstack against the process and found it stuck in 
autoRebalanceScheduler.shutdown(); the stack dump is attached below.

Can someone help with this? Thank you very much.
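
For context, here is a minimal, self-contained illustration (an assumed shape, not Kafka's actual code) of how a shutdown path can hang the way the attached jstack output suggests: the shutdown path holds a lock and then waits for a scheduler to terminate, while the scheduler's in-flight task is itself blocked on that same lock, so termination never happens.

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class SchedulerShutdownHang {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock controllerLock = new ReentrantLock(); // hypothetical stand-in
        ScheduledExecutorService autoRebalance =
                Executors.newSingleThreadScheduledExecutor();
        CountDownLatch taskStarted = new CountDownLatch(1);

        controllerLock.lock(); // the shutdown path takes the lock first
        autoRebalance.execute(() -> { // stands in for the periodic rebalance task
            taskStarted.countDown();
            controllerLock.lock(); // blocks: main() holds the lock
            try { /* rebalance work */ } finally { controllerLock.unlock(); }
        });

        taskStarted.await(); // the task is now in flight and about to block
        autoRebalance.shutdown();
        // Hangs forever, like autoRebalanceScheduler.shutdown() in the report:
        autoRebalance.awaitTermination(1, TimeUnit.DAYS);
        controllerLock.unlock(); // never reached
    }
}
{code}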
2016-10-28 22:22:44
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):

"Attach Listener" daemon prio=10 tid=0x7f29b0002000 nid=0x27d6 waiting on 
condition [0x]
   java.lang.Thread.State: RUNNABLE

"Thread-2" prio=10 tid=0x7f299049b800 nid=0x241f waiting on condition 
[0x7f2888a0c000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xbd411f20> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:692)
at 
kafka.server.KafkaServer$$anonfun$shutdown$10.apply$mcV$sp(KafkaServer.scala:543)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:51)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:51)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:543)
at 
kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:49)
at kafka.Kafka$$anon$1.run(Kafka.scala:63)

"SIGTERM handler" daemon prio=10 tid=0x7f29b0001000 nid=0x241c in 
Object.wait() [0x7f2889059000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
- locked <0xbc7fa470> (a kafka.Kafka$$anon$1)
at java.lang.Thread.join(Thread.java:1355)
at 
java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at 
java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0xbc70ada0> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)

"main-EventThread" daemon prio=10 tid=0x7f2984027800 nid=0x46d3 waiting on 
condition [0x7f2888c55000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xdc11ae78> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:494)

"main-SendThread(10.4.232.86:2181)" daemon prio=10 tid=0x7f2984026800 
nid=0x46d2 runnable [0x7f2888d56000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback

2016-10-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616982#comment-15616982
 ] 

ASF GitHub Bot commented on KAFKA-3559:
---

Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/2032


> Task creation time taking too long in rebalance callback
> 
>
> Key: KAFKA-3559
> URL: https://issues.apache.org/jira/browse/KAFKA-3559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly 
> assigned partitions in the rebalance callback function {code} onPartitionAssigned 
> {code}, which also involves initialization of the processor state stores 
> (including opening RocksDB, restoring the store from the changelog, etc., 
> which takes time).
> With a large number of state stores, the initialization alone can take tens 
> of seconds, which is usually longer than the consumer session timeout. As a 
> result, by the time the callback completes, the coordinator has already 
> treated the consumer as failed and triggered another rebalance.
> We need to consider whether we can optimize the initialization process, or 
> move it out of the callback function and, while initializing the stores one 
> by one, use poll calls to send heartbeats to avoid being kicked out by the 
> coordinator.
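
The sketch below illustrates that last idea with a plain consumer (hypothetical initStateStore()/process() helpers; this is not Streams' actual code): keep onPartitionsAssigned cheap, and do the heavy store initialization inside the poll loop so that poll() keeps heartbeats flowing while stores are restored one by one.

{code}
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DeferredStoreInit {
    static void run(KafkaConsumer<byte[], byte[]> consumer, List<String> topics) {
        final Deque<TopicPartition> pendingInit = new ArrayDeque<>();
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override public void onPartitionsRevoked(Collection<TopicPartition> parts) { }
            @Override public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                pendingInit.addAll(parts); // record the work, but return quickly
            }
        });
        while (true) {
            // poll() keeps the member heartbeating, so it is not deemed failed
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            if (!pendingInit.isEmpty())
                initStateStore(pendingInit.poll()); // one store per iteration
            process(records);
        }
    }
    static void initStateStore(TopicPartition tp) { /* open RocksDB, restore changelog */ }
    static void process(ConsumerRecords<byte[], byte[]> records) { /* application logic */ }
}
{code}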



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback

2016-10-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616983#comment-15616983
 ] 

ASF GitHub Bot commented on KAFKA-3559:
---

GitHub user enothereska reopened a pull request:

https://github.com/apache/kafka/pull/2032

KAFKA-3559: Recycle old tasks when possible



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
KAFKA-3559-onPartitionAssigned

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2032


commit 28a50430e4136ba75f0d9b957a67f22e7b1e86a0
Author: Eno Thereska 
Date:   2016-10-17T10:46:45Z

Recycle old tasks when possible

commit b3dc438bf1665b9364b19f5efa908dd35d2b7af3
Author: Eno Thereska 
Date:   2016-10-19T15:13:36Z

Adjusted based on Damian's comments

commit f8cfe74d85e0a8cd5efacca87eced236319c83b9
Author: Eno Thereska 
Date:   2016-10-19T17:44:39Z

Refactor

commit 62bb3fd4a90dd28bc7bb58bf077b7ecb60207c7e
Author: Eno Thereska 
Date:   2016-10-24T14:24:48Z

Merge remote-tracking branch 'origin/trunk' into 
KAFKA-3559-onPartitionAssigned

commit 841caa3721172d2d89ec16ef6dfd149f25498649
Author: Eno Thereska 
Date:   2016-10-24T17:32:05Z

Addressed Guozhang's comments

commit c4498564907243c35df832407933b8a9cf32f4ef
Author: Eno Thereska 
Date:   2016-10-25T11:07:28Z

Refactor

commit 4ba24c1ecb8c6293adce426a92b6021e86c9e8b7
Author: Eno Thereska 
Date:   2016-10-25T12:20:04Z

Merge remote-tracking branch 'origin/trunk' into 
KAFKA-3559-onPartitionAssigned

commit 0fe12633b8593eda3b5b7b75bc87244276c95ce2
Author: Eno Thereska 
Date:   2016-10-28T20:46:18Z

Minor reshuffle

commit 7bf5d96cd66ab77130cad39fbff821fccd83aa06
Author: Eno Thereska 
Date:   2016-10-28T21:44:48Z

Guozhang's suggestion to clear queue




> Task creation time taking too long in rebalance callback
> 
>
> Key: KAFKA-3559
> URL: https://issues.apache.org/jira/browse/KAFKA-3559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly 
> assigned partitions in the rebalance callback function {code} onPartitionAssigned 
> {code}, which also involves initialization of the processor state stores 
> (including opening RocksDB, restoring the store from the changelog, etc., 
> which takes time).
> With a large number of state stores, the initialization alone can take tens 
> of seconds, which is usually longer than the consumer session timeout. As a 
> result, by the time the callback completes, the coordinator has already 
> treated the consumer as failed and triggered another rebalance.
> We need to consider whether we can optimize the initialization process, or 
> move it out of the callback function and, while initializing the stores one 
> by one, use poll calls to send heartbeats to avoid being kicked out by the 
> coordinator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2032: KAFKA-3559: Recycle old tasks when possible

2016-10-28 Thread enothereska
Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/2032


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2032: KAFKA-3559: Recycle old tasks when possible

2016-10-28 Thread enothereska
GitHub user enothereska reopened a pull request:

https://github.com/apache/kafka/pull/2032

KAFKA-3559: Recycle old tasks when possible



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
KAFKA-3559-onPartitionAssigned

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2032


commit 28a50430e4136ba75f0d9b957a67f22e7b1e86a0
Author: Eno Thereska 
Date:   2016-10-17T10:46:45Z

Recycle old tasks when possible

commit b3dc438bf1665b9364b19f5efa908dd35d2b7af3
Author: Eno Thereska 
Date:   2016-10-19T15:13:36Z

Adjusted based on Damian's comments

commit f8cfe74d85e0a8cd5efacca87eced236319c83b9
Author: Eno Thereska 
Date:   2016-10-19T17:44:39Z

Refactor

commit 62bb3fd4a90dd28bc7bb58bf077b7ecb60207c7e
Author: Eno Thereska 
Date:   2016-10-24T14:24:48Z

Merge remote-tracking branch 'origin/trunk' into 
KAFKA-3559-onPartitionAssigned

commit 841caa3721172d2d89ec16ef6dfd149f25498649
Author: Eno Thereska 
Date:   2016-10-24T17:32:05Z

Addressed Guozhang's comments

commit c4498564907243c35df832407933b8a9cf32f4ef
Author: Eno Thereska 
Date:   2016-10-25T11:07:28Z

Refactor

commit 4ba24c1ecb8c6293adce426a92b6021e86c9e8b7
Author: Eno Thereska 
Date:   2016-10-25T12:20:04Z

Merge remote-tracking branch 'origin/trunk' into 
KAFKA-3559-onPartitionAssigned

commit 0fe12633b8593eda3b5b7b75bc87244276c95ce2
Author: Eno Thereska 
Date:   2016-10-28T20:46:18Z

Minor reshuffle

commit 7bf5d96cd66ab77130cad39fbff821fccd83aa06
Author: Eno Thereska 
Date:   2016-10-28T21:44:48Z

Guozhang's suggestion to clear queue




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3893) Kafka Broker ID disappears from /brokers/ids

2016-10-28 Thread Rekha Joshi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616914#comment-15616914
 ] 

Rekha Joshi commented on KAFKA-3893:


We saw it too on Kafka 0.8.x, upgraded to Kafka 0.9.x, and continued seeing 
this issue. We are going with workarounds for now, but it would be great (and 
would help others) if this were documented clearly in the Apache 
Kafka/Confluent FAQ.
Thanks
Rekha


> Kafka Broker ID disappears from /brokers/ids
> 
>
> Key: KAFKA-3893
> URL: https://issues.apache.org/jira/browse/KAFKA-3893
> Project: Kafka
>  Issue Type: Bug
>Reporter: chaitra
>Priority: Critical
>
> Kafka version used: 0.8.2.1
> Zookeeper version: 3.4.6
> We have a scenario where the Kafka broker's entry in the ZooKeeper path 
> /brokers/ids just disappears.
> We see the ZooKeeper connection as active, and there is no network issue.
> The ZooKeeper connection timeout is set to 6000ms in server.properties.
> As a result, the Kafka broker stops participating in the cluster.
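
For anyone debugging this, a small diagnostic sketch (plain ZooKeeper Java client; the connection string and session timeout are assumptions) that lists which broker ids are currently registered:

{code}
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class BrokerIdCheck {
    public static void main(String[] args) throws Exception {
        // Connect with the same 6000ms session timeout mentioned above.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 6000, event -> { });
        List<String> ids = zk.getChildren("/brokers/ids", false);
        System.out.println("Registered broker ids: " + ids);
        zk.close();
    }
}
{code}

A broker whose id is missing from this list has lost its ephemeral registration even if its process is still alive.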



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4358) Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread

2016-10-28 Thread Nelson Elhage (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson Elhage updated KAFKA-4358:
-
Attachment: shutdown.patch

This patch resolves the hang, but I'm not sure whether it's otherwise safe. 
It leaves the risk that the old fetch thread will still be running 
concurrently with future operations, but we know that the fetch thread has an 
empty `partitionCount`, which should be sufficient to keep it from taking any 
future observable action, as best I can tell.
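
A generic sketch of the idea (hypothetical FetcherThread class; this is not the attached patch): split shutdown into a non-blocking "initiate" step and an optional "await" step, so the LeaderAndIsr handling path can signal idle fetchers to stop without blocking on a thread that may be stuck in a long network timeout.

{code}
class FetcherThread extends Thread {
    private volatile boolean running = true;

    @Override public void run() {
        while (running) {
            // fetch(): a blocking network call that may take up to the
            // request timeout when the remote broker is blackholing.
        }
    }

    // Returns immediately; safe to call from the request-handling path.
    void initiateShutdown() { running = false; this.interrupt(); }

    // Optional join, kept off the hot path.
    void awaitShutdown() throws InterruptedException { this.join(); }
}
{code}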


> Following a hung broker, newly elected leader is unnecessarily slow assuming 
> leadership because of ReplicaFetcherThread
> ---
>
> Key: KAFKA-4358
> URL: https://issues.apache.org/jira/browse/KAFKA-4358
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.10.0.1
>Reporter: Nelson Elhage
>Priority: Minor
> Attachments: shutdown.patch
>
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
> waiting for idle replication fetcher threads to die before responding to the 
> message and being able to service new produce requests.
> If requests to a broker start blackholing (e.g. due to network failure, or 
> due to the broker hanging), shutting down the `ReplicaFetcherThread` can take 
> a long time (around 30s in my testing), blocking recovery of any partitions 
> previously led by that broker.
> This is a very similar issue to KAFKA-612.
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
> 3 --partitions 1 --topic replicated.topic
> {code}
> Identify the leader, and (for simplicity in interpreting the event) make sure 
> it's not the same as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
> replicated.topic
> {code}
> Start a stream of produce events (with a shortened timeout so we get faster 
> visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic 
> --broker-list localhost:9092 --producer.config 
> $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
> The producer will log errors for about 30 seconds, and then recover. However, 
> if we read logs, we'll see (excerpting key log lines from `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 2 from controller 2 epoch 8 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] state from OnlinePartition to OfflinePartition 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election 
> for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for 
> Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 
> (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request 
> PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], 
> zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 
> 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 18 from controller 1 epoch 11 starting the become-leader 
> transition for partition [replicated.topic,

[jira] [Issue Comment Deleted] (KAFKA-4358) Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread

2016-10-28 Thread Nelson Elhage (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson Elhage updated KAFKA-4358:
-
Comment: was deleted

(was: This patch resolves the hang, but I'm not sure whether or not it's 
otherwise safe. It leaves the risk that the old fetch thread will be running 
concurrently with future operation, but we know that the fetch thread has an 
empty `partitionCount`, which should be sufficient to cause it to take no 
future observable action, as best I can tell.)

> Following a hung broker, newly elected leader is unnecessarily slow assuming 
> leadership because of ReplicaFetcherThread
> ---
>
> Key: KAFKA-4358
> URL: https://issues.apache.org/jira/browse/KAFKA-4358
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.10.0.1
>Reporter: Nelson Elhage
>Priority: Minor
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
> waiting for idle replication fetcher threads to die before responding to the 
> message and being able to service new produce requests.
> If requests to a broker start blackholing (e.g. due to network failure, or 
> due to the broker hanging), shutting down the `ReplicaFetcherThread` can take 
> a long time (around 30s in my testing), blocking recovery of any partitions 
> previously led by that broker.
> This is a very similar issue to KAFKA-612.
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
> 3 --partitions 1 --topic replicated.topic
> {code}
> Identify the leader, and (for simplicity in interpreting the event) make sure 
> it's not the same as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
> replicated.topic
> {code}
> Start a stream of produce events (with a shortened timeout so we get faster 
> visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic 
> --broker-list localhost:9092 --producer.config 
> $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
> The producer will log errors for about 30 seconds, and then recover. However, 
> if we read logs, we'll see (excerpting key log lines from `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 2 from controller 2 epoch 8 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] state from OnlinePartition to OfflinePartition 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election 
> for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for 
> Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 
> (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request 
> PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], 
> zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 
> 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 18 from controller 1 epoch 11 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 2

[jira] [Updated] (KAFKA-4358) Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread

2016-10-28 Thread Nelson Elhage (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson Elhage updated KAFKA-4358:
-
Status: Open  (was: Patch Available)

> Following a hung broker, newly elected leader is unnecessarily slow assuming 
> leadership because of ReplicaFetcherThread
> ---
>
> Key: KAFKA-4358
> URL: https://issues.apache.org/jira/browse/KAFKA-4358
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.10.0.1
>Reporter: Nelson Elhage
>Priority: Minor
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
> waiting for idle replication fetcher threads to die before responding to the 
> message and being able to service new produce requests.
> If requests to a broker start blackholing (e.g. due to network failure, or 
> due to the broker hanging), shutting down the `ReplicaFetcherThread` can take 
> a long time (around 30s in my testing), blocking recovery of any partitions 
> previously led by that broker.
> This is a very similar issue to KAFKA-612.
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
> 3 --partitions 1 --topic replicated.topic
> {code}
> Identify the leader, and (for simplicity in interpreting the event) make sure 
> it's not the same as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
> replicated.topic
> {code}
> Start a stream of produce events (with a shortened timeout so we get faster 
> visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic 
> --broker-list localhost:9092 --producer.config 
> $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
> The producer will log errors for about 30 seconds, and then recover. However, 
> if we read logs, we'll see (excerpting key log lines from `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 2 from controller 2 epoch 8 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] state from OnlinePartition to OfflinePartition 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election 
> for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for 
> Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 
> (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request 
> PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], 
> zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 
> 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 18 from controller 1 epoch 11 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of 
> become-leader request from controller 1 epoch 11 with correlation id 18 for 
> partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request 
> correlationId 18 from controller 1 epoch 11 for the become-leader tra

[jira] [Updated] (KAFKA-4358) Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread

2016-10-28 Thread Nelson Elhage (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson Elhage updated KAFKA-4358:
-
Status: Patch Available  (was: Open)

This patch resolves the hang, but I'm not sure whether or not it's otherwise 
safe. It leaves the risk that the old fetch thread will be running concurrently 
with future operation, but we know that the fetch thread has an empty 
`partitionCount`, which should be sufficient to cause it to take no future 
observable action, as best I can tell.

> Following a hung broker, newly elected leader is unnecessarily slow assuming 
> leadership because of ReplicaFetcherThread
> ---
>
> Key: KAFKA-4358
> URL: https://issues.apache.org/jira/browse/KAFKA-4358
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.10.0.1
>Reporter: Nelson Elhage
>Priority: Minor
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
> waiting for idle replication fetcher threads to die before responding to the 
> message and being able to service new produce requests.
> If requests to a broker start blackholing (e.g. due to network failure, or 
> due to the broker hanging), shutting down the `ReplicaFetcherThread` can take 
> a long time (around 30s in my testing), blocking recovery of any partitions 
> previously led by that broker.
> This is a very similar issue to KAFKA-612.
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
> 3 --partitions 1 --topic replicated.topic
> {code}
> Identify the leader, and (for simplicity in interpreting the event) make sure 
> it's not the same as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
> replicated.topic
> {code}
> Start a stream of produce events (with a shortened timeout so we get faster 
> visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic 
> --broker-list localhost:9092 --producer.config 
> $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
> The producer will log errors for about 30 seconds, and then recover. However, 
> if we read logs, we'll see (excerpting key log lines from `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 2 from controller 2 epoch 8 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] state from OnlinePartition to OfflinePartition 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election 
> for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for 
> Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 
> (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request 
> PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], 
> zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 
> 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 18 from controller 1 epoch 11 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [201

[GitHub] kafka pull request #2076: HOTFIX: improve error message on invalid input rec...

2016-10-28 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2076

HOTFIX: improve error message on invalid input record timestamp



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka hotfixTSExtractor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2076.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2076


commit c19f5791ba1d201715573035499d66b8ae857a0e
Author: Matthias J. Sax 
Date:   2016-10-28T22:11:53Z

HOTFIX: improve error message on invalid input record timestamp




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4357) Consumer group describe exception when there is no active member (old consumer)

2016-10-28 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-4357:
---
Status: Patch Available  (was: In Progress)

> Consumer group describe exception when there is no active member (old 
> consumer)
> ---
>
> Key: KAFKA-4357
> URL: https://issues.apache.org/jira/browse/KAFKA-4357
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> If a consumer group based on the old consumer has no active members, the 
> following error/exception is raised:
> {code}
> Error: Executing consumer group command failed due to Expected a valid 
> consumer group state, but none found.
> {code}
> The command should instead report the existing offsets within the group (with 
> no data under the member column).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4358) Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread

2016-10-28 Thread Nelson Elhage (JIRA)
Nelson Elhage created KAFKA-4358:


 Summary: Following a hung broker, newly elected leader is 
unnecessarily slow assuming leadership because of ReplicaFetcherThread
 Key: KAFKA-4358
 URL: https://issues.apache.org/jira/browse/KAFKA-4358
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.10.0.1
Reporter: Nelson Elhage
Priority: Minor


When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
waiting for idle replication fetcher threads to die before responding to the 
message and being able to service new produce requests.

If requests to a broker start blackholing (e.g. due to network failure, or due 
to the broker hanging), shutting down the `ReplicaFetcherThread` can take a 
long time (around 30s in my testing), blocking recovery of any partitions 
previously led by that broker.

This is a very similar issue to KAFKA-612.

Instructions to reproduce/demonstrate:

Stand up three brokers and create a replicated topic:

{code}
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
--partitions 1 --topic replicated.topic
{code}

Identify the leader, and (for simplicity in interpreting the event) make sure 
it's not the same as the cluster controller:

{code}
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
replicated.topic
{code}

Start a stream of produce events (with a shortened timeout so we get faster 
visibility into when the cluster recovers):

{code}
echo request.timeout.ms=1000 >> config/producer.properties
bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic 
--broker-list localhost:9092 --producer.config $(pwd)/config/producer.properties
{code}

Now SIGSTOP the leader (3, in my example):

{code}
kill -STOP $(pgrep -f server3.properties)
{code}

The producer will log errors for about 30 seconds, and then recover. However, 
if we read logs, we'll see (excerpting key log lines from `state-change.log`):

{code}
[2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader 
LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to 
broker 2 for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request 
correlationId 2 from controller 2 epoch 8 starting the become-leader transition 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition 
[replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 
(state.change.logger)
[2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition 
[replicated.topic,0] state from OnlinePartition to OfflinePartition 
(state.change.logger)
[2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for 
Offline partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition 
[replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 
(state.change.logger)
[2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader 
LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to 
broker 2 for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request 
PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], 
zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 11 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request 
correlationId 18 from controller 1 epoch 11 starting the become-leader 
transition for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of 
become-leader request from controller 1 epoch 11 with correlation id 18 for 
partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request 
correlationId 18 from controller 1 epoch 11 for the become-leader transition 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:56,058] TRACE Controller 1 epoch 11 received response 
{error_code=0,partitions=[{topic=replicated.topic,partition=0,error_code=0}]} 
for a request sent to broker qa-dev1.northwest.stripe.io:9093 (id: 2 rack: 
null) (state.change.logger)
{code}

Note the ~23s pause between broker 2 logging completion of the LeaderAndIsr 
reques

[jira] [Commented] (KAFKA-4357) Consumer group describe exception when there is no active member (old consumer)

2016-10-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616669#comment-15616669
 ] 

ASF GitHub Bot commented on KAFKA-4357:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/2075

KAFKA-4357: Fix consumer group describe output when there is no active 
member (old consumer)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-4357

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2075.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2075






> Consumer group describe exception when there is no active member (old 
> consumer)
> ---
>
> Key: KAFKA-4357
> URL: https://issues.apache.org/jira/browse/KAFKA-4357
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> If a consumer group based on the old consumer has no active members, the 
> following error/exception is raised:
> {code}
> Error: Executing consumer group command failed due to Expected a valid 
> consumer group state, but none found.
> {code}
> The command should instead report the existing offsets within the group (with 
> no data under the member column).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2075: KAFKA-4357: Fix consumer group describe output whe...

2016-10-28 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/2075

KAFKA-4357: Fix consumer group describe output when there is no active 
member (old consumer)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-4357

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2075.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2075






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Work started] (KAFKA-4357) Consumer group describe exception when there is no active member (old consumer)

2016-10-28 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4357 started by Vahid Hashemian.
--
> Consumer group describe exception when there is no active member (old 
> consumer)
> ---
>
> Key: KAFKA-4357
> URL: https://issues.apache.org/jira/browse/KAFKA-4357
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> If a consumer group based on the old consumer has no active members, the 
> following error/exception is raised:
> {code}
> Error: Executing consumer group command failed due to Expected a valid 
> consumer group state, but none found.
> {code}
> The command should instead report the existing offsets within the group (with 
> no data under the member column).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4357) Consumer group describe exception when there is no active member (old consumer)

2016-10-28 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-4357:
--

 Summary: Consumer group describe exception when there is no active 
member (old consumer)
 Key: KAFKA-4357
 URL: https://issues.apache.org/jira/browse/KAFKA-4357
 Project: Kafka
  Issue Type: Bug
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian


If a consumer group based on the old consumer has no active members, the 
following error/exception is raised:
{code}
Error: Executing consumer group command failed due to Expected a valid consumer 
group state, but none found.
{code}

The command should instead report the existing offsets within the group (with 
no data under the member column).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3853) Report offsets for empty groups in ConsumerGroupCommand

2016-10-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616482#comment-15616482
 ] 

ASF GitHub Bot commented on KAFKA-3853:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/2074

KAFKA-3853: Report offsets for empty groups in consumer group describe 
command (new consumer)

When there are no active new consumers in a consumer group, report the 
offsets within the group instead of reporting that the group has no active 
members.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3853

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2074.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2074


commit b2ab9124b87ea93890e4391f3244a79dd9838db7
Author: Vahid Hashemian 
Date:   2016-10-28T20:13:09Z

KAFKA-3853: Report offsets for empty groups in consumer group describe 
command (new consumer)

When there are no active new consumers in a consumer group, report the 
offsets within the group instead of reporting that the group has no active 
members.




> Report offsets for empty groups in ConsumerGroupCommand
> ---
>
> Key: KAFKA-3853
> URL: https://issues.apache.org/jira/browse/KAFKA-3853
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> We ought to be able to display offsets for groups which either have no active 
> members or which are not using group management. The owner column can be left 
> empty or set to "N/A". If a group is active, I'm not sure it would make sense 
> to report all offsets, in particular when partitions are unassigned, but if 
> it seems problematic to do so, we could enable the behavior with a flag (e.g. 
> --include-unassigned).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2074: KAFKA-3853: Report offsets for empty groups in con...

2016-10-28 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/2074

KAFKA-3853: Report offsets for empty groups in consumer group describe 
command (new consumer)

When there are no active new consumers in a consumer group, report the 
offsets within the group instead of reporting that the group has no active 
members.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3853

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2074.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2074


commit b2ab9124b87ea93890e4391f3244a79dd9838db7
Author: Vahid Hashemian 
Date:   2016-10-28T20:13:09Z

KAFKA-3853: Report offsets for empty groups in consumer group describe 
command (new consumer)

When there are no active new consumers in a consumer group, report the 
offsets within the group instead of reporting that the group has no active 
members.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Kafka client circuit breaker

2016-10-28 Thread Andrey Dyachkov
Hi Kafka developers :)

We have been using Kafka for around half a year as the main backend system for
storing events from a big variety of internal microservices. In general it
works quite well, and we experience issues only in some rare 'corner cases'
where we either run into a bug or do not understand why Kafka behaves in a
particular manner. Because Kafka is the main system underneath our software,
if it fails our system will fail too. That's why I am writing to ask about
solutions, such as a client-side circuit breaker, that could help us tolerate
different kinds of issues on the Kafka side. First of all, could you tell me
why the Kafka client doesn't have a solution for this kind of problem? Have
you ever had thoughts about it?
Thank you in advance!
-- 

With great enthusiasm,
Andrey
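
A minimal client-side circuit-breaker sketch (nothing like this ships in the Kafka client itself; the names and thresholds here are assumptions): trip after a number of consecutive failures and fail fast until a cool-down has passed.

{code}
import java.util.concurrent.atomic.AtomicInteger;

class KafkaCircuitBreaker {
    private final int threshold;
    private final long coolDownMs;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private volatile long openedAt;

    KafkaCircuitBreaker(int threshold, long coolDownMs) {
        this.threshold = threshold;
        this.coolDownMs = coolDownMs;
    }

    // Closed while failures are below the threshold; once open, allow a
    // probe request only after the cool-down has elapsed.
    boolean allowRequest() {
        if (consecutiveFailures.get() < threshold) return true;
        return System.currentTimeMillis() - openedAt > coolDownMs;
    }

    void onSuccess() { consecutiveFailures.set(0); }

    void onFailure() {
        if (consecutiveFailures.incrementAndGet() == threshold)
            openedAt = System.currentTimeMillis();
    }
}
{code}

A caller would check allowRequest() before producer.send(), spool or drop the event while the breaker is open, and report the outcome back via onSuccess()/onFailure() from the producer's send callback.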


[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616185#comment-15616185
 ] 

Guozhang Wang commented on KAFKA-4355:
--

[~mihbor] Thanks for reporting this. A first note is that in {{Metadata}}, the 
{{topics}} set and the {{cluster}} object are used in different ways: {{topics}} 
maintains all the topics that this client is interested in (for fetching or 
producing), while the {{cluster}} object contains the metadata information 
obtained from the brokers. So it is possible that topic {{scheduler}} is in the 
{{topics}} set, indicating that the embedded consumer of the streams client is 
interested in fetching from this topic, while {{cluster}} does not have this 
topic in its map, indicating that the broker does not know about it, probably 
because the brokers hosting this topic were not available during that period 
of time.

As for Streams, I think it should not throw an exception and fail in this 
situation, since it is likely transient; instead it could move forward without 
assigning this topic for now, expecting another rebalance to be triggered once 
the topic becomes available again.

Will fix this logic in {{DefaultPartitionGrouper}} for this JIRA.
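
A sketch of what that lenient check might look like (hypothetical shape; the actual fix may differ): treat a topic missing from the cluster metadata as transient and skip it rather than throwing, relying on a later rebalance once the topic is visible again.

{code}
import java.util.Set;
import org.apache.kafka.common.Cluster;

class LenientGrouper {
    // Hypothetical lenient variant of the partition-count lookup that the
    // stack trace below shows throwing a StreamsException today.
    static int maxNumPartitions(Cluster metadata, Set<String> topics) {
        int max = 0;
        for (String topic : topics) {
            Integer partitions = metadata.partitionCountForTopic(topic);
            if (partitions == null)
                continue; // topic unknown right now; a later rebalance covers it
            max = Math.max(max, partitions);
        }
        return max;
    }
}
{code}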

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting a Kafka Streams app before the broker, or
> (b) restarting the broker while the streams app is running,
> the stream thread intermittently dies with a "Topic not found during 
> partition assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.

[jira] [Created] (KAFKA-4356) o.a.k.common.utils.SystemTime.sleep() swallows InterruptedException

2016-10-28 Thread Shikhar Bhushan (JIRA)
Shikhar Bhushan created KAFKA-4356:
--

 Summary: o.a.k.common.utils.SystemTime.sleep() swallows 
InterruptedException
 Key: KAFKA-4356
 URL: https://issues.apache.org/jira/browse/KAFKA-4356
 Project: Kafka
  Issue Type: Bug
Reporter: Shikhar Bhushan
Priority: Minor


{{org.apache.kafka.common.utils.SystemTime.sleep()}} catches and ignores 
{{InterruptedException}}. When swallowing the exception like this, the 
interruption status should normally be restored with 
{{Thread.currentThread().interrupt()}}.
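
The fix described above, sketched (restoring the interrupt flag so callers of sleep() can still observe the interruption):

{code}
public void sleep(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interruption status
    }
}
{code}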



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3772) MirrorMaker crashes on Corrupted Message

2016-10-28 Thread TUSHAR SHARAD MHASKAR (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616151#comment-15616151
 ] 

TUSHAR SHARAD MHASKAR commented on KAFKA-3772:
--

I have seen this same issue with 0.9.0.0 version as well.

> MirrorMaker crashes on Corrupted Message
> 
>
> Key: KAFKA-3772
> URL: https://issues.apache.org/jira/browse/KAFKA-3772
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: James Ranson
>  Labels: mirror-maker
>
> We recently came across an issue where a message on our source kafka cluster 
> became corrupted. When MirrorMaker tried to consume this message, the thread 
> crashed and caused the entire process to also crash. Each time we attempted 
> to restart MM, it crashed on the same message. There is no information in the 
> MM logs about which message it was trying to consume (what topic, what 
> offset, etc). So the only way we were able to get past the issue was to go 
> into the zookeeper tree for our mirror consumer group and increment the 
> offset for every partition on every topic until the MM process could start 
> without crashing. This is not a tenable operational solution. MirrorMaker 
> should gracefully skip corrupt messages since they will never be able to be 
> replicated anyway.
> {noformat}2016-05-26 20:02:26,787 FATAL  MirrorMaker$MirrorMakerThread - [{}] 
> [mirrormaker-thread-3] Mirror maker thread failure due to
> kafka.message.InvalidMessageException: Message is corrupt (stored crc = 
> 33747148, computed crc = 3550736267)
>   at kafka.message.Message.ensureValid(Message.scala:167)
>   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
>   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>   at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>   at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>   at 
> kafka.tools.MirrorMaker$MirrorMakerOldConsumer.hasData(MirrorMaker.scala:483)
>   at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:394)
> 2016-05-26 20:02:27,580 FATAL  MirrorMaker$MirrorMakerThread - [{}] 
> [mirrormaker-thread-3] Mirror maker thread exited abnormally, stopping the 
> whole mirror maker.{noformat}
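
As a hedged sketch of the manual workaround described above (run with MirrorMaker stopped): the old consumer stores offsets in ZooKeeper under {{/consumers/<group>/offsets/<topic>/<partition>}}, so skipping the corrupt message means bumping that value by one. The group, topic, partition, and ZooKeeper address below are placeholders.

{code}
import java.nio.charset.StandardCharsets;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

// Hedged sketch, not a supported tool: advance the stored offset of one
// partition past the corrupt message for a ZooKeeper-based consumer group.
public class BumpMirrorOffset {
    public static void main(String[] args) {
        ZkClient zk = new ZkClient("localhost:2181", 30000, 30000,
                new BytesPushThroughSerializer());
        try {
            // Placeholder group/topic/partition names.
            String path = "/consumers/mirror-group/offsets/my-topic/3";
            byte[] raw = zk.readData(path);
            long current = Long.parseLong(new String(raw, StandardCharsets.UTF_8));
            // Skip exactly one message: the corrupt one MirrorMaker dies on.
            zk.writeData(path,
                    Long.toString(current + 1).getBytes(StandardCharsets.UTF_8));
        } finally {
            zk.close();
        }
    }
}
{code}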



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3772) MirrorMaker crashes on Corrupted Message

2016-10-28 Thread TUSHAR SHARAD MHASKAR (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616151#comment-15616151
 ] 

TUSHAR SHARAD MHASKAR edited comment on KAFKA-3772 at 10/28/16 6:20 PM:


I have seen this same issue with Mirror Maker version 0.9.0.0 as well.


was (Author: tushr1388):
I have seen this same issue with version 0.9.0.0 as well.

> MirrorMaker crashes on Corrupted Message
> 
>
> Key: KAFKA-3772
> URL: https://issues.apache.org/jira/browse/KAFKA-3772
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: James Ranson
>  Labels: mirror-maker
>
> We recently came across an issue where a message on our source kafka cluster 
> became corrupted. When MirrorMaker tried to consume this message, the thread 
> crashed and caused the entire process to also crash. Each time we attempted 
> to restart MM, it crashed on the same message. There is no information in the 
> MM logs about which message it was trying to consume (what topic, what 
> offset, etc). So the only way we were able to get past the issue was to go 
> into the zookeeper tree for our mirror consumer group and increment the 
> offset for every partition on every topic until the MM process could start 
> without crashing. This is not a tenable operational solution. MirrorMaker 
> should gracefully skip corrupt messages since they will never be able to be 
> replicated anyway.
> {noformat}2016-05-26 20:02:26,787 FATAL  MirrorMaker$MirrorMakerThread - [{}] 
> [mirrormaker-thread-3] Mirror maker thread failure due to
> kafka.message.InvalidMessageException: Message is corrupt (stored crc = 
> 33747148, computed crc = 3550736267)
>   at kafka.message.Message.ensureValid(Message.scala:167)
>   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
>   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>   at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>   at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>   at 
> kafka.tools.MirrorMaker$MirrorMakerOldConsumer.hasData(MirrorMaker.scala:483)
>   at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:394)
> 2016-05-26 20:02:27,580 FATAL  MirrorMaker$MirrorMakerThread - [{}] 
> [mirrormaker-thread-3] Mirror maker thread exited abnormally, stopping the 
> whole mirror maker.{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616118#comment-15616118
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

Perhaps the DefaultPartitionGrouper here:
https://github.com/apache/kafka/blob/e7663a306f40e9fcbc3096d17fb0f99fa3d11d1d/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java#L81
should throw a RetriableException instead of a StreamsException?
AbstractCoordinator would then keep looping instead of re-throwing it:
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L320
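
A hedged sketch of that idea (not a committed patch; {{RetriableException}} is abstract, so a concrete subclass is assumed here):

{code}
import java.util.List;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.RetriableException;

// Hedged sketch of the proposed change, not a committed patch.
public class RetriablePartitionGrouper {

    // RetriableException is abstract, so a concrete subclass is assumed.
    public static class TopicMetadataNotReadyException extends RetriableException {
        public TopicMetadataNotReadyException(String message) {
            super(message);
        }
    }

    protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
        int maxNumPartitions = 0;
        for (String topic : topics) {
            List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
            if (partitions == null || partitions.isEmpty())
                // Signal a retriable condition instead of the fatal
                // StreamsException, so AbstractCoordinator keeps looping
                // rather than re-throwing and killing the StreamThread.
                throw new TopicMetadataNotReadyException(
                        "Topic not found during partition assignment: " + topic);
            maxNumPartitions = Math.max(maxNumPartitions, partitions.size());
        }
        return maxNumPartitions;
    }
}
{code}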

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting the kafka streams app before the broker, or
> (b) restarting the broker while the streams app is running,
> the stream thread intermittently dies with a "Topic not found during partition
> assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.int

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Mayuresh Gharat
I do agree with Guozhang on having applications request an external
(admin) service that talks to kafka for trimming, in which case this
external service can check whether it's OK to send the trim request to
the kafka brokers based on certain conditions.
On the broker side we could have authorization by way of ACLs, maybe saying
that only this external admin service is allowed to call trim(). In this way
we can actually move the main decision-making process out of core.

Thanks,

Mayuresh

On Fri, Oct 28, 2016 at 10:33 AM, Guozhang Wang  wrote:

> Yes trim() should be an admin API and, if security is concerned, it should
> be under admin authorization as well.
>
> For applications that need this feature, it then boils down to the problem
> that they need to request an authorization token from whoever operates Kafka
> before starting their app, to use in their own client, which I think is a
> feasible requirement.
>
>
> Guozhang
>
>
> On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Guozhang,
> >
> > I agree that pushing out the complexity of coordination to the client
> > application makes things simpler for the broker, in the sense that it does
> > not have to be the decision maker regarding when to trim and up to what
> > offset. And I agree that if we go in this direction, providing an offset
> > parameter makes sense.
> >
> >
> > But since the main motivation for this seems to be saving or reclaiming the
> > disk space on the broker side, I am not 100% sure how good it is to rely on
> > the client application to be a good citizen and call the trim API.
> > Also I see the trim() API as more of an admin API than a client API.
> >
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang 
> wrote:
> >
> > > Here are my thoughts:
> > >
> > > If there are indeed multiple consumer groups on the same topic that
> > > need
> > > to coordinate, it is equally complex if the coordination is on the
> broker
> > > or among the applications themselves: for the latter case, you would
> > > imagine some coordination services used (like ZK) to register groups
> for
> > > that topic and let these groups agree upon the minimum offset that is
> > safe
> > > to trim for all of them; for the former case, we just need to move this
> > > coordination service into the broker side, which to me is not a good
> > design
> > > under the principle of making the broker simple.
> > >
> > > And as we discussed, there are scenarios where the offset to trim is
> not
> > > necessarily dependent on the committed offsets, even if the topic is
> only
> > > consumed by a single consumer group and we do not need any
> coordination.
> > So
> > > I think it is appropriate to require an "offset parameter" in the trim
> > API.
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin 
> > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > I think the trim() interface is generally useful. What I was
> wondering
> > is
> > > > the following:
> > > > if the user has multiple applications to coordinate, it seems simpler
> > for
> > > > the broker to coordinate instead of asking the applications to
> > coordinate
> > > > among themselves. If we let the broker do the coordination and do not
> > > want
> > > > to reuse committed offset for trim(), we kind of need something like
> > > > "offset for trim", which do not seems to be general enough to have.
> But
> > > if
> > > > there is a single application then we don't need to worry about the
> > > > coordination hence this is no longer a problem.
> > > >
> > > > The use case for multiple consumer groups I am thinking of is some
> > kind
> > > of
> > > > fork in the DAG, i.e. one intermediate result stream used by multiple
> > > > downstream jobs. But that may not be a big deal if the processing is
> > > within
> > > > the same application.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Tue, Oct 25, 2016 at 11:41 PM, Guozhang Wang 
> > > > wrote:
> > > >
> > > > > Hello Becket,
> > > > >
> > > > > I am not 100 percent sure I get your points, reading the first half
> > of
> > > > the
> > > > > paragraph I thought we were on the same page that "the committed
> > > offsets
> > > > > and the offsets the applications (most likely the consumers) would
> > > like
> > > > to
> > > > > tell the brokers to trim to, could be totally different", but then
> > you
> > > > said
> > > > > "not sure if the requirement ... is general enough", which confused
> > me
> > > a
> > > > > bit :) Anyways, I think the consumer committed offsets should be
> > > > separated
> > > > > from whatever the proposed APIs for telling the brokers to safely
> > trim
> > > > > their logs since they will not be read any more. And Jun also made
> a
> > > good
> > > > > point about that regarding the replay scenarios, which also applies
> > for
> > > > > users who do not require the flexibility

[jira] [Commented] (KAFKA-4352) Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset

2016-10-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616052#comment-15616052
 ] 

Matthias J. Sax commented on KAFKA-4352:


Thanks for the update [~vahid].

> Failure in 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset
> --
>
> Key: KAFKA-4352
> URL: https://issues.apache.org/jira/browse/KAFKA-4352
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.10.2.0
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>
> We have seen the following scenario happening frequently. It looks similar to 
> KAFKA-2768 which was thought to be fixed.
> {code}
> Stacktrace
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
>   at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106)
>   at kafka.admin.AdminClient$$anonfun$1.apply(AdminClient.scala:152)
>   at kafka.admin.AdminClient$$anonfun$1.apply(AdminClient.scala:151)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:151)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest$WaitUntilConsumerGroupGotClosed.conditionMet(ResetIntegrationTest.java:305)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:270)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset(ResetIntegrationTest.java:135)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMetho

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Guozhang Wang
Yes trim() should be an admin API and, if security is concerned, it should
be under admin authorization as well.

For applications that need this feature, it then boils down to the problem
that they need to request an authorization token from whoever operates Kafka
before starting their app, to use in their own client, which I think is a
feasible requirement.


Guozhang


On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat  wrote:

> Hi Guozhang,
>
> I agree that pushing out the complexity of coordination to the client
> application makes things simpler for the broker, in the sense that it does
> not have to be the decision maker regarding when to trim and up to what
> offset. And I agree that if we go in this direction, providing an offset
> parameter makes sense.
>
>
> But since the main motivation for this seems to be saving or reclaiming the
> disk space on the broker side, I am not 100% sure how good it is to rely on the
> client application to be a good citizen and call the trim API.
> Also I see the trim() API as more of an admin API than a client API.
>
>
> Thanks,
>
> Mayuresh
>
> On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang  wrote:
>
> > Here are my thoughts:
> >
> > If there are indeed multiple consumer groups on the same topic that need
> > to coordinate, it is equally complex if the coordination is on the broker
> > or among the applications themselves: for the latter case, you would
> > imagine some coordination services used (like ZK) to register groups for
> > that topic and let these groups agree upon the minimum offset that is
> safe
> > to trim for all of them; for the former case, we just need to move this
> > coordination service into the broker side, which to me is not a good
> design
> > under the principle of making the broker simple.
> >
> > And as we discussed, there are scenarios where the offset to trim is not
> > necessarily dependent on the committed offsets, even if the topic is only
> > consumed by a single consumer group and we do not need any coordination.
> So
> > I think it is appropriate to require an "offset parameter" in the trim
> API.
> >
> > Guozhang
> >
> >
> >
> >
> > On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin 
> wrote:
> >
> > > Hey Guozhang,
> > >
> > > I think the trim() interface is generally useful. What I was wondering
> is
> > > the following:
> > > if the user has multiple applications to coordinate, it seems simpler
> for
> > > the broker to coordinate instead of asking the applications to
> coordinate
> > > among themselves. If we let the broker do the coordination and do not
> > want
> > > to reuse committed offset for trim(), we kind of need something like
> > > "offset for trim", which do not seems to be general enough to have. But
> > if
> > > there is a single application then we don't need to worry about the
> > > coordination hence this is no longer a problem.
> > >
> > > The use case for multiple consumer groups I am thinking of is some
> kind
> > of
> > > fork in the DAG, i.e. one intermediate result stream used by multiple
> > > downstream jobs. But that may not be a big deal if the processing is
> > within
> > > the same application.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Tue, Oct 25, 2016 at 11:41 PM, Guozhang Wang 
> > > wrote:
> > >
> > > > Hello Becket,
> > > >
> > > > I am not 100 percent sure I get your points, reading the first half
> of
> > > the
> > > > paragraph I thought we were on the same page that "the committed
> > offsets
> > > > and the offsets the applications (most likely the consumers) would
> > like
> > > to
> > > > tell the brokers to trim to, could be totally different", but then
> you
> > > said
> > > > "not sure if the requirement ... is general enough", which confused
> me
> > a
> > > > bit :) Anyways, I think the consumer committed offsets should be
> > > separated
> > > > from whatever the proposed APIs for telling the brokers to safely
> trim
> > > > their logs since they will not be read any more. And Jun also made a
> > good
> > > > point about that regarding the replay scenarios, which also applies
> for
> > > > users who do not require the flexibility as you mentioned.
> > > >
> > > > Regarding the coordination complexity among applications themselves,
> my
> > > gut
> > > > feeling is that, in practice, this feature would be mostly used when
> > the
> > > > topic is solely consumed by only one group, and for cases where the
> > topic
> > > > is gonna be consumed by multiple groups, this feature would less
> likely
> > > be
> > > > applicable. And if there are indeed such cases, coordination cannot
> be
> > > > avoided, since otherwise how can a consumer group (hence a dev team
> /
> > > > project / etc) tell if the other group is OK with trimming the data?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Tue, Oct 25, 2016 at 6:58 PM, Becket Qin 
> > > wrote:
> > > >
> > > > > The trim() interface would be useful in general. And I agree with
> > > > Guozhan

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-10-28 Thread Xavier Léauté
>
> I kind of agree with James that it is a bit questionable how valuable any
> data in a delete marker can be since it will be deleted somewhat
> nondeterministically.
>

One could argue that even in normal topics, assuming a time-based log
retention policy is in place, any message will be deleted somewhat
nondeterministically, so why treat the compacted ones any differently? To me
at least, the retention setting for delete messages seems to be the
counterpart to the time-based retention setting for normal topics.

> Currently the semantics of the messages are in the eye of the beholder--you
> can choose to interpret a stream as either being appends or revisions as
> you choose. This proposal is changing that so that the semantics are
> determined by the sender.


Let's imagine someone wanted to augment this stream to include audit logs
for each record update, e.g. which user made the change. One would want to
include that information as part of the message, and have the ability to
mark a deletion.

I don't think it changes the semantics in this case, you can still choose
to interpret the data as a stream of audit log entries (inserts), ignoring
the tombstone flag, or you can interpret it as a table modeling only the
latest version of each record. Whether a compacted or normal topic is used
shouldn't matter to the sender.
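
To make the distinction concrete, a hedged illustration follows; the {{TombstoneRecord}} type is purely hypothetical (KIP-87 was still under discussion), and today's convention of a null value meaning deletion is the only real mechanism shown:

{code}
import org.apache.kafka.clients.producer.ProducerRecord;

// Hedged illustration of the discussion above; TombstoneRecord is
// hypothetical and does not exist in any Kafka release.
public class TombstoneExample {

    // Today's convention: a null value is the delete marker, which leaves
    // no room for a payload such as an audit entry.
    static ProducerRecord<String, String> deleteToday(String key) {
        return new ProducerRecord<>("users", key, null);
    }

    // Hypothetical KIP-87 world: deletion is an explicit flag, so the
    // marker can still carry data, e.g. which user made the change.
    static final class TombstoneRecord {
        final String key;
        final String auditPayload;
        final boolean tombstone = true;  // flag instead of a null value

        TombstoneRecord(String key, String auditPayload) {
            this.key = key;
            this.auditPayload = auditPayload;
        }
    }
}
{code}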


[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15615967#comment-15615967
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

My first suspect so far is the ConsumerCoordinator.
In this line: 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L301
it sets topics on the metadata from subscriptions, which the debugger shows to 
contain the correct topic name.
4 lines later: 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L305
it calls client.ensureFreshMetadata(), which can override the topics list.

The debugger shows that in the problematic case, in 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L313
 the passed metadata object already has an empty set of topics, while the 
subscriptions object contains the topic name.

So I think the topic was removed from the metadata in line 305. 
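
A minimal model of that suspected sequence (these are not Kafka's actual classes, just an illustration of the ordering described above):

{code}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model of the suspected interleaving, not Kafka's actual classes:
// line 301 sets the topics the assignor needs, line 305's refresh can
// replace them with the (possibly empty) view of a just-restarted broker,
// and line 313 then sees no topics at all.
public class MetadataRaceModel {

    private Set<String> topics = new HashSet<>();

    void setTopics(Set<String> wanted) {                 // analogue of line 301
        topics = new HashSet<>(wanted);
    }

    void ensureFreshMetadata(Set<String> brokerView) {   // analogue of line 305
        topics = new HashSet<>(brokerView);
    }

    Set<String> topics() {                               // what line 313 reads
        return Collections.unmodifiableSet(topics);
    }

    public static void main(String[] args) {
        MetadataRaceModel md = new MetadataRaceModel();
        md.setTopics(Collections.singleton("scheduler"));
        // A refresh that raced with the broker restart reports no topics:
        md.ensureFreshMetadata(Collections.<String>emptySet());
        System.out.println(md.topics());  // [] -> "Topic not found" downstream
    }
}
{code}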

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting the kafka streams app before the broker, or
> (b) restarting the broker while the streams app is running,
> the stream thread intermittently dies with a "Topic not found during partition
> assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoor

Jenkins build is back to normal : kafka-trunk-jdk8 #1009

2016-10-28 Thread Apache Jenkins Server
See 



Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Mayuresh Gharat
Hi Guozhang,

I agree that pushing out the complexity of coordination to the client
application makes things simpler for the broker, in the sense that it does
not have to be the decision maker regarding when to trim and up to what
offset. And I agree that if we go in this direction, providing an offset
parameter makes sense.


But since the main motivation for this seems to be saving or reclaiming the
disk space on the broker side, I am not 100% sure how good it is to rely on the
client application to be a good citizen and call the trim API.
Also I see the trim() API as more of an admin API than a client API.


Thanks,

Mayuresh

On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang  wrote:

> Here are my thoughts:
>
> If there are indeed multiple consumer groups on the same topic that need
> to coordinate, it is equally complex if the coordination is on the broker
> or among the applications themselves: for the latter case, you would
> imagine some coordination services used (like ZK) to register groups for
> that topic and let these groups agree upon the minimum offset that is safe
> to trim for all of them; for the former case, we just need to move this
> coordination service into the broker side, which to me is not a good design
> under the principle of making the broker simple.
>
> And as we discussed, there are scenarios where the offset to trim is not
> necessarily dependent on the committed offsets, even if the topic is only
> consumed by a single consumer group and we do not need any coordination. So
> I think it is appropriate to require an "offset parameter" in the trim API.
>
> Guozhang
>
>
>
>
> On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin  wrote:
>
> > Hey Guozhang,
> >
> > I think the trim() interface is generally useful. What I was wondering is
> > the following:
> > if the user has multiple applications to coordinate, it seems simpler for
> > the broker to coordinate instead of asking the applications to coordinate
> > among themselves. If we let the broker do the coordination and do not
> want
> > to reuse committed offset for trim(), we kind of need something like
> > "offset for trim", which do not seems to be general enough to have. But
> if
> > there is a single application then we don't need to worry about the
> > coordination hence this is no longer a problem.
> >
> > The use case for multiple consumer groups I am thinking of is some kind
> of
> > fork in the DAG, i.e. one intermediate result stream used by multiple
> > downstream jobs. But that may not be a big deal if the processing is
> within
> > the same application.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Tue, Oct 25, 2016 at 11:41 PM, Guozhang Wang 
> > wrote:
> >
> > > Hello Becket,
> > >
> > > I am not 100 percent sure I get your points, reading the first half of
> > the
> > > paragraph I thought we were on the same page that "the committed
> offsets
> > > and the offsets the applications (most likely the consumers) would
> like
> > to
> > > tell the brokers to trim to, could be totally different", but then you
> > said
> > > "not sure if the requirement ... is general enough", which confused me
> a
> > > bit :) Anyways, I think the consumer committed offsets should be
> > separated
> > > from whatever the proposed APIs for telling the brokers to safely trim
> > > their logs since they will not be read any more. And Jun also made a
> good
> > > point about that regarding the replay scenarios, which also applies for
> > > users who do not require the flexibility as you mentioned.
> > >
> > > Regarding the coordination complexity among applications themselves, my
> > gut
> > > feeling is that, in practice, this feature would be mostly used when
> the
> > > topic is solely consumed by only one group, and for cases where the
> topic
> > > is gonna be consumed by multiple groups, this feature would less likely
> > be
> > > applicable. And if there are indeed such cases, coordination cannot be
> > > avoidable since otherwise how can a consumer group (hence a dev team /
> > > project / etc) tell if the other group is OK with trimming the data?
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Tue, Oct 25, 2016 at 6:58 PM, Becket Qin 
> > wrote:
> > >
> > > > The trim() interface would be useful in general. And I agree with
> > > Guozhang
> > > > that conceptually letting the application decide when to delete
> the
> > > > messages is more intuitive and flexible.
> > > >
> > > > That said, I am not sure if putting coordination on the application
> > side
> > > is
> > > > the best option. At a high level, there are two things to be done:
> > > > 1. Coordinate among all the interested consumer groups.
> > > > 2. Telling the brokers to trim the log
> > > >
> > > > For (1), letting different applications coordinate among themselves
> is
> > > more
> > > > involved, and this logic may have to be implemented by different
> > > > applications. As Guozhang mentioned, the most intuitive way may be
> > > looking
> > > > a

[jira] [Created] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4355:


 Summary: StreamThread intermittently dies with "Topic not found 
during partition assignment" when broker restarted
 Key: KAFKA-4355
 URL: https://issues.apache.org/jira/browse/KAFKA-4355
 Project: Kafka
  Issue Type: Bug
  Components: streams
 Environment: kafka 0.10.0.0
kafka 0.10.1.0
Reporter: Michal Borowiecki
Assignee: Guozhang Wang


When (a) starting the kafka streams app before the broker, or
(b) restarting the broker while the streams app is running,
the stream thread intermittently dies with a "Topic not found during partition
assignment" StreamsException.
This happens roughly between one in 5 and one in 10 times.
Stack trace:
{noformat}
Exception in thread "StreamThread-2" 
org.apache.kafka.streams.errors.StreamsException: Topic not found during 
partition assignment: scheduler
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{noformat}

Our app has 2 streams in it, consuming from 2 different topics.
Sometimes the exception happens on both stream threads. Sometimes only on one 
of the stream threads.

The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Completed validating internal topics in partition assignor 
(org.apache.kafka.streams.processor.internals.StreamPartitionAssi

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
-
Environment: 
kafka 0.10.0.0
kafka 0.10.1.0

uname -a
Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
2016 x86_64 x86_64 x86_64 GNU/Linux

java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)


  was:
kafka 0.10.0.0
kafka 0.10.1.0


> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting the kafka streams app before the broker, or
> (b) restarting the broker while the streams app is running,
> the stream thread intermittently dies with a "Topic not found during partition
> assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one 

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
-
Description: 
When (a) starting the kafka streams app before the broker, or
(b) restarting the broker while the streams app is running,
the stream thread intermittently dies with a "Topic not found during partition
assignment" StreamsException.
This happens roughly between one in 5 and one in 10 times.
Stack trace:
{noformat}
Exception in thread "StreamThread-2" 
org.apache.kafka.streams.errors.StreamsException: Topic not found during 
partition assignment: scheduler
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{noformat}

Our app has 2 streams in it, consuming from 2 different topics.
Sometimes the exception happens on both stream threads. Sometimes only on one 
of the stream threads.

The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Completed validating internal topics in partition assignor 
(org.apache.kafka.streams.processor.internals.StreamPartitionAssignor)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Shutting down (org.apache.kafka.streams.processor.internals.Strea

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
-
Affects Version/s: 0.10.1.0
   0.10.0.0

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting the kafka streams app before the broker, or
> (b) restarting the broker while the streams app is running,
> the stream thread intermittently dies with a "Topic not found during partition
> assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one 
> of the stream threads.
> The exception is preceded by:
> {noformat}
> [2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
> lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator 
> lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-s

[VOTE] KIP-47 - Add timestamp-based log deletion policy

2016-10-28 Thread Joel Koshy
>
> - It seems that the consumer will need to write log.retention.min.timestamp
> periodically to zookeeper as dynamic configuration of the topic, so that
> broker can pick up log.retention.min.timestamp. However, this introduces
> dependency of consumer on zookeeper which is undesirable. Note that we have
>

We will be eliminating the need for manipulating topic configs directly in
ZK with the admin APIs in KIP-4.



> log.retention.min.timestamp. However, it is not clear how client
> application can set the log.retention.min.timestamp to address the
> use-case. For example, if there are more than one consumer in the consumer
> group, which consumer(s) write log.retention.min.timestamp to zookeeper?

How does consumer determine the value of log.retention.min.timestamp?


I don't quite see the issue here: this is really up to the application to
handle and applies to the trim() approach as well.

>
> this KIP. And a malicious or misconfigured client can easily delete all
> messages of any topic. How do we address this problem so that operator
> won't have to worry about this?
>

The admin APIs are Kafka RPCs that can all be authorized.

> BTW, I like Jun's solution of using offsets and IMO it works. Jun's
> solution would also address some problems above. Some ideas discussed in
> the thread of KIP-68 may help address some problems above.
>

I agree - the trim() approach evades the timestamp issue by dealing with
offsets directly (unless the user explicitly opts to look up offsets by
timestamp). Once we are convinced that this simpler approach can satisfy
the motivation for both this KIP as well as KIP-68 we should probably just
consolidate these as use-cases of a new KIP for the trim() API.
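
For concreteness, a hedged sketch of how an application might drive such a trim() call; no trim API existed at the time of this discussion, so the {{AdminTrimClient}} interface and the topic/offset values below are hypothetical, with only {{TopicPartition}} being a real Kafka class:

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Hedged sketch only: AdminTrimClient is hypothetical; no trim() API
// existed at the time of this discussion.
public class TrimExample {

    interface AdminTrimClient {
        // Mirrors the signature proposed in the thread: delete everything
        // before the given offset on each listed partition.
        void trim(Map<TopicPartition, Long> offsetsToTruncate);
    }

    // Trim three partitions of a placeholder "events" topic up to offsets
    // the application itself knows are fully processed (obtained from its
    // own bookkeeping, or via an offsets-by-timestamp style lookup).
    static void trimConsumed(AdminTrimClient admin) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (int partition = 0; partition < 3; partition++) {
            offsets.put(new TopicPartition("events", partition), 42000L);
        }
        admin.trim(offsets);
    }
}
{code}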


> On Mon, Oct 24, 2016 at 5:29 PM, Bill Warshaw wrote:
>
> > Hi Jun,
> >
> > Those are valid concerns.  For our particular use case, application
> events
> > triggering the timestamp update will never occur more than once an hour,
> > and we maintain a sliding window so that we don't delete messages too
> close
> > to what our consumers may be reading.
> > For more general use cases, developers will need to be aware of these
> > issues, and would need to write their application code with that in mind.
> >
> >
> > To your second point: I initially wanted to just have a trim() admin API.
> > I started implementing it, but ran into difficulties with synchronously
> > acknowledging to the calling code that all brokers had truncated the given
> > partitions. It seemed like we would have to do something similar to how
> > topic deletion is implemented, where the initial broker uses ZooKeeper to
> > coordinate the deletion on the other brokers. If you have a simpler idea
> > in mind, I'd be happy to update this KIP to provide a trim() API instead.
> >
> > On Mon, Oct 24, 2016 at 8:15 PM, Jun Rao wrote:
> >
> > > Hi, Bill,
> > >
> > > Thanks for the proposal. Sorry for the late reply.
> > >
> > > The motivation of the proposal makes sense: don't delete the messages
> > > until the application tells you so.
> > >
> > > I am wondering if the current proposal is the best way to address the
> > > need though. There are a couple of issues that I saw with the proposal.
> > > (1) Messages in the log may not always be stored in increasing timestamp
> > > order. Suppose that the application sets log.retention.min.timestamp to T
> > > and after that messages with timestamp older than T (either due to delay
> > > or reprocessing) are published to that topic. Those newly published
> > > messages are likely going to be deleted immediately before the
> > > application gets a chance to read them, which is probably not what the
> > > application wants. (2) The configuration for the topic has to be changed
> > > continuously to implement the use case. Intuitively, one probably
> > > shouldn't be changing a configuration all the time.
> > >
> > > Another way to achieve the goal is what Jay mentioned earlier. We could
> > > add a trim() API like the following that will trim the log up to the
> > > specified offsets. This addresses both of the above issues that I
> > > mentioned. Will that work for you?
> > >
> > > void trim(Map<TopicPartition, Long> offsetsToTruncate)
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw wrote:
> > >
> > > > Bumping for visibility.  KIP is here:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > >
> > > > On Wed, Aug 24, 2016 at 2:32 PM, Bill Warshaw wrote:
> > > >
> > > > > Hello Guozhang,
> > > > >
> > > > > KIP-71 seems unrelated to this KIP. KIP-47 is just adding a new
> > > > > deletion policy (minimum timestamp), while KIP-71 is allowing
> > > > > deletion and compaction to 

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Guozhang Wang
Here are my thoughts:

If there are indeed multiple consumer groups on the same topic that need
to coordinate, it is equally complex whether the coordination is on the broker
or among the applications themselves: in the latter case, you would
imagine some coordination service (like ZK) being used to register groups for
that topic and let these groups agree upon the minimum offset that is safe
to trim for all of them; in the former case, we just need to move this
coordination service into the broker side, which to me is not a good design
under the principle of keeping the broker simple.

And as we discussed, there are scenarios where the offset to trim is not
necessarily dependent on the committed offsets, even if the topic is only
consumed by a single consumer group and we do not need any coordination. So
I think it is appropriate to require an "offset parameter" in the trim API.

Guozhang
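
To illustrate the kind of agreement described above, here is a small, self-contained Java sketch (the group names and offsets are made up) that computes, per partition, the minimum offset across all interested groups, below which trimming is safe for everyone:

{code}
import java.util.HashMap;
import java.util.Map;

public class SafeTrimOffsets {

    // For each consumer group: partition -> the lowest offset that group
    // still needs (e.g. its committed offset, or an app-chosen watermark).
    static Map<Integer, Long> safeTrimOffsets(Map<String, Map<Integer, Long>> perGroup) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map<Integer, Long> offsets : perGroup.values()) {
            // Keep the minimum offset seen so far for each partition.
            offsets.forEach((partition, offset) ->
                    result.merge(partition, offset, Math::min));
        }
        return result; // trimming strictly below these offsets is safe for every group
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Long>> groups = new HashMap<>();
        groups.put("group-a", Map.of(0, 120L, 1, 300L));
        groups.put("group-b", Map.of(0, 95L, 1, 310L));
        System.out.println(safeTrimOffsets(groups)); // prints {0=95, 1=300}
    }
}
{code}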




On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin  wrote:

> Hey Guozhang,
>
> I think the trim() interface is generally useful. What I was wondering is
> the following:
> if the user has multiple applications to coordinate, it seems simpler for
> the broker to coordinate instead of asking the applications to coordinate
> among themselves. If we let the broker do the coordination and do not want
> to reuse the committed offset for trim(), we kind of need something like an
> "offset for trim", which does not seem to be general enough to have. But if
> there is a single application, then we don't need to worry about the
> coordination, hence this is no longer a problem.
>
> The use case for multiple consumer groups I am thinking of is some kind of
> fork in the DAG, i.e. one intermediate result stream used by multiple
> downstream jobs. But that may not be a big deal if the processing is within
> the same application.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Tue, Oct 25, 2016 at 11:41 PM, Guozhang Wang wrote:
>
> > Hello Becket,
> >
> > I am not 100 percent sure I get your points. Reading the first half of the
> > paragraph, I thought we were on the same page that "the committed offsets
> > and the offsets the applications (most likely the consumers) would like to
> > tell the brokers to trim to, could be totally different", but then you said
> > "not sure if the requirement ... is general enough", which confused me a
> > bit :) Anyway, I think the consumer committed offsets should be kept
> > separate from whatever APIs we propose for telling the brokers to safely
> > trim their logs since they will not be read any more. And Jun also made a
> > good point about that regarding the replay scenarios, which also applies
> > for users who do not require the flexibility you mentioned.
> >
> > Regarding the coordination complexity among applications themselves, my
> > gut feeling is that, in practice, this feature would mostly be used when
> > the topic is solely consumed by one group; for cases where the topic is
> > going to be consumed by multiple groups, this feature would be less likely
> > to apply. And if there are indeed such cases, coordination cannot be
> > avoided, since otherwise how can a consumer group (hence a dev team /
> > project / etc.) tell if the other group is OK with trimming the data?
> >
> >
> > Guozhang
> >
> >
> >
> > On Tue, Oct 25, 2016 at 6:58 PM, Becket Qin wrote:
> >
> > > The trim() interface would be useful in general. And I agree with
> > > Guozhang that conceptually letting the application decide when to delete
> > > the messages is more intuitive and flexible.
> > >
> > > That said, I am not sure if putting coordination on the application side
> > > is the best option. At a high level, there are two things to be done:
> > > 1. Coordinating among all the interested consumer groups.
> > > 2. Telling the brokers to trim the log.
> > >
> > > For (1), letting different applications coordinate among themselves is
> > > more involved, and this logic may have to be implemented by different
> > > applications. As Guozhang mentioned, the most intuitive way may be
> > > looking at the committed offset for each of the groups. But the
> > > applications may still need to coordinate among themselves to avoid
> > > split-brain issues. If there are many consumers from different
> > > applications, the brokers may potentially see a lot of offset queries.
> > > So, while letting the consumer groups coordinate among themselves
> > > provides flexibility, it doesn't look simpler overall. There seems to be
> > > a trade-off between ease of use and flexibility. For people who require
> > > flexibility, consumer-side coordination + the trim() interface is the
> > > way to go. But for people who don't require that, committed-offset-based
> > > retention seems simpler and does not need any client-side code change.
> > >
> > > For (2), in the current approach, the consumers tell the broker their
> > > positions by committing offsets. If we use trim(), it would be more
> > 

[GitHub] kafka pull request #2058: kafka-4295: ConsoleConsumer does not delete the te...

2016-10-28 Thread amethystic
Github user amethystic closed the pull request at:

https://github.com/apache/kafka/pull/2058




Build failed in Jenkins: kafka-trunk-jdk7 #1660

2016-10-28 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-4326; Refactor LogCleaner for better reuse of common 
copy/compress

--
[...truncated 3907 lines...]

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicNotExisting 
STARTED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicNotExisting 
PASSED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionNotMatchingInternalTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionNotMatchingInternalTopic PASSED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithoutDescribe 
STARTED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithoutDescribe 
PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithoutTopicDescribeAccess 
STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithoutTopicDescribeAccess 
PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithTopicAndGroupRead 
STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithTopicAndGroupRead 
PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicExisting STARTED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicExisting PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionMatchingInternalTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionMatchingInternalTopic PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithExplicitSeekAndNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithExplicitSeekAndNoGroupAccess PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testCoordinatorFailover STARTED

kafka.api.SaslPlainPlaintextConsumerTest > testCoordinatorFailover PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testSimpleConsumption STARTED

kafka.api.SaslPlainPlaintextConsumerTest > testSimpleConsumption PASSED

kafka.api.RequestResponseSerializationTest > 
testSerializationAndDeserialization STARTED

kafka.api.RequestResponseSerializationTest > 
testSerializationAndDeserialization PASSED

kafka.api.RequestResponseSerializationTest > testFetchResponseVersion STARTED

kafka.api.RequestResponseSerializationTest > testFetchResponseVersion PASSED

kafka.api.RequestResponseSerializationTest > testProduceResponseVersion STARTED

kafka.api.RequestResponseSerializationTest > testProduceResponseVersion PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaSubscribe STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaSubscribe PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testProduceConsumeViaAssign 
STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testProduceConsumeViaAssign 
PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaAssign STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaAssign PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
t

[jira] [Created] (KAFKA-4354) System test failure: test_rolling_upgrade_sasl_mechanism_phase_two.new_sasl_mechanism=PLAIN

2016-10-28 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-4354:
--

 Summary: System test failure: 
test_rolling_upgrade_sasl_mechanism_phase_two.new_sasl_mechanism=PLAIN
 Key: KAFKA-4354
 URL: https://issues.apache.org/jira/browse/KAFKA-4354
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma


This is the first time I am seeing this failure, although I am not sure if it's
due to a code change or just an environmental issue.

{code}
test_id:
2016-10-28--001.kafkatest.tests.core.security_rolling_upgrade_test.TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two.new_sasl_mechanism=PLAIN
status: FAIL
run time:   5 minutes 50.440 seconds


1341 acked message did not make it to the Consumer. They are: 16384, 16385, 
16386, 16387, 16388, 16389, 16390, 16391, 16392, 16393, 16394, 16395, 15055, 
15056, 15057, 15058, 15059, 15060, 15061, 15062...plus 1321 more. Total Acked: 
12394, Total Consumed: 11053. We validated that the first 1000 of these missing 
messages correctly made it into Kafka's data files. This suggests they were 
lost on their way to the consumer.(There are also 173 duplicate messages in the 
log - but that is an acceptable outcome)

Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
 line 106, in run_all_tests
data = self.run_single_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
 line 162, in run_single_test
return self.current_test_context.function(self.current_test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py",
 line 331, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py",
 line 189, in test_rolling_upgrade_sasl_mechanism_phase_two
self.run_produce_consume_validate(self.roll_in_sasl_mechanism, 
self.kafka.security_protocol, new_sasl_mechanism)
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py",
 line 105, in run_produce_consume_validate
raise e
AssertionError: 1341 acked message did not make it to the Consumer. They are: 
16384, 16385, 16386, 16387, 16388, 16389, 16390, 16391, 16392, 16393, 16394, 
16395, 15055, 15056, 15057, 15058, 15059, 15060, 15061, 15062...plus 1321 more. 
Total Acked: 12394, Total Consumed: 11053. We validated that the first 1000 of 
these missing messages correctly made it into Kafka's data files. This suggests 
they were lost on their way to the consumer.(There are also 173 duplicate 
messages in the log - but that is an acceptable outcome)
{code}

https://jenkins.confluent.io/job/system-test-kafka/392/console
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-10-28--001.1477655548--apache--trunk--34e9cc5/report.html





[jira] [Updated] (KAFKA-4351) Topic regex behavioral change with MirrorMaker new consumer

2016-10-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4351:
---
Fix Version/s: 0.10.2.0

> Topic regex behavioral change with MirrorMaker new consumer
> ---
>
> Key: KAFKA-4351
> URL: https://issues.apache.org/jira/browse/KAFKA-4351
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.1
>Reporter: Dustin Cote
>Assignee: huxi
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> There is a behavioral change when you use MirrorMaker with the new consumer
> versus the old consumer with respect to regex parsing of the input topic
> list. If one combines a regex with a comma-separated list for the topic list
> with the old consumer implementation of MirrorMaker, it works OK. Example:
> {code}
> --whitelist '.+(my|mine),topic,anothertopic'
> {code}
> If you pass this in with the new consumer implementation, no topics are
> found. The workaround is to use something like:
> {code}
> --whitelist '.+(my|mine)|topic|anothertopic'
> {code}
> We should make an effort to be consistent between the two implementations, as
> it's a bit surprising when migrating. I've verified the problem exists on
> 0.9.0.1, but at first glance I don't see any fix to the problem in later versions.
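
As an illustration, here is a small self-contained Java check (the topic names are made up) of the difference described above. The whole whitelist compiled as a single java.util.regex pattern, which appears to be what the new consumer does, treats the commas as literal characters to match, while the pipe form matches as intended:

{code}
import java.util.List;
import java.util.regex.Pattern;

public class WhitelistCheck {
    public static void main(String[] args) {
        List<String> topics = List.of("streammy", "topic", "anothertopic");

        // Commas are literal characters inside a single compiled regex, so
        // this only matches names containing ",topic,anothertopic".
        Pattern commaForm = Pattern.compile(".+(my|mine),topic,anothertopic");
        // The old consumer's behavior can be approximated by turning the
        // comma-separated list into alternation.
        Pattern pipeForm = Pattern.compile(".+(my|mine)|topic|anothertopic");

        for (String t : topics) {
            System.out.printf("%-13s commaForm=%-5b pipeForm=%b%n",
                    t, commaForm.matcher(t).matches(), pipeForm.matcher(t).matches());
        }
        // commaForm matches none of the topics; pipeForm matches all three.
    }
}
{code}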





[jira] [Updated] (KAFKA-4351) Topic regex behavioral change with MirrorMaker new consumer

2016-10-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4351:
---
Status: Patch Available  (was: Reopened)

> Topic regex behavioral change with MirrorMaker new consumer
> ---
>
> Key: KAFKA-4351
> URL: https://issues.apache.org/jira/browse/KAFKA-4351
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.1
>Reporter: Dustin Cote
>Assignee: huxi
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> There is a behavioral change when you use MirrorMaker with the new consumer
> versus the old consumer with respect to regex parsing of the input topic
> list. If one combines a regex with a comma-separated list for the topic list
> with the old consumer implementation of MirrorMaker, it works OK. Example:
> {code}
> --whitelist '.+(my|mine),topic,anothertopic'
> {code}
> If you pass this in with the new consumer implementation, no topics are
> found. The workaround is to use something like:
> {code}
> --whitelist '.+(my|mine)|topic|anothertopic'
> {code}
> We should make an effort to be consistent between the two implementations, as
> it's a bit surprising when migrating. I've verified the problem exists on
> 0.9.0.1, but at first glance I don't see any fix to the problem in later versions.





[jira] [Commented] (KAFKA-4326) Refactor LogCleaner to remove duplicate log copying logic

2016-10-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15615052#comment-15615052
 ] 

ASF GitHub Bot commented on KAFKA-4326:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2053


> Refactor LogCleaner to remove duplicate log copying logic
> -
>
> Key: KAFKA-4326
> URL: https://issues.apache.org/jira/browse/KAFKA-4326
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> I think there's some code duplication in the log cleaner with respect to the 
> copying of the log and re-compression after cleaning. We have similar logic 
> already in {{ByteBufferMessageSet}} which we can potentially reuse. This 
> improves encapsulation so that message format changes in the future become a 
> little easier (since they touch fewer components).





[jira] [Resolved] (KAFKA-4326) Refactor LogCleaner to remove duplicate log copying logic

2016-10-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4326.

   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 2053
[https://github.com/apache/kafka/pull/2053]

> Refactor LogCleaner to remove duplicate log copying logic
> -
>
> Key: KAFKA-4326
> URL: https://issues.apache.org/jira/browse/KAFKA-4326
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> I think there's some code duplication in the log cleaner with respect to the 
> copying of the log and re-compression after cleaning. We have similar logic 
> already in {{ByteBufferMessageSet}} which we can potentially reuse. This 
> improves encapsulation so that message format changes in the future become a 
> little easier (since they touch fewer components).





[GitHub] kafka pull request #2053: KAFKA-4326: Refactor LogCleaner for better reuse o...

2016-10-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2053




[GitHub] kafka pull request #2073: MINOR: Fix issue in `AsyncProducerTest` where it e...

2016-10-28 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/2073

MINOR: Fix issue in `AsyncProducerTest` where it expects the `port` config 
to be set

This test fails locally when I run it, but somehow the Jenkins builds pass.
It's not clear how.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka async-producer-test-port-config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2073.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2073


commit 150c3890a4d41c1f8e9ea9c6d7ff07ba3630bbeb
Author: Ismael Juma 
Date:   2016-10-28T09:40:28Z

Fix issue in `AsyncProducerTest` where it expects the `port` config to be 
set






Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Becket Qin
Hey Guozhang,

I think the trim() interface is generally useful. What I was wondering is
the following:
if the user has multiple applications to coordinate, it seems simpler for
the broker to coordinate instead of asking the applications to coordinate
among themselves. If we let the broker do the coordination and do not want
to reuse the committed offset for trim(), we kind of need something like an
"offset for trim", which does not seem to be general enough to have. But if
there is a single application, then we don't need to worry about the
coordination, hence this is no longer a problem.

The use case for multiple consumer groups I am thinking of is some kind of
fork in the DAG, i.e. one intermediate result stream used by multiple
downstream jobs. But that may not be a big deal if the processing is within
the same application.

Thanks,

Jiangjie (Becket) Qin



On Tue, Oct 25, 2016 at 11:41 PM, Guozhang Wang  wrote:

> Hello Becket,
>
> I am not 100 percent sure I get your points. Reading the first half of the
> paragraph, I thought we were on the same page that "the committed offsets
> and the offsets the applications (most likely the consumers) would like to
> tell the brokers to trim to, could be totally different", but then you said
> "not sure if the requirement ... is general enough", which confused me a
> bit :) Anyway, I think the consumer committed offsets should be kept
> separate from whatever APIs we propose for telling the brokers to safely
> trim their logs since they will not be read any more. And Jun also made a
> good point about that regarding the replay scenarios, which also applies
> for users who do not require the flexibility you mentioned.
>
> Regarding the coordination complexity among applications themselves, my gut
> feeling is that, in practice, this feature would mostly be used when the
> topic is solely consumed by one group; for cases where the topic is going
> to be consumed by multiple groups, this feature would be less likely to
> apply. And if there are indeed such cases, coordination cannot be avoided,
> since otherwise how can a consumer group (hence a dev team / project /
> etc.) tell if the other group is OK with trimming the data?
>
>
> Guozhang
>
>
>
> On Tue, Oct 25, 2016 at 6:58 PM, Becket Qin  wrote:
>
> > The trim() interface would be useful in general. And I agree with
> > Guozhang that conceptually letting the application decide when to delete
> > the messages is more intuitive and flexible.
> >
> > That said, I am not sure if putting coordination on the application side
> > is the best option. At a high level, there are two things to be done:
> > 1. Coordinating among all the interested consumer groups.
> > 2. Telling the brokers to trim the log.
> >
> > For (1), letting different applications coordinate among themselves is
> > more involved, and this logic may have to be implemented by different
> > applications. As Guozhang mentioned, the most intuitive way may be looking
> > at the committed offset for each of the groups. But the applications may
> > still need to coordinate among themselves to avoid split-brain issues. If
> > there are many consumers from different applications, the brokers may
> > potentially see a lot of offset queries. So, while letting the consumer
> > groups coordinate among themselves provides flexibility, it doesn't look
> > simpler overall. There seems to be a trade-off between ease of use and
> > flexibility. For people who require flexibility, consumer-side
> > coordination + the trim() interface is the way to go. But for people who
> > don't require that, committed-offset-based retention seems simpler and
> > does not need any client-side code change.
> >
> > For (2), in the current approach, the consumers tell the broker their
> > positions by committing offsets. If we use trim(), it would be more
> > explicit. I am actually a little concerned about reusing the committed
> > offset for log retention. It essentially overloads the offset commit with
> > both checkpointing and consume-based log retention, which may not work
> > when people want to separate those two functions. People can use app-side
> > coordination + trim() to work around this issue. But I am not sure if the
> > requirement of separating offset commit from consume-based log retention
> > is general enough to be addressed specifically.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Tue, Oct 25, 2016 at 3:00 PM, Joel Koshy  wrote:
> >
> > > +1 - I was thinking the exact same thing.
> > >
> > > On Tue, Oct 25, 2016 at 2:52 PM, Jun Rao  wrote:
> > >
> > > > One of the main reasons for retaining messages on the broker after
> > > > consumption is to support replay. A common reason for replay is to fix
> > > > an application error. So, it seems that it's a bit hard to delete log
> > > > segments just based on the committed offsets that the broker knows. An
> > > > alternative approach is to support an api that can t

[jira] [Commented] (KAFKA-4351) Topic regex behavioral change with MirrorMaker new consumer

2016-10-28 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15614635#comment-15614635
 ] 

huxi commented on KAFKA-4351:
-

Thanks for helping reopen the JIRA. Yes, I set the wrong status by mistake.

> Topic regex behavioral change with MirrorMaker new consumer
> ---
>
> Key: KAFKA-4351
> URL: https://issues.apache.org/jira/browse/KAFKA-4351
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.1
>Reporter: Dustin Cote
>Assignee: huxi
>Priority: Minor
>
> There is a behavioral change when you use MirrorMaker with the new consumer
> versus the old consumer with respect to regex parsing of the input topic
> list. If one combines a regex with a comma-separated list for the topic list
> with the old consumer implementation of MirrorMaker, it works OK. Example:
> {code}
> --whitelist '.+(my|mine),topic,anothertopic'
> {code}
> If you pass this in with the new consumer implementation, no topics are
> found. The workaround is to use something like:
> {code}
> --whitelist '.+(my|mine)|topic|anothertopic'
> {code}
> We should make an effort to be consistent between the two implementations, as
> it's a bit surprising when migrating. I've verified the problem exists on
> 0.9.0.1, but at first glance I don't see any fix to the problem in later versions.





[jira] [Reopened] (KAFKA-4351) Topic regex behavioral change with MirrorMaker new consumer

2016-10-28 Thread Dustin Cote (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dustin Cote reopened KAFKA-4351:


Looks like this got marked resolved accidentally when the PR was made.  
Reopening but cc'ing [~huxi_2b].  Thanks for picking it up.

> Topic regex behavioral change with MirrorMaker new consumer
> ---
>
> Key: KAFKA-4351
> URL: https://issues.apache.org/jira/browse/KAFKA-4351
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.1
>Reporter: Dustin Cote
>Assignee: huxi
>Priority: Minor
>
> There is a behavioral change when you use MirrorMaker with the new consumer
> versus the old consumer with respect to regex parsing of the input topic
> list. If one combines a regex with a comma-separated list for the topic list
> with the old consumer implementation of MirrorMaker, it works OK. Example:
> {code}
> --whitelist '.+(my|mine),topic,anothertopic'
> {code}
> If you pass this in with the new consumer implementation, no topics are
> found. The workaround is to use something like:
> {code}
> --whitelist '.+(my|mine)|topic|anothertopic'
> {code}
> We should make an effort to be consistent between the two implementations, as
> it's a bit surprising when migrating. I've verified the problem exists on
> 0.9.0.1, but at first glance I don't see any fix to the problem in later versions.





[jira] [Commented] (KAFKA-4348) On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on Kafka server

2016-10-28 Thread Yiquan Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15614582#comment-15614582
 ] 

Yiquan Zhou commented on KAFKA-4348:


The issue occurs when there is a large number of records on the server to be
read. What I did to reproduce the issue was to run the verifiable producer
(acks=1 by default, I think), wait until all 200k or more records had been sent
("sent":200000,"acked":200000), and THEN start the consumer. The consumer will
read the max number of records corresponding to "max.partition.fetch.bytes" at
each poll, then 0 records for several seconds, and then some more records.

If I start the consumer at the same time as, or before, running the producer,
the consumer polls fewer records each time but continuously (no "polled 0
records"). So it works in this case.

> On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on 
> Kafka server
> -
>
> Key: KAFKA-4348
> URL: https://issues.apache.org/jira/browse/KAFKA-4348
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.1
> Environment: Mac OS X EI Capitan, Java 1.8.0_111
>Reporter: Yiquan Zhou
>  Labels: consumer, mac, polling
>
> Steps to reproduce:
> 1. start the zookeeper and kafka server using the default properties from the 
> distribution: 
> $ bin/zookeeper-server-start.sh config/zookeeper.properties
> $ bin/kafka-server-start.sh config/server.properties 
> 2. create a Kafka consumer using the Java API KafkaConsumer.poll(long 
> timeout). It polls the records from the server every second (timeout set to 
> 1000) and prints the number of records polled. The code can be found here: 
> https://gist.github.com/yiquanzhou/a94569a2c4ec8992444c83f3c393f596
> 3. use bin/kafka-verifiable-producer.sh to generate some messages: 
> $ bin/kafka-verifiable-producer.sh --topic connect-test --max-messages 200000
> --broker-list localhost:9092
> wait until all 200k messages are generated and sent to the server. 
> 4. Run the consumer Java code. In the output console of the consumer, we can 
> see that the consumer starts to poll some records, then it polls 0 records 
> for several seconds before polling some more. like this:
> polled 27160 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26886 records
> polled 26886 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26701 records
> polled 26214 records
> The bug slows down the consumption of messages a lot. And in our use case,
> the consumer wrongly assumes that all messages have been read from the topic.
> It is only reproducible on Mac OS X, not on Linux or Windows.
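
For reference, a minimal consumer along the lines of step 2 and the linked gist. This is only a sketch, with placeholder connection settings, that prints the per-poll counts shown above:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollCountConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "poll-count-test");         // placeholder group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connect-test"));
            while (true) {
                // 0.10-era API: poll with a timeout in milliseconds.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println("polled " + records.count() + " records");
            }
        }
    }
}
{code}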





Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-10-28 Thread James Cheng
Michael,

What information would you want to include on a delete tombstone, and what 
would you use it for?

-James

Sent from my iPhone

> On Oct 27, 2016, at 9:17 PM, Michael Pearce  wrote:
> 
> Hi Jay,
> 
> I think the use case that Konstantin mentioned in the KIP-82 thread, and
> which we also have at IG, is a clear use case.
> 
> Many companies are using message wrappers; these are useful because, as per
> the KIP-82 use cases (I don't think I need to reiterate the large list
> here), many of these need the headers even on a null value.
> 
> The issue this then causes is that you cannot send these messages onto a
> compacted topic and ever have a delete/tombstone. So companies are doing
> things like double-sending: one message with the envelope so it gets
> transported, followed by an empty message. Or they have a separate process
> looking for the empty envelope and pushing back an empty-value record to
> make the broker tombstone it off. As mentioned in the KIP-82 thread, these
> cause nasty race conditions and production issues.
> 
> LinkedIn was also very clear that, with compaction as it currently works,
> teams there cannot use their managed Kafka services that rely on their
> headers implementation. This was also flagged in the KIP-82 discussion.
> 
> For Streams it would be fairly easy to keep its current behaviour: when
> sending a null value, also add the delete marker. This would be the same
> for any framework built on Kafka whose desire is to keep the same logic;
> Spark and Samza come to mind.
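
As context for the quoted discussion, here is a minimal sketch of the convention being debated: today a tombstone on a compacted topic is just a record with a null value, which leaves no room for an envelope or headers on the delete itself. The topic, key, and broker address below are placeholders:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A normal upsert for key "user-42" on a compacted topic.
            producer.send(new ProducerRecord<>("accounts", "user-42", "{\"tier\":1}"));
            // Today's tombstone: a null value tells log compaction to delete
            // the key; there is nowhere to attach an envelope to this delete.
            producer.send(new ProducerRecord<>("accounts", "user-42", null));
        }
    }
}
{code}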
> 
> Likewise, as noted, we could make this configurable globally and at the
> topic level, as per this thread where we are discussing the section about
> compatibility and rollout.
> 
> 
> 
> Rgds
> Mike
> 
> From: Jay Kreps 
> Sent: Thursday, October 27, 2016 10:54:45 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> 
> I kind of agree with James that it is a bit questionable how valuable any
> data in a delete marker can be since it will be deleted somewhat
> nondeterministically.
> 
> Let's definitely ensure the change is worth the resulting pain and
> additional complexity in the data model.
> 
> I think the two things we maybe conflated in the original compaction work
> was the semantics of the message and its retention policy (I'm not sure,
> but maybe).
> 
> In some sense a normal Kafka topic is a stream of pure appends (inserts). A
> compacted topic is a series of revisions to the keyed entity--updates or
> deletes.
> 
> Currently the semantics of the messages are in the eye of the beholder--you
> can interpret a stream as either appends or revisions, as you choose. This
> proposal is changing that so that the semantics are
> determined by the sender.
> 
> So in Kafka Streams you could have a KTable of "customer account updates"
> which would model the latest version of each customer account record; you
> could also have a KStream which would model the stream of updates being
> made to customer account records. You can create either of these off the
> same topic---the semantics come from your interpretation not the data
> itself. Both of these interpretations actually make sense: if you want to
> count the number of accounts in a given geographical region you want to
> compute that off the KTable, if you want to count the number of account
> modifications you want to compute that off the KStream.
> 
> This proposal changes this slightly. Now we are saying the semantics of the
> message are set by the sender. I'm not sure if this is better or worse--it
> seems a little at odds with what we are doing in streams. If we are going
> to change it I wonder if there aren't actually three types of message
> {INSERT, UPDATE, DELETE}. (And by update I really mean "upsert"). Compacted
> topics only make sense if your topic contains only UPDATE and/or DELETE
> messages. "Normal" topics are pure inserts.
> 
> If we are making this change, how does it affect streams? What happens if I
> send a DELETE message to a non-compacted topic where deletes are impossible?
> 
> -Jay
> 
> 
> 
> On Tue, Oct 25, 2016 at 9:09 AM, Michael Pearce wrote:
> 
>> Hi All,
>> 
>> I would like to discuss the following KIP proposal:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 87+-+Add+Compaction+Tombstone+Flag
>> 
>> This is off the back of the discussion on KIP-82  / KIP meeting where it
>> was agreed to separate this issue and feature. See:
>> http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.
>> mbox/%3cCAJS3ho8OcR==EcxsJ8OP99pD2hz=iiGecWsv-
>> EZsBsNyDcKr=g...@mail.gmail.com%3e
>> 
>> Thanks
>> Mike
>> 
>> The information contained in this email is strictly confidential and for
>> the use of the addressee only, unless otherwise indicated. If you are not
>> the intended recipient, please do not read, copy, use or disclose to others
>> this message or any attachment. Please a