[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-19 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296552#comment-16296552
 ] 

huxihx commented on KAFKA-6366:
---

[~hachikuji] I've been thinking about another situation which might happen after the 
heartbeat thread throws a StackOverflowError. 

Since it's of type 'Error', `failed` in the HeartbeatThread class will not be 
updated, so the thread will mistakenly be considered normal instead of 
failed although it has already exited. Do we need to set `failed` when an 
Error is thrown?
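
A minimal sketch of the idea (illustrative only, not the actual AbstractCoordinator 
source): record any Throwable, so that an Error such as StackOverflowError also 
marks the thread as failed instead of leaving it looking healthy after it has exited.

{code}
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch, not the actual Kafka source: catch Throwable so that
// Errors also mark the thread as failed.
public class HeartbeatThreadSketch extends Thread {
    private final AtomicReference<Throwable> failed = new AtomicReference<>();

    @Override
    public void run() {
        try {
            // ... heartbeat loop elided ...
        } catch (Throwable t) {            // Throwable, not just RuntimeException
            failed.compareAndSet(null, t); // remember the first failure
        }
    }

    public boolean hasFailed() {
        return failed.get() != null;
    }
}
{code}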

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt
>
>
> With Kafka 1.0, our consumer groups fall into a permanent cycle of rebalancing 
> once a StackOverflowError occurs in the heartbeat thread due to 
> connectivity issues between the consumers and the coordinating broker:
> Immediately before the exception there are hundreds, if not thousands, of log 
> entries of the following type:
> 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
> groupId=my-consumer-group] Marking the coordinator : (id: 
> 2147483645 rack: null) dead
> The exceptions always happen somewhere in the DateFormat code, though at 
> different lines.
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] ERROR - Uncaught exception in thread 
> 'kafka-coordinator-heartbeat-thread | my-consumer-group':
> java.lang.StackOverflowError
>  at 
> java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
>  at 
> java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
>  at java.util.Calendar.getDisplayName(Calendar.java:2110)
>  at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
>  at java.text.DateFormat.format(DateFormat.java:345)
>  at 
> org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
>  at 
> org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
>  at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
>  at 
> org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
>  at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
>  at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>  at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>  at org.apache.log4j.Category.callAppenders(Category.java:206)
>  at org.apache.log4j.Category.forcedLog(Category.java:391)
>  at org.apache.log4j.Category.log(Category.java:856)
>  at 
> org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
> ...
> the following 9 lines are repeated around a hundred times.
> ...
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
>  

[jira] [Created] (KAFKA-6385) Rack awareness ignored by kafka-reassign-partitions

2017-12-19 Thread Gal Barak (JIRA)
Gal Barak created KAFKA-6385:


 Summary: Rack awareness ignored by kafka-reassign-partitions
 Key: KAFKA-6385
 URL: https://issues.apache.org/jira/browse/KAFKA-6385
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
 Environment: Ubuntu 16.04
Reporter: Gal Barak
Priority: Minor
 Attachments: actual.txt, topic-to-move.json

Hi,
It seems that the kafka-reassign-partitions script ignores rack awareness when 
suggesting a new partition layout. I came across it while doing some initial 
testing with Kafka.

+To reproduce:+
#  Create a Kafka cluster with 3 brokers (1,2,3). Use 3 different racks 
(broker.rack definition. Example: "A", "B" and "C").
#* I used a non-root directory in zookeeper (i.e. - {{:2181,:2181,:2182/}})
#* The tested topic was automatically created, according to a default 
configuration of 12 partitions and 3 replicas per topic.
# Install a 4th broker, and assign it to the same rack as the 1st broker ("A").
# Create a topics-to-move.json file for a single topic. The file I used was 
uploaded as topic-to-move.json.
# Run the kafka-reassign-partitions script:
{{kafka-reassign-partitions --zookeeper :2181,:2181,:2182/ 
--topics-to-move-json-file  --broker-list "1,2,3,4" 
--generate}}

+Expected result:+
A suggested reassignment that ensures no partition uses both broker 1 
and broker 4 as replicas.

+Actual results of the command:+
The full result is attached as a file (actual.txt). It includes partitions with 
replicas that are on both brokers 1 and 4, which are two servers on the same 
rack.
Example: {"topic":"","partition":6,"replicas":[1,2,4]}

+Additional notes:+
* I did not test starting the cluster from scratch. The same behavior might be 
present when topic partitions are created automatically (in which case, the 
priority might be higher).
* I'm not sure it's related. But the original assignment seems to be 
problematic as well: If a single server (of the 3) failed, a different single 
server became the leader for all of its partitions. For example, if broker 1 
failed, server 2 became the leader for all of the partitions for which 1 was 
previously the leader, instead of having the load distributed evenly between 
brokers 2 and 3.







[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?

2017-12-19 Thread Lev Gorodinski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296819#comment-16296819
 ] 

Lev Gorodinski commented on KAFKA-6365:
---

Ping [~hachikuji] :)

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET client written in F#; supports 0.8, 0.9, 0.10





[jira] [Reopened] (KAFKA-6365) How to add a client to list of available clients?

2017-12-19 Thread Lev Gorodinski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lev Gorodinski reopened KAFKA-6365:
---

Hi, it looks like I don't have edit permissions on 
https://cwiki.apache.org/confluence/display/KAFKA/Clients when I log in using 
eulerfx.

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET client written in F#; supports 0.8, 0.9, 0.10





[jira] [Commented] (KAFKA-6317) Maven artifact for kafka should not depend on log4j

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296943#comment-16296943
 ] 

ASF GitHub Bot commented on KAFKA-6317:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4297


> Maven artifact for kafka should not depend on log4j
> ---
>
> Key: KAFKA-6317
> URL: https://issues.apache.org/jira/browse/KAFKA-6317
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>
> It should only depend on slf4j-api. The release tarball should still depend 
> on log4j and slf4j-log4j12.





[jira] [Assigned] (KAFKA-6320) move ZK metrics in KafkaHealthCheck to ZookeeperClient

2017-12-19 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-6320:
--

Assignee: Jun Rao

> move ZK metrics in KafkaHealthCheck to ZookeeperClient
> --
>
> Key: KAFKA-6320
> URL: https://issues.apache.org/jira/browse/KAFKA-6320
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 1.0.0
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 1.1.0
>
>
> In KAFKA-5473, we will be decommissioning the usage of KafkaHealthCheck, so 
> we need to move the ZK metrics SessionState and ZooKeeper${eventType}PerSec 
> from that class to somewhere else (e.g. ZookeeperClient).





[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?

2017-12-19 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297108#comment-16297108
 ] 

Jason Gustafson commented on KAFKA-6365:


[~eulerfx] Sorry, I may have missed one of the permissions. Can you try again?

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET client written in F#; supports 0.8, 0.9, 0.10





[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?

2017-12-19 Thread Lev Gorodinski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297133#comment-16297133
 ] 

Lev Gorodinski commented on KAFKA-6365:
---

Works now, thank you!

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET client written in F#; supports 0.8, 0.9, 0.10





[jira] [Resolved] (KAFKA-6365) How to add a client to list of available clients?

2017-12-19 Thread Lev Gorodinski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lev Gorodinski resolved KAFKA-6365.
---
Resolution: Fixed

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET client written in F#; supports 0.8, 0.9, 0.10





[jira] [Issue Comment Deleted] (KAFKA-6328) Exclude node groups belonging to global stores in InternalTopologyBuilder#makeNodeGroups

2017-12-19 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-6328:
--
Comment: was deleted

(was: [~guozhang] I have a minor suspicion that global node groups might 
actually be needed, given the failure of several tests in the QA run for my PR. 
It might be that my approach is wrong, but having checked my code over, I did 
not spot anything out of place. So I think this warrants some double 
checking.

Here is the test error:

{panel:title=Test Error for PR#4340}
java.lang.StackOverflowError
at java.util.HashSet.<init>(HashSet.java:119)
at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.makeNodeGroups(InternalTopologyBuilder.java:813)
at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.nodeGroups(InternalTopologyBuilder.java:801)
at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.globalNodeGroups(InternalTopologyBuilder.java:882)
at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.makeNodeGroups(InternalTopologyBuilder.java:814)
at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.nodeGroups(InternalTopologyBuilder.java:801)
at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.globalNodeGroups(InternalTopologyBuilder.java:882)

{panel}

)

> Exclude node groups belonging to global stores in 
> InternalTopologyBuilder#makeNodeGroups
> 
>
> Key: KAFKA-6328
> URL: https://issues.apache.org/jira/browse/KAFKA-6328
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Guozhang Wang
>Assignee: Richard Yu
>  Labels: newbie
> Attachments: kafka-6328.diff
>
>
> Today when we group processor nodes into groups (i.e. sub-topologies), we 
> assign a sub-topology id to the global tables' dummy groups as well. As a 
> result, the sub-topology ids (and hence task ids) are not consecutive anymore. 
> This is quite confusing for users troubleshooting and debugging; in 
> addition, the node group for global stores is not useful either: we simply 
> exclude it in all the caller functions of makeNodeGroups.
> It would be better to simply exclude the global stores' node groups in this 
> function so that the sub-topology ids and task ids are consecutive.
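
A hedged sketch of that direction (hypothetical signatures, not the actual patch; 
the real method additionally merges connected nodes into one group):

{code}
import java.util.*;

// Hypothetical sketch: skip global-store nodes while assigning group ids so
// that sub-topology ids stay consecutive. Taking the global node names as a
// parameter, rather than calling back into nodeGroups(), also sidesteps the
// mutual recursion visible in the deleted QA stack trace above.
public class NodeGroupsSketch {
    static Map<Integer, Set<String>> makeNodeGroups(List<String> sortedNodes,
                                                    Set<String> globalNodeNames) {
        Map<Integer, Set<String>> groups = new LinkedHashMap<>();
        int groupId = 0;
        for (String node : sortedNodes) {
            if (globalNodeNames.contains(node)) {
                continue; // global stores get no sub-topology id
            }
            groups.put(groupId++, new HashSet<>(Collections.singletonList(node)));
        }
        return groups;
    }
}
{code}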





[jira] [Created] (KAFKA-6386) Deprecate KafkaStreams constructor talking StreamsConfig parameter

2017-12-19 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6386:
--

 Summary: Deprecate KafkaStreams constructor talking StreamsConfig 
parameter
 Key: KAFKA-6386
 URL: https://issues.apache.org/jira/browse/KAFKA-6386
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Matthias J. Sax
Priority: Minor


Currently, the {{KafkaStreams}} constructor has overloads that take either 
{{Properties}} or {{StreamsConfig}} as parameters.

Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
object itself, the constructors accepting {{StreamsConfig}} are not useful and 
add only boilerplate code. Thus, we should deprecate those constructors in 
order to remove them eventually.

This JIRA includes a public API change and thus requires a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
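
For illustration, the two overloads side by side (a hedged sketch; topology 
construction elided):

{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ConstructorOverloads {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // ... topology definition elided ...

        // Sufficient: pass the Properties directly.
        KafkaStreams fromProperties = new KafkaStreams(builder.build(), props);

        // Redundant overload this JIRA proposes to deprecate: StreamsConfig is
        // just an immutable wrapper built from the same Properties.
        KafkaStreams fromConfig =
                new KafkaStreams(builder.build(), new StreamsConfig(props));
    }
}
{code}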





[jira] [Updated] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter

2017-12-19 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6386:
---
Summary: Deprecate KafkaStreams constructor taking StreamsConfig parameter  
(was: Deprecate KafkaStreams constructor talking StreamsConfig parameter)

> Deprecate KafkaStreams constructor taking StreamsConfig parameter
> -
>
> Key: KAFKA-6386
> URL: https://issues.apache.org/jira/browse/KAFKA-6386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> Currently, the {{KafkaStreams}} constructor has overloads that take either 
> {{Properties}} or {{StreamsConfig}} as parameters.
> Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
> object itself, the constructors accepting {{StreamsConfig}} are not useful 
> and add only boilerplate code. Thus, we should deprecate those constructors 
> in order to remove them eventually.
> This JIRA includes a public API change and thus requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals





[jira] [Updated] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter

2017-12-19 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6386:
---
Description: 
Currently, the {{KafkaStreams}} constructor has overloads that take either 
{{Properties}} or {{StreamsConfig}} as parameters.

Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
object itself, the constructors accepting {{StreamsConfig}} are not useful and 
adds only boilerplate code. Thus, we should deprecate those constructors in 
order to remove them eventually.

This JIRA includes a public API change and thus requires a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

  was:
Currently, the {{KafkaStreams}} constructor has overloads that take either 
{{Properties}} or {{StreamsConfig}} as parameters.

Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
object itself, the constructors accepting {{StreamsConfig}} are not useful and 
add only boilerplate code. Thus, we should deprecate those constructors in 
order to remove them eventually.

This JIRA includes a public API change and thus requires a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals


> Deprecate KafkaStreams constructor taking StreamsConfig parameter
> -
>
> Key: KAFKA-6386
> URL: https://issues.apache.org/jira/browse/KAFKA-6386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> Currently, the {{KafkaStreams}} constructor has overloads that take either 
> {{Properties}} or {{StreamsConfig}} as parameters.
> Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
> object itself, the constructors accepting {{StreamsConfig}} are not useful 
> and adds only boilerplate code. Thus, we should deprecate those constructors 
> in order to remove them eventually.
> This JIRA includes a public API change and thus requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals





[jira] [Commented] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297249#comment-16297249
 ] 

ASF GitHub Bot commented on KAFKA-6383:
---

GitHub user rodesai opened a pull request:

https://github.com/apache/kafka/pull/4343

KAFKA-6383: complete shutdown for CREATED StreamThreads

When transitioning StreamThread from CREATED to PENDING_SHUTDOWN,
free up resources from the caller rather than the stream thread,
since in this case the stream thread was never actually started.

In KafkaStreams.close, shut down the stream threads from the
close thread. StreamThread.shutdown may now block, so call it
from the close thread so that the timeout is honored.


### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rodesai/kafka KAFKA-6383

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4343


commit ab98a90027189dc12e3df99caf3d4f2aab8fefa0
Author: Rohan Desai 
Date:   2017-12-19T18:14:20Z

KAFKA-6383: complete shutdown for CREATED StreamThreads

When transitioning StreamThread from CREATED to PENDING_SHUTDOWN,
free up resources from the caller rather than the stream thread,
since in this case the stream thread was never actually started.

In KafkaStreams.close, shut down the stream threads from the
close thread. StreamThread.shutdown may now block, so call it
from the close thread so that the timeout is honored.




> StreamThread.shutdown doesn't clean up completely when called before 
> StreamThread.start
> ---
>
> Key: KAFKA-6383
> URL: https://issues.apache.org/jira/browse/KAFKA-6383
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Rohan Desai
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a bunch of StreamThreads 
> via StreamThread.create, which in turn creates a bunch of stuff (including a 
> producer). These resources are cleaned up only when the thread exits. So if 
> the thread was never started, then they are never cleaned up. 
> StreamThread.shutdown should clean up if it sees that the thread has never 
> been started.
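
A minimal self-contained model of the fix direction (illustrative, not the Kafka 
source; synchronization elided): resources created at construction time must be 
released by the caller when the thread never ran.

{code}
// Illustrative model, not Kafka source: if shutdown() is called while the
// thread is still CREATED, run() will never execute, so the caller must
// release the resources created at construction time.
public class WorkerThreadModel extends Thread {
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    private volatile State state = State.CREATED;
    private final AutoCloseable resource; // stands in for the producer etc.

    WorkerThreadModel(AutoCloseable resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        state = State.RUNNING;
        try {
            while (state == State.RUNNING) {
                // ... poll/process loop elided ...
                break;
            }
        } finally {
            closeQuietly(); // normal path: the thread cleans up on exit
            state = State.DEAD;
        }
    }

    public void shutdown() {
        if (state == State.CREATED) {
            closeQuietly(); // never started: clean up from the caller
            state = State.DEAD;
        } else {
            state = State.PENDING_SHUTDOWN;
        }
    }

    private void closeQuietly() {
        try {
            resource.close();
        } catch (Exception ignored) {
        }
    }
}
{code}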





[jira] [Commented] (KAFKA-6321) ConsumerGroupCommand should use the new consumer to query the log end offsets.

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297426#comment-16297426
 ] 

ASF GitHub Bot commented on KAFKA-6321:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/4344

KAFKA-6321: Consolidate calls to KafkaConsumer's `beginningOffsets()` and 
`endOffsets()` in ConsumerGroupCommand

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-6321

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4344


commit b69663447364a32a8d14ebd212f3f9ee99520fa0
Author: Vahid Hashemian 
Date:   2017-12-19T20:54:31Z

KAFKA-6321: Consolidate calls to KafkaConsumer's `beginningOffsets()` and 
`endOffsets()` in ConsumerGroupCommand




> ConsumerGroupCommand should use the new consumer to query the log end offsets.
> --
>
> Key: KAFKA-6321
> URL: https://issues.apache.org/jira/browse/KAFKA-6321
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Vahid Hashemian
> Fix For: 1.1.0
>
>
> Currently the ConsumerGroupCommand is querying the log end offsets one 
> partition at a time. It should switch to use Consumer.endOffsets().
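
For reference, a hedged sketch of the consolidated call (broker address, group 
id, and partitions are placeholders):

{code}
import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: a single endOffsets() call covers all partitions at once,
// replacing the per-partition query loop.
public class EndOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "offset-checker");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = Arrays.asList(
                    new TopicPartition("test", 0),
                    new TopicPartition("test", 1));
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            endOffsets.forEach((tp, offset) ->
                    System.out.println(tp + " -> " + offset));
        }
    }
}
{code}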





[jira] [Created] (KAFKA-6387) Worker's producer and consumer configs should inherit from worker configs

2017-12-19 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-6387:


 Summary: Worker's producer and consumer configs should inherit 
from worker configs
 Key: KAFKA-6387
 URL: https://issues.apache.org/jira/browse/KAFKA-6387
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Randall Hauch


Currently, the worker configuration file defines the connection properties for 
the three separate types of connections being made to the Kafka cluster:

# the worker group membership,
# producers for source connectors,
# the consumers for sink connectors. 

The configs are namespaced to properly support things like interceptors, where 
the configs for 2 and 3 would conflict (same config name, different value).

However, when such control is not required, it would be beneficial for the 
producers and consumers to inherit the top-level configurations yet be able to 
override them with the {{producer.}} and {{consumer.}} namespaced 
configurations. This way the producer- and consumer-specific configurations 
need only be specified if/when they need to override the top-level 
configurations. This may be necessary, for example, to give the connector 
tasks different ACLs than the producers and consumers.

This will require a minimal KIP to explain the new behavior. 
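
A hedged sketch of the resolution order this would imply (example keys only, 
not a finalized spec): top-level worker configs seed the producer config, and 
{{producer.}}-prefixed entries override them with the prefix stripped; 
{{consumer.}} would be handled the same way.

{code}
import java.util.*;

// Sketch of the proposed inheritance (example keys, not a finalized spec).
// In practice only the relevant client keys would inherit, not worker-only
// settings.
public class ConfigInheritanceSketch {
    public static void main(String[] args) {
        Map<String, String> worker = new LinkedHashMap<>();
        worker.put("bootstrap.servers", "broker:9092");
        worker.put("security.protocol", "SSL");
        worker.put("producer.acks", "all"); // producer-specific override

        Map<String, String> producer = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : worker.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith("producer.") && !key.startsWith("consumer.")) {
                producer.put(key, e.getValue()); // inherit top-level config
            }
        }
        for (Map.Entry<String, String> e : worker.entrySet()) {
            if (e.getKey().startsWith("producer.")) {
                producer.put(e.getKey().substring("producer.".length()),
                        e.getValue()); // namespaced override wins
            }
        }
        // {bootstrap.servers=broker:9092, security.protocol=SSL, acks=all}
        System.out.println(producer);
    }
}
{code}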





[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-12-19 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297562#comment-16297562
 ] 

Jason Gustafson commented on KAFKA-4879:


[~baluchicken] It looks like progress on this issue has stalled. Do you mind if 
I pick it up?

I'm not sure we've reached consensus on the solution. The underlying issue is 
that the consumer blocks to find starting offsets for partitions which are 
assigned. If we are in {{poll()}}, then we will be stuck there until the 
partition is re-created. If we are in {{position()}} or one of the relative 
{{seek()}} methods, we will be similarly stuck. 

1. In {{poll()}}, if we can't find the starting offset for a partition, I think 
we ought to begin fetching for other partitions and periodically recheck 
metadata in the background. We can potentially let the leader of the group 
rebalance if enough time passes and an assigned partition still doesn't exist. 
One question is whether we can propagate back to the user an 
UnknownTopicException at any point, but I think we can punt on this problem for 
now.

2. I think for the other methods, we have a choice between adding overloaded 
methods that accept a timeout parameter or adding a config like 
{{max.block.ms}} to match the producer. I'm somewhat partial to the first one 
since it is more flexible. If we're going to do a KIP for this, we may as well 
cover {{commitSync()}} and some of the other blocking APIs as well.

Thoughts?
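
For the first option, a hedged sketch of what the overloads could look like 
(hypothetical signatures; no such overloads exist in 1.0.0):

{code}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.TopicPartition;

// Hypothetical signatures only: blocking consumer calls gain variants that
// give up after a timeout instead of retrying indefinitely.
public interface TimeoutAwareConsumerSketch {
    long position(TopicPartition partition, long timeout, TimeUnit unit)
            throws TimeoutException;

    void commitSync(long timeout, TimeUnit unit) throws TimeoutException;
}
{code}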

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Balint Molnar
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set<TopicPartition> partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(30000);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}





[jira] [Commented] (KAFKA-6323) punctuate with WALL_CLOCK_TIME triggered immediately

2017-12-19 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297570#comment-16297570
 ] 

Guozhang Wang commented on KAFKA-6323:
--

[~frederica] I'd suggest the following:

1) STREAM_TIME punctuation: I agree with [~mjsax] on aligning to the scheduled 
timestamp; to be more specific, suppose the user-provided interval is {{T}}. We 
would first schedule the next timestamp as exactly {{T}}; then, at any point, 
suppose the next scheduled timestamp is {{T1}} and stream time has advanced to 
{{T2}} because of received data, where {{T2 >= T1}}. We then punctuate with 
parameter {{floor(T2, T)}} and schedule the next punctuation at 
{{floor(T2, T) + T}}.

2) WALL_CLOCK_TIME: we do not try to align on the interval, i.e. with a 
user-provided interval {{T}}, the next scheduled time is {{now + T}}; when we 
do the check against scheduled timestamp {{T1}}, if the current system time is 
{{T2}} (with {{T2 >= T1}}) we punctuate at {{T2}} and schedule the next 
punctuation at timestamp {{T2 + T}}. The argument is that with long GCs, a 
single record taking a long time to process, etc., we can never have a precise 
or predictable punctuation based on system wall-clock time, so instead we just 
try to expose the exact current system time when punctuation is 
triggered.
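
A small sketch of the two rules (interval {{T}}; the numbers are illustrative):

{code}
// Sketch of the two scheduling rules above (interval T; values illustrative).
public class PunctuationScheduleSketch {
    // STREAM_TIME: punctuate with floor(T2, T) and schedule the next one at
    // floor(T2, T) + T, keeping punctuation timestamps aligned on multiples of T.
    static long nextStreamTimeSchedule(long streamTime, long interval) {
        long punctuateAt = (streamTime / interval) * interval; // floor(T2, T)
        return punctuateAt + interval;
    }

    // WALL_CLOCK_TIME: no alignment; punctuate at the observed time T2 and
    // schedule the next punctuation at T2 + T.
    static long nextWallClockSchedule(long now, long interval) {
        return now + interval;
    }

    public static void main(String[] args) {
        // Stream time advanced to 25 with T = 10: punctuate with 20, next at 30.
        System.out.println(nextStreamTimeSchedule(25, 10)); // 30
        // Wall clock reached 25 with T = 10: punctuate at 25, next at 35.
        System.out.println(nextWallClockSchedule(25, 10));  // 35
    }
}
{code}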

> punctuate with WALL_CLOCK_TIME triggered immediately
> 
>
> Key: KAFKA-6323
> URL: https://issues.apache.org/jira/browse/KAFKA-6323
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Frederic Arno
>Assignee: Frederic Arno
> Fix For: 1.1.0, 1.0.1
>
>
> While working on a custom Processor from which I am scheduling a punctuation 
> using WALL_CLOCK_TIME, I've noticed that whatever punctuation interval I 
> set, a call to my Punctuator is always triggered immediately.
> Having a quick look at kafka-streams' code, I could find that all 
> PunctuationSchedule's timestamps are matched against the current time in 
> order to decide whether or not to trigger the punctuator 
> (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate). 
> However, I've only seen code that initializes PunctuationSchedule's timestamp 
> to 0, which I guess is what is causing an immediate punctuation.
> At least when using WALL_CLOCK_TIME, shouldn't the PunctuationSchedule's 
> timestamp be initialized to current time + interval?
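
For context, a hedged usage sketch of the API being described 
({{ProcessorContext#schedule}} in Kafka Streams 1.0; per this report, the first 
callback fires immediately rather than one interval after {{init()}}):

{code}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Usage sketch: schedule a wall-clock punctuation every 60 seconds from a
// custom Processor.
public class SampleProcessor extends AbstractProcessor<String, String> {
    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> System.out.println("punctuate at " + timestamp));
    }

    @Override
    public void process(String key, String value) {
        // no-op for this example
    }
}
{code}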





[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-19 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297569#comment-16297569
 ] 

Jason Gustafson commented on KAFKA-6366:


[~huxi_2b] Sure, it seems reasonable to catch {{Throwable}} in the heartbeat 
thread. That can probably be addressed separately.

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt
>
> (Full issue description and stack trace quoted with the original report above.)

[jira] [Updated] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-19 Thread Joerg Heinicke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joerg Heinicke updated KAFKA-6366:
--
Attachment: ConverterProcessor.zip

Log file from start until error.

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt, ConverterProcessor.zip
>
> (Full issue description and stack trace quoted with the original report above.)

[jira] [Issue Comment Deleted] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-19 Thread Joerg Heinicke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joerg Heinicke updated KAFKA-6366:
--
Comment: was deleted

(was: Log file from start until error.)

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt, ConverterProcessor.zip
>
> (Full issue description and stack trace quoted with the original report above.)

[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-19 Thread Joerg Heinicke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297642#comment-16297642
 ] 

Joerg Heinicke commented on KAFKA-6366:
---

I attached the log file from start until the error occurs at 21:03. As you 
can see, the issues had already been going on for hours. What also seems 
apparent is that, starting from 17:46, most of the time only one thread seems 
to be affected: pool-5-thread-3, resp. clientId=consumer-2 (in the log file 
both seem to be indexed from 1).

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt, ConverterProcessor.zip
>
> (Full issue description and stack trace quoted with the original report above.)

[jira] [Comment Edited] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-19 Thread Joerg Heinicke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297642#comment-16297642
 ] 

Joerg Heinicke edited comment on KAFKA-6366 at 12/19/17 11:39 PM:
--

I attached the log file [^ConverterProcessor.zip] from start until the error 
occurs at 21:03. As you can see, the issues had already been going on for 
hours. What also seems apparent is that, starting from 17:46, most of the time 
only one thread seems to be affected: pool-5-thread-3, resp. 
clientId=consumer-2 (in the log file both seem to be indexed from 1).


was (Author: joerg.heinicke):
I attached the log file from start until the error occurs at 21:03. As you 
can see, the issues had already been going on for hours. What also seems 
apparent is that, starting from 17:46, most of the time only one thread seems 
to be affected: pool-5-thread-3, resp. clientId=consumer-2 (in the log file 
both seem to be indexed from 1).

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt, ConverterProcessor.zip
>
> (Full issue description and stack trace quoted with the original report above.)

[jira] [Created] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread David Hay (JIRA)
David Hay created KAFKA-6388:


 Summary: Error while trying to roll a segment that already exists
 Key: KAFKA-6388
 URL: https://issues.apache.org/jira/browse/KAFKA-6388
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: David Hay
Assignee: Neha Narkhede
Priority: Blocker


I tried setting up a 5-broker 0.8 cluster and sending messages to hundreds of 
topics on it. For a couple of topic partitions, the produce requests never 
succeed, since they fail on the leader with the following error:

[2012-12-05 22:54:05,711] WARN [Kafka Log on Broker 2], Newly rolled segment 
file 0000.log already exists; deleting it first (kafka.log.Log)
[2012-12-05 22:54:05,711] WARN [Kafka Log on Broker 2], Newly rolled segment 
file 0000.index already exists; deleting it first (kafka.log.Log)
[2012-12-05 22:54:05,715] ERROR [ReplicaFetcherThread-1-0-on-broker-2], Error 
due to  (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Trying to roll a new log segment for topic 
partition NusWriteEvent-4 with start offset 0 while it already exsits
at kafka.log.Log.rollToOffset(Log.scala:456)
at kafka.log.Log.roll(Log.scala:434)
at kafka.log.Log.maybeRoll(Log.scala:423)
at kafka.log.Log.append(Log.scala:257)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51)
at 
kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:125)
at 
kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:108)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:108)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)







[jira] [Updated] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread David Hay (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Hay updated KAFKA-6388:
-
Affects Version/s: 1.0.0  (was: 0.8.0)






[jira] [Updated] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread David Hay (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Hay updated KAFKA-6388:
-
Description: 
Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in our 
attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).

After 30 minutes or more of the broker spewing log messages like this:
{noformat}
[2017-12-19 16:44:28,998] INFO Replica loaded for partition 
screening.save.results.screening.save.results.processor.error-43 with initial 
high watermark 0 (kafka.cluster.Replica)
{noformat}

Eventually, the replica thread throws the error below (also referenced in the 
original issue).  If I remove that partition from the data directory and bounce 
the broker, it eventually rebalances (assuming it doesn't hit a different 
partition with the same error).

{noformat}
[2017-12-19 15:16:24,227] WARN Newly rolled segment file 
0002.log already exists; deleting it first (kafka.log.Log)
[2017-12-19 15:16:24,227] WARN Newly rolled segment file 
0002.index already exists; deleting it first (kafka.log.Log)
[2017-12-19 15:16:24,227] WARN Newly rolled segment file 
0002.timeindex already exists; deleting it first (kafka.log.Log)
[2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
fetcher for partitions __consumer_offsets-20 
(kafka.server.ReplicaFetcherManager)
[2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Error processing data for partition 
sr.new.sr.new.processor.error-38 offset 2
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
at scala.Option.foreach(Option.scala:257)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
already exists.
at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
at kafka.log.Log.roll(Log.scala:1297)
at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
at kafka.log.Log.append(Log.scala:624)
at kafka.log.Log.appendAsFollower(Log.scala:607)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
... 13 more
[2017-12-19 15:16:24,302] INFO [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
{noformat}


[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297655#comment-16297655
 ] 

Jason Gustafson commented on KAFKA-6388:


[~dhay] One thing to check is whether the index corresponding to the segment 
starting at offset 2 on sr.new.sr.new.processor.error-38 has a zero size. I 
have seen cases where this caused the broker to unexpectedly roll on an already 
empty log file.
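
For anyone wanting to scan a data directory for that condition, here is a 
minimal, hypothetical sketch (not part of Kafka; the default path is an 
assumption, so point it at your own {{log.dirs}} location):

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper: print every zero-length .index file under a data dir,
// i.e. the candidate segments worth inspecting.
public class ZeroIndexScan {
    public static void main(String[] args) throws IOException {
        Path dataDir = Paths.get(args.length > 0 ? args[0] : "/var/kafka-logs");
        try (Stream<Path> files = Files.walk(dataDir)) {
            files.filter(p -> p.toString().endsWith(".index"))
                 .filter(p -> p.toFile().length() == 0)
                 .forEach(System.out::println);
        }
    }
}
{code}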


[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread David Hay (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297667#comment-16297667
 ] 

David Hay commented on KAFKA-6388:
--

I apparently have 288 partitions with a zero-length .index file. Does that mean 
that if I remove those partitions from the data directory, they'll be recreated 
correctly from the leader? It seems like Kafka should automatically detect that 
and recover gracefully.



[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread David Hay (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297669#comment-16297669
 ] 

David Hay commented on KAFKA-6388:
--

I just got the error again.  Looking at the data directory for the bad 
partition, I see this:
{noformat}
total 0
-rw-r--r--. 1 www www 10485760 Dec 19 16:56 .index
-rw-r--r--. 1 www www        0 Dec 19 16:56 .log
-rw-r--r--. 1 www www 10485756 Dec 19 16:56 .timeindex
-rw-r--r--. 1 www www        0 Dec 19 16:56 leader-epoch-checkpoint
{noformat}


[jira] [Commented] (KAFKA-6150) Make Repartition Topics Transient

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297672#comment-16297672
 ] 

ASF GitHub Bot commented on KAFKA-6150:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4315


> Make Repartition Topics Transient
> -
>
> Key: KAFKA-6150
> URL: https://issues.apache.org/jira/browse/KAFKA-6150
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: operability
> Fix For: 1.1.0
>
>
> Unlike changelog topics, the repartition topics could just be short-lived. 
> Today users have different ways to configure them with short retention, such 
> as enforcing a short retention period or using AppendTime for repartition 
> topics. All of these are cumbersome, and Streams should just do this for the 
> users.
> One way to do it is to use the “purgeData” admin API (KIP-107): after the 
> offsets of the input topics are committed, if the input topics are actually 
> repartition topics, we would purge the data immediately.
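
As a rough illustration of the KIP-107 purge call referred to above, here is a 
minimal, hypothetical sketch using the Java AdminClient (this assumes a client 
version that exposes {{deleteRecords}}; the topic, partition, and offset are 
made up):

{code}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class PurgeRepartitionTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Pretend offset 42 was just committed for this repartition topic.
            TopicPartition tp = new TopicPartition("my-app-repartition", 0);
            Map<TopicPartition, RecordsToDelete> purge =
                Collections.singletonMap(tp, RecordsToDelete.beforeOffset(42L));
            // Asks the brokers to delete all records before the given offset.
            admin.deleteRecords(purge).all().get();
        }
    }
}
{code}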





[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297674#comment-16297674
 ] 

Ismael Juma commented on KAFKA-6388:


It does look like the code is inconsistent.

{code}
val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = Log.logFile(dir, newOffset)
val offsetIdxFile = offsetIndexFile(dir, newOffset)
val timeIdxFile = timeIndexFile(dir, newOffset)
val txnIdxFile = transactionIndexFile(dir, newOffset)
for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
  warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
  Files.delete(file.toPath)
}

Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())

// take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
// offset align with the new segment offset since this ensures we can recover the segment by beginning
// with the corresponding snapshot file and scanning the segment data. Because the segment base offset
// may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
// we manually override the state offset here prior to taking the snapshot.
producerStateManager.updateMapEndOffset(newOffset)
producerStateManager.takeSnapshot()

val segment = LogSegment.open(dir,
  baseOffset = newOffset,
  config,
  time = time,
  fileAlreadyExists = false,
  initFileSize = initFileSize,
  preallocate = config.preallocate)
val prev = addSegment(segment)
if (prev != null)
  throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
{code}

Note how it deletes the files if they already exist on disk, but only 
afterwards throws an exception if the segment exists in memory. This also means 
that we potentially attempt to delete files that are currently open.


[jira] [Assigned] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

2017-12-19 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6383:


Assignee: Rohan Desai

> StreamThread.shutdown doesn't clean up completely when called before 
> StreamThread.start
> ---
>
> Key: KAFKA-6383
> URL: https://issues.apache.org/jira/browse/KAFKA-6383
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a number of StreamThreads 
> via StreamThread.create, each of which creates its own resources (including 
> a producer). These resources are cleaned up only when the thread exits, so 
> if the thread was never started they are never cleaned up. 
> StreamThread.shutdown should clean up if it sees that the thread has never 
> been started.
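
A minimal, hypothetical sketch of the leak with a trivial topology (the 
application id, bootstrap servers, and topic names are made up):

{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class LeakDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "leak-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output");

        // Constructing KafkaStreams creates the StreamThreads and their clients.
        KafkaStreams ks = new KafkaStreams(builder.build(), props);

        // Closed without ever calling start(): the threads never run their
        // shutdown path, so the per-thread producers' network threads leak.
        ks.close();
    }
}
{code}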





[jira] [Commented] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

2017-12-19 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297675#comment-16297675
 ] 

Guozhang Wang commented on KAFKA-6383:
--

[~rohanpd] I have added you to the contributor list; you can assign yourself to 
JIRAs now.






[jira] [Assigned] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-6388:
--

Assignee: (was: Neha Narkhede)


[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297723#comment-16297723
 ] 

Jason Gustafson commented on KAFKA-6388:


[~dhay] I'm not saying there's not a bug here ;). Just trying to help you 
recover. The case I'm aware of is when the index file of the last segment is 
empty. Have you seen any instances of this? There shouldn't be any need to 
delete the segment itself, just the index. But make sure to restart the broker 
after deleting the index. 

Can you post the exact exception for the partition whose log directory you 
posted above?


[jira] [Commented] (KAFKA-6371) FetchMetadata creates unneeded Strings on instantiation

2017-12-19 Thread Maytee Chinavanichkit (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297828#comment-16297828
 ] 

Maytee Chinavanichkit commented on KAFKA-6371:
--

[~huxi_2b] Yes!

> FetchMetadata creates unneeded Strings on instantiation
> ---
>
> Key: KAFKA-6371
> URL: https://issues.apache.org/jira/browse/KAFKA-6371
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1, 0.10.1.1, 0.10.2.1
>Reporter: Maytee Chinavanichkit
>Assignee: Maytee Chinavanichkit
>Priority: Minor
> Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2
>
>
> My colleague and I were taking a heap dump to investigate the memory usage of 
> a broker. From the dump, we saw a number of String objects with the contents 
> {{onlyCommitted: }} and {{partitionStatus: ]}}. Upon investigation, these 
> objects were being instantiated when the {{FetchMetadata}} object is 
> constructed. The toString method there is malformed: without a trailing 
> {{+}}, the last two lines parse as separate expressions and are executed as 
> a block instead of being concatenated.





[jira] [Commented] (KAFKA-5407) Mirrormaker dont start after upgrade

2017-12-19 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297972#comment-16297972
 ] 

Manikumar commented on KAFKA-5407:
--

[~hachikuji] Your thoughts on propagating offsets topic errors?

> Mirrormaker dont start after upgrade
> 
>
> Key: KAFKA-5407
> URL: https://issues.apache.org/jira/browse/KAFKA-5407
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.1
> Environment: Operating system
> CentOS 6.8
> HW
> Board Mfg : HP
>  Board Product : ProLiant DL380p Gen8
> CPU's x2
> Product Manufacturer  : Intel
>  Product Name  :  Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz
>  Memory Type   : DDR3 SDRAM
>  SDRAM Capacity: 2048 MB
>  Total Memory: : 64GB
> Hardrives size and layout:
> 9 drives using jbod
> drive size 3.6TB each
>Reporter: Fernando Vega
>Priority: Critical
> Attachments: broker.hkg1.new, debug.hkg1.new, 
> mirrormaker-repl-sjc2-to-hkg1.log.8
>
>
> Currently I'm upgrading the cluster from 0.8.2-beta to 0.10.2.1, so I 
> followed the rolling upgrade procedure.
> Here are the config files:
> Consumer
> {noformat}
> #
> # Cluster: repl
> # Topic list(goes into command line): 
> REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.*
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> group.id=hkg1_cluster
> auto.commit.interval.ms=6
> partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
> {noformat}
> Producer
> {noformat}
>  hkg1
> # # Producer
> # # hkg1
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> compression.type=gzip
> acks=0
> {noformat}
> Broker
> {noformat}
> auto.leader.rebalance.enable=true
> delete.topic.enable=true
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> default.replication.factor=2
> auto.create.topics.enable=true
> num.partitions=1
> num.network.threads=8
> num.io.threads=40
> log.retention.hours=1
> log.roll.hours=1
> num.replica.fetchers=8
> zookeeper.connection.timeout.ms=3
> zookeeper.session.timeout.ms=3
> inter.broker.protocol.version=0.10.2
> log.message.format.version=0.8.2
> {noformat}
> I also tried using the stock configuration, with no luck.
> The error that I get is this:
> {noformat}
> 2017-06-07 12:24:45,476] INFO ConsumerConfig values:
>   auto.commit.interval.ms = 6
>   auto.offset.reset = latest
>   bootstrap.servers = [app454.sjc2.mytest.com:9092, 
> app455.sjc2.mytest.com:9092, app456.sjc2.mytest.com:9092, 
> app457.sjc2.mytest.com:9092, app458.sjc2.mytest.com:9092, 
> app459.sjc2.mytest.com:9092]
>   check.crcs = true
>   client.id = MirrorMaker_hkg1-1
>   connections.max.idle.ms = 54
>   enable.auto.commit = false
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = MirrorMaker_hkg1
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RoundRobinAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.trus

[jira] [Assigned] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-12-19 Thread Balint Molnar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Balint Molnar reassigned KAFKA-4879:


Assignee: Jason Gustafson  (was: Balint Molnar)

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
>
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic "test" with 3 partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}
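
On affected versions, one possible client-side guard is to bound the call by 
waking the consumer up from another thread. A minimal, hypothetical sketch 
reusing {{kc}} and {{partitions}} from the reproducer above (the 10-second 
budget is an arbitrary choice):

{code}
// Assumed extra imports: java.util.concurrent.Executors, ScheduledExecutorService,
// ScheduledFuture, TimeUnit, and org.apache.kafka.common.errors.WakeupException.
ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
for (TopicPartition p : partitions) {
  // Schedule a wakeup() that aborts position() if it blocks too long.
  ScheduledFuture<?> wakeup = watchdog.schedule(kc::wakeup, 10, TimeUnit.SECONDS);
  try {
    System.out.println(p + " offset: " + kc.position(p));
  } catch (WakeupException e) {
    // Note: a wakeup firing after position() returns could still hit the next
    // consumer call; this is only a sketch, not a production pattern.
    System.out.println("position() did not return within 10s for " + p);
  } finally {
    wakeup.cancel(false);
  }
}
watchdog.shutdown();
{code}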





[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-12-19 Thread Balint Molnar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297995#comment-16297995
 ] 

Balint Molnar commented on KAFKA-4879:
--

[~jasong35] Sure, I reassigned it to you.



