[jira] [Comment Edited] (KAFKA-6678) Upgrade dependencies with later release versions

2018-04-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434768#comment-16434768
 ] 

Ted Yu edited comment on KAFKA-6678 at 4/19/18 4:47 AM:


Looks like the 2.0 release in June would drop support for Java 7.

thanks


was (Author: yuzhih...@gmail.com):
Which release would drop support for Java 7?

thanks

> Upgrade dependencies with later release versions
> 
>
> Key: KAFKA-6678
> URL: https://issues.apache.org/jira/browse/KAFKA-6678
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ted Yu
>Priority: Major
> Attachments: k-update.txt
>
>
> {code}
> The following dependencies have later release versions:
>  - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
>  - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
>  - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
>  - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
>  - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
>  - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
>  - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
>  - org.lz4:lz4-java [1.4 -> 1.4.1]
>  - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
>  - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
>  - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
>  - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
>  - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
>  - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
>  - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
>  - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
> {code}
> Looks like we can consider upgrading scalatest, jmh-core and checkstyle
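
For reference, a bump like this would land in Kafka's dependency version
definitions; a minimal sketch, assuming the `versions` map convention in
`gradle/dependencies.gradle` (key names are assumptions, not an actual patch):

{code}
// gradle/dependencies.gradle (sketch; key names are assumptions)
versions += [
    scalatest: "3.0.5",  // was 3.0.4 -- patch bump, low risk
    jmh: "1.20",         // was 1.19
    checkstyle: "8.8"    // was 6.19 -- major jump; rule config likely needs updates
]
{code}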



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6730) Simplify state store recovery

2018-04-18 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443565#comment-16443565
 ] 

Richard Yu commented on KAFKA-6730:
---

[~mjsax] If possible, could I take this JIRA?

> Simplify state store recovery
> -
>
> Key: KAFKA-6730
> URL: https://issues.apache.org/jira/browse/KAFKA-6730
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Fix For: 1.2.0
>
>
> In the current code base, we restore state stores in the main thread (in 
> contrast to older code that restored state stores in the rebalance 
> callback). This has multiple advantages and allows us to further simplify the 
> restore code.
> In the original code base, during a long restore phase, it was possible that 
> an instance missed a rebalance and dropped out of the consumer group. To detect 
> this case, we apply a check during the restore phase that the end-offset of 
> the changelog topic does not change. A changed offset implies a missed 
> rebalance, as another thread started to write into the changelog topic (i.e., 
> the current thread does not own the task/store/changelog-topic anymore).
> With the new code, which restores in the main loop, it's ensured that `poll()` 
> is called regularly and thus a rebalance will be detected automatically. 
> This makes the check for a changing changelog end-offset unnecessary.
> We can simplify the restore logic to just consume until `poll()` does not 
> return any data. At that point, we fetch the end-offset to see if we have 
> fully restored. If yes, we resume processing; if not, we continue the restore.
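
A minimal sketch of the proposed restore loop, assuming a dedicated restore
consumer (names and structure are illustrative, not the actual
StoreChangelogReader code):

{code}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Consume until poll() returns nothing, then check the end offset to decide
// whether the restore is complete.
static boolean restoreStep(final Consumer<byte[], byte[]> restoreConsumer,
                           final TopicPartition changelog) {
    final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(100L);
    for (final ConsumerRecord<byte[], byte[]> record : records) {
        // apply the changelog record to the state store here
    }
    if (records.isEmpty()) {
        final long end = restoreConsumer.endOffsets(
                Collections.singleton(changelog)).get(changelog);
        return restoreConsumer.position(changelog) >= end; // true: fully restored
    }
    return false; // got data; continue restoring
}
{code}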



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6730) Simplify state store recovery

2018-04-18 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443524#comment-16443524
 ] 

Richard Yu commented on KAFKA-6730:
---

In the new code, would StoreChangeLogReader#restorePartition be removed then? 
Much of the logic inside the restorePartition method would no longer be 
necessary in the proposed version.

> Simplify state store recovery
> -
>
> Key: KAFKA-6730
> URL: https://issues.apache.org/jira/browse/KAFKA-6730
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Fix For: 1.2.0
>
>
> In the current code base, we restore state stores in the main thread (in 
> contrast to older code that restored state stores in the rebalance 
> callback). This has multiple advantages and allows us to further simplify the 
> restore code.
> In the original code base, during a long restore phase, it was possible that 
> an instance missed a rebalance and dropped out of the consumer group. To detect 
> this case, we apply a check during the restore phase that the end-offset of 
> the changelog topic does not change. A changed offset implies a missed 
> rebalance, as another thread started to write into the changelog topic (i.e., 
> the current thread does not own the task/store/changelog-topic anymore).
> With the new code, which restores in the main loop, it's ensured that `poll()` 
> is called regularly and thus a rebalance will be detected automatically. 
> This makes the check for a changing changelog end-offset unnecessary.
> We can simplify the restore logic to just consume until `poll()` does not 
> return any data. At that point, we fetch the end-offset to see if we have 
> fully restored. If yes, we resume processing; if not, we continue the restore.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6802) Improve logging when topics aren't known and assignments skipped

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443288#comment-16443288
 ] 

ASF GitHub Bot commented on KAFKA-6802:
---

guozhangwang closed pull request #4891: KAFKA-6802: Improved logging for 
missing topics during task assignment
URL: https://github.com/apache/kafka/pull/4891
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index c86171c3ab0..cee94886854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -82,7 +82,11 @@ protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
             List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
 
             if (partitions.isEmpty()) {
-                log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
+
+                log.warn("Skipping creating tasks for the topic group {} since topic {}'s metadata is not available yet;"
+                    + " no tasks for this topic group will be assigned to any client.\n"
+                    + " Make sure all supplied topics in the topology are created before starting"
+                    + " as this could lead to unexpected results", topics, topic);
                 return StreamsPartitionAssignor.NOT_AVAILABLE;
             } else {
                 int numPartitions = partitions.size();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index c81105ef821..1f00c0478ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -470,7 +470,11 @@ public Subscription subscription(final Set<String> topics) {
                 for (final PartitionInfo partitionInfo : partitionInfoList) {
                     final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
-                        log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask);
+                        log.warn("Partition {} is not assigned to any tasks: {}"
+                            + " Possible causes of a partition not getting assigned"
+                            + " is that another topic defined in the topology has not been"
+                            + " created when starting your streams application,"
+                            + " resulting in no tasks created for this topology at all.", partition, partitionsForTask);
                     }
                 }
             } else {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve logging when topics aren't known and assignments skipped
> 
>
> Key: KAFKA-6802
> URL: https://issues.apache.org/jira/browse/KAFKA-6802
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init

2018-04-18 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6775.
--
   Resolution: Fixed
Fix Version/s: 1.2.0

> AbstractProcessor created in SimpleBenchmark should call super#init
> ---
>
> Key: KAFKA-6775
> URL: https://issues.apache.org/jira/browse/KAFKA-6775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ted Yu
>Assignee: Jimin Hsieh
>Priority: Minor
>  Labels: easy-fix, newbie
> Fix For: 1.2.0
>
>
> Around line 610:
> {code}
> return new AbstractProcessor() {
> @Override
> public void init(ProcessorContext context) {
> }
> {code}
> super.init should be called above.
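
For clarity, the fix is a one-line forward to the base class, which stores the
context so that AbstractProcessor#context() works later. A sketch of the
corrected snippet (the generic parameters are an assumption based on the
surrounding benchmark code):

{code}
return new AbstractProcessor<Integer, byte[]>() {
    @Override
    public void init(final ProcessorContext context) {
        super.init(context); // retains the context for later context() calls
    }

    @Override
    public void process(final Integer key, final byte[] value) {
        // benchmark body elided
    }
};
{code}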



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443259#comment-16443259
 ] 

ASF GitHub Bot commented on KAFKA-6775:
---

guozhangwang closed pull request #4859: KAFKA-6775: Fix the issue of without 
init super class's
URL: https://github.com/apache/kafka/pull/4859
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index faab52e2118..e1b45f52e40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -439,6 +439,7 @@ private void processStreamWithStateStore(final String topic) {
                     @SuppressWarnings("unchecked")
                     @Override
                     public void init(final ProcessorContext context) {
+                        super.init(context);
                         store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
                     }
 
@@ -446,12 +447,6 @@ public void init(final ProcessorContext context) {
                     public void process(final Integer key, final byte[] value) {
                         store.put(key, value);
                     }
-
-                    @Override
-                    public void punctuate(final long timestamp) {}
-
-                    @Override
-                    public void close() {}
                 };
             }
         }, "store");
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 87ca82918a9..9e62e3fc9ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -47,6 +47,7 @@
 
             @Override
             public void init(final ProcessorContext context) {
+                super.init(context);
                 System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
@@ -59,12 +60,6 @@ public void process(final Object key, final Object value) {
                     System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
-
-            @Override
-            public void punctuate(final long timestamp) {}
-
-            @Override
-            public void close() {}
         };
     }
 };


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AbstractProcessor created in SimpleBenchmark should call super#init
> ---
>
> Key: KAFKA-6775
> URL: https://issues.apache.org/jira/browse/KAFKA-6775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ted Yu
>Assignee: Jimin Hsieh
>Priority: Minor
>  Labels: easy-fix, newbie
> Fix For: 1.2.0
>
>
> Around line 610:
> {code}
> return new AbstractProcessor() {
> @Override
> public void init(ProcessorContext context) {
> }
> {code}
> super.init should be called above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6803) Caching is turned off for stream-stream Join

2018-04-18 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6803:


 Summary: Caching is turned off for stream-stream Join
 Key: KAFKA-6803
 URL: https://issues.apache.org/jira/browse/KAFKA-6803
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


For other join types (table-table and stream-table joins), caching is turned on 
in `Materialized` by default. However, for stream-stream joins we hard-coded 
internally that caching is disabled.
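
For comparison, callers of the other join types can toggle this behavior through
`Materialized`; a minimal sketch (topology, types, and store name are
illustrative):

{code}
final KTable<String, String> joined = left.join(
    right,
    (l, r) -> l + r,
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("join-store")
        .withCachingEnabled()     // the default
        // .withCachingDisabled() // explicit opt-out
);
{code}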



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2018-04-18 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443182#comment-16443182
 ] 

Ismael Juma commented on KAFKA-3042:


[~junrao], is this fixed in 1.1.0?

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Major
>  Labels: reliability
> Fix For: 1.2.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is.
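
The bounded-retry idea suggested above could look roughly like this (purely
illustrative; the helpers are hypothetical, not the actual updateIsr code):

{code}
int attempts = 0;
while (!tryUpdateIsr() && ++attempts < MAX_ISR_UPDATE_RETRIES) {
    // hit "Cached zkVersion X not equal"; retry a bounded number of times
}
if (attempts >= MAX_ISR_UPDATE_RETRIES) {
    refreshLeadershipFromZooKeeper(); // hypothetical: re-resolve who the leader is
}
{code}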



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-2334) Prevent HW from going back during leader failover

2018-04-18 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reopened KAFKA-2334:


> Prevent HW from going back during leader failover 
> --
>
> Key: KAFKA-2334
> URL: https://issues.apache.org/jira/browse/KAFKA-2334
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: reliability
>
> Consider the following scenario:
> 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 as 
> the follower. 
> 1. A producer keeps sending to Kafka with ack=-1.
> 2. A consumer repeatedly issues ListOffset requests to Kafka.
> And the following sequence:
> 0. B1's current log-end-offset (LEO) is 0, HW-offset 0; same with B2.
> 1. B1 receives a ProduceRequest of 100 messages, appends to its local log (LEO 
> becomes 100) and holds the request in purgatory.
> 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and 
> returns the 100 messages.
> 3. B2 appends the received messages to its local log (LEO becomes 100).
> 4. B1 receives another FetchRequest starting at offset 100 from B2, learns 
> that B2's LEO has caught up to 100, hence updates its own HW, satisfies the 
> ProduceRequest in purgatory, and sends the FetchResponse 
> with HW 100 back to B2 ASYNCHRONOUSLY.
> 5. B1 successfully sends the ProduceResponse to the producer and then fails; 
> hence the FetchResponse does not reach B2, whose HW remains 0.
> From the consumer's point of view, it could first see the latest offset of 
> 100 (from B1), then see the latest offset of 0 (from B2), and then the 
> latest offset gradually catches up to 100.
> This is because we use HW to guard ListOffset and 
> Fetch-from-ordinary-consumer.
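
The consumer-visible symptom can be observed by polling for the latest offset,
which the broker answers from its high watermark; a minimal sketch (topic name
illustrative, given an existing KafkaConsumer<byte[], byte[]> consumer):

{code}
final TopicPartition tp = new TopicPartition("my-topic", 0);
// Across the failover described above, the value returned here can go from
// 100 (answered by B1) back to 0 (answered by B2) before catching up again.
final Map<TopicPartition, Long> latest =
        consumer.endOffsets(Collections.singleton(tp));
System.out.println("latest offset = " + latest.get(tp));
{code}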



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6406) Topic deletion fails and kafka shuts down (on windows only)

2018-04-18 Thread Kevin Vasko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443083#comment-16443083
 ] 

Kevin Vasko edited comment on KAFKA-6406 at 4/18/18 7:41 PM:
-

I get this same error message. I am on Windows 10 and Kafka 2.11-1.1.0. To 
reproduce, simply run:

{noformat}
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete test --topic test
{noformat}

Kafka immediately throws an access denied error message:

{noformat}
[2018-04-18 14:29:02,843] ERROR Error while renaming dir for test-0 in log dir C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\test-0 -> C:\tmp\kafka-logs\test-0.9b7da533fd9d4b1f8ad0e783f16ad1ee-delete
	at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
	at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
	at java.nio.file.Files.move(Unknown Source)
	at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
	at kafka.log.Log$$anonfun$renameDir$1.apply$mcV$sp(Log.scala:579)
	at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:577)
	at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:577)
	at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
	at kafka.log.Log.renameDir(Log.scala:577)
	at kafka.log.LogManager.asyncDelete(LogManager.scala:813)
	at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:240)
	at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:235)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
	at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
	at kafka.cluster.Partition.delete(Partition.scala:235)
	at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:347)
	at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:377)
	at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:375)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:375)
	at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:198)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:109)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
	at java.lang.Thread.run(Unknown Source)
	Suppressed: java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\test-0 -> C:\tmp\kafka-logs\test-0.9b7da533fd9d4b1f8ad0e783f16ad1ee-delete
		at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
		at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
		at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
		at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
		at java.nio.file.Files.move(Unknown Source)
		at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
		... 23 more
[2018-04-18 14:29:02,847] INFO [ReplicaManager broker=0] Stopping serving replicas in dir C:\tmp\kafka-logs (kafka.server.ReplicaManager)
[2018-04-18 14:29:02,850] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
[2018-04-18 14:29:02,851] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions  (kafka.server.ReplicaAlterLogDirsManager)
[2018-04-18 14:29:02,855] INFO [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions  and stopped moving logs for partitions  because they are in the failed log directory C:\tmp\kafka-logs. (kafka.server.ReplicaManager)
[2018-04-18 14:29:02,855] INFO Stopping serving logs in dir C:\tmp\kafka-logs (kafka.log.LogManager)
[2018-04-18 14:29:02,858] ERROR Shutdown broker because all log dirs in C:\tmp\kafka-logs have failed (kafka.log.LogManager)
{noformat}


was (Author: kur1j):
I get this same error message. I am on win

[jira] [Commented] (KAFKA-6406) Topic deletion fails and kafka shuts down (on windows only)

2018-04-18 Thread Kevin Vasko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443083#comment-16443083
 ] 

Kevin Vasko commented on KAFKA-6406:


I get this same error message. I am on Windows 10 and Kafka 2.11-1.1.0. To 
reproduce, simply run:

{noformat}
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete test --topic test
{noformat}

Kafka immediately throws an access denied error message:

{noformat}
[2018-04-18 14:29:02,843] ERROR Error while renaming dir for test-0 in log dir C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\test-0 -> C:\tmp\kafka-logs\test-0.9b7da533fd9d4b1f8ad0e783f16ad1ee-delete
	at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
	at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
	at java.nio.file.Files.move(Unknown Source)
	at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
	at kafka.log.Log$$anonfun$renameDir$1.apply$mcV$sp(Log.scala:579)
	at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:577)
	at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:577)
	at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
	at kafka.log.Log.renameDir(Log.scala:577)
	at kafka.log.LogManager.asyncDelete(LogManager.scala:813)
	at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:240)
	at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:235)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
	at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
	at kafka.cluster.Partition.delete(Partition.scala:235)
	at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:347)
	at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:377)
	at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:375)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:375)
	at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:198)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:109)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
	at java.lang.Thread.run(Unknown Source)
	Suppressed: java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\test-0 -> C:\tmp\kafka-logs\test-0.9b7da533fd9d4b1f8ad0e783f16ad1ee-delete
		at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
		at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
		at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
		at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
		at java.nio.file.Files.move(Unknown Source)
		at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
		... 23 more
[2018-04-18 14:29:02,847] INFO [ReplicaManager broker=0] Stopping serving replicas in dir C:\tmp\kafka-logs (kafka.server.ReplicaManager)
[2018-04-18 14:29:02,850] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
[2018-04-18 14:29:02,851] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions  (kafka.server.ReplicaAlterLogDirsManager)
[2018-04-18 14:29:02,855] INFO [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions  and stopped moving logs for partitions  because they are in the failed log directory C:\tmp\kafka-logs. (kafka.server.ReplicaManager)
[2018-04-18 14:29:02,855] INFO Stopping serving logs in dir C:\tmp\kafka-logs (kafka.log.LogManager)
[2018-04-18 14:29:02,858] ERROR Shutdown broker because all log dirs in C:\tmp\kafka-logs have failed (kafka.log.LogManager)
{noformat}

> Topic deletion fails and kafka shuts down (on windows only)
> ---
>
> Key

[jira] [Updated] (KAFKA-6802) Improve logging when topics aren't known and assignments skipped

2018-04-18 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-6802:
---
Component/s: streams

> Improve logging when topics aren't known and assignments skipped
> 
>
> Key: KAFKA-6802
> URL: https://issues.apache.org/jira/browse/KAFKA-6802
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6802) Improve logging when topics aren't known and assignments skipped

2018-04-18 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-6802:
---
Fix Version/s: 1.2.0

> Improve logging when topics aren't known and assignments skipped
> 
>
> Key: KAFKA-6802
> URL: https://issues.apache.org/jira/browse/KAFKA-6802
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6802) Improve logging when topics aren't known and assignments skipped

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443022#comment-16443022
 ] 

ASF GitHub Bot commented on KAFKA-6802:
---

bbejeck opened a new pull request #4891: KAFKA-6802: Improved logging for 
missing topics during task assignment
URL: https://github.com/apache/kafka/pull/4891
 
 
   If users don't create all topics before starting a streams application, they 
could get unexpected results.  For example, sharing a state store between 
sub-topologies where one input topic is not created ahead of time results in a 
log message that "Partition X is not assigned to any tasks", which does not give 
any clues as to how this could have occurred.
   
   Also, this PR changes the log level from `INFO` to `WARN` when metadata does 
not have partitions for a given topic.
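
   As the description implies, the warning is avoidable by creating every input
topic before the application starts; a sketch using AdminClient (topic name,
partition count, and replication factor are assumptions):

{code}
final Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // create the input topic up front so assignment can see its metadata
    admin.createTopics(Collections.singleton(
            new NewTopic("input-topic", 1, (short) 1))).all().get();
} // (exception handling elided)
{code}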
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve logging when topics aren't known and assignments skipped
> 
>
> Key: KAFKA-6802
> URL: https://issues.apache.org/jira/browse/KAFKA-6802
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6802) Improve logging when topics aren't known and assignments skipped

2018-04-18 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6802:
--

 Summary: Improve logging when topics aren't known and assignments 
skipped
 Key: KAFKA-6802
 URL: https://issues.apache.org/jira/browse/KAFKA-6802
 Project: Kafka
  Issue Type: Improvement
Reporter: Bill Bejeck
Assignee: Bill Bejeck






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6798) Kafka leader rebalance failures

2018-04-18 Thread Riley Zimmerman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442941#comment-16442941
 ] 

Riley Zimmerman edited comment on KAFKA-6798 at 4/18/18 5:46 PM:
-

Around once a minute I see this, otherwise nothing bad in the zookeeper logs:
{noformat}
2018-04-18 17:00:56,896 [myid:0] - WARN 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@368] - caught end of 
stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, 
likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
at java.lang.Thread.run(Thread.java:811)
{noformat}
I'm thinking this could be a dupe of 
https://issues.apache.org/jira/browse/KAFKA-2729 (and that I need to accelerate 
getting a kafka 1.1.0 image from my dev team).  

Should there be zookeeper session expiration in proper operation?  Or is this 
whole sequence bad?
{noformat}
[2018-04-14 16:10:03,672] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-04-14 16:10:03,974] INFO Opening socket connection to server 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181. 
Will not attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,982] INFO Socket connection established to 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181, 
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] WARN Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Initiating client connection, 
connectString=amserver-zookeeper:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@ac73a732 (org.apache.zookeeper.ZooKeeper)
[2018-04-14 16:10:04,166] INFO Opening socket connection to server 
amserver-zookeeper-1.amserver-zookeeper.default.svc.cluster.local/10.1.42.11:2181.
 Will not attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:04,166] INFO Socket connection established to 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181, 
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:04,169] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient){noformat}
I'm looking for any 4 hour kubernetes/calico operation/timeout that could cause 
this.  

 


was (Author: rdzimmer):
Around once a minute I see this, otherwise nothing bad in the zookeeper logs:
{noformat}
2018-04-18 17:00:56,896 [myid:0] - WARN 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@368] - caught end of 
stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, 
likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
at java.lang.Thread.run(Thread.java:811)
{noformat}
I'm thinking this could be a dupe of 
https://issues.apache.org/jira/browse/KAFKA-2729 (and that I need to 
accelerating getting a kafka 1.1.0 image from my dev team).  

Should there be zookeeper session expiration in proper operation?  Or is this 
whole sequence bad?
{noformat}
[2018-04-14 16:10:03,672] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-04-14 16:10:03,974] INFO Opening socket connection to server 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181. 
Will not attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,982] INFO Socket connection established to 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181, 
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] WARN Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Initiating client connection, 
connectString=amserver-zookeeper:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@ac73a732 (org.apache.zookeeper.ZooKeeper)
[2018-04-14 16:10:04,166] INFO Opening socket connection to server 
amserver-zookeeper-1.amserver-zookeeper.default.svc.cluster.local/10.1.42.11:2181.
 Will not attempt to authenticate using SASL (unknown error) 
(org.apache

[jira] [Commented] (KAFKA-6798) Kafka leader rebalance failures

2018-04-18 Thread Riley Zimmerman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442941#comment-16442941
 ] 

Riley Zimmerman commented on KAFKA-6798:


Around once a minute I see this, otherwise nothing bad in the zookeeper logs:
{noformat}
2018-04-18 17:00:56,896 [myid:0] - WARN 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@368] - caught end of 
stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, 
likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
at java.lang.Thread.run(Thread.java:811)
{noformat}
I'm thinking this could be a dupe of 
https://issues.apache.org/jira/browse/KAFKA-2729 (and that I need to accelerate 
getting a kafka 1.1.0 image from my dev team).  

Should there be zookeeper session expiration in proper operation?  Or is this 
whole sequence bad?
{noformat}
[2018-04-14 16:10:03,672] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-04-14 16:10:03,974] INFO Opening socket connection to server 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181. 
Will not attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,982] INFO Socket connection established to 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181, 
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] WARN Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Initiating client connection, 
connectString=amserver-zookeeper:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@ac73a732 (org.apache.zookeeper.ZooKeeper)
[2018-04-14 16:10:04,166] INFO Opening socket connection to server 
amserver-zookeeper-1.amserver-zookeeper.default.svc.cluster.local/10.1.42.11:2181.
 Will not attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:04,166] INFO Socket connection established to 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181, 
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:04,169] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient){noformat}
I'm looking for any 4 hour kubernetes/calico operation/timeout that could cause 
this.  

 

> Kafka leader rebalance failures
> ---
>
> Key: KAFKA-6798
> URL: https://issues.apache.org/jira/browse/KAFKA-6798
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1, 1.0.1
>Reporter: Riley Zimmerman
>Priority: Critical
>
> I am running 3 Kafka (version 0.10.2.1 and more recently moved to 1.0.1) with 
> 3 Zookeeper (v3.4.9) as statefulsets in a kubernetes v1.9.1 deployment.  My 
> partitions are replication factor 3.  My main workload involves a kafka 
> streams consumer/producer (storing offsets in kafka) and a second kafka 
> consumer storing offsets in zookeeper (only commits every 30 seconds).  There 
> are ~200,000 kafka messages going through each per minute.  The log.retention 
> settings are all 4 hours.  I have auto.leader.rebalance.enabled.  
> I am randomly having failures during the rebalances.  The result is that 
> partitions for both topics and consumer_offsets go out of sync and the 
> partition leader becomes -1.  After 4 hours there is another (auto?) 
> rebalance and sometimes it sorts itself out.  Sometimes it runs for weeks 
> without problems, other times it happens multiple times in a few days.  It 
> appears to happen earlier in test runs if it is going to happen.   
> {noformat}
> Topic:__consumer_offsetsPartitionCount:50   ReplicationFactor:3   
>   
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
> Topic: __consumer_offsets   Partition: 0Leader: -1  
> Replicas: 2,0,1 Isr:
> Topic: __consumer_offsets   Partition: 1Leader: 0   
> Replicas: 0,1,2 Isr: 1,2,0
> Topic: __consumer_offsets   Partition: 2Leader: 1   
> Replicas: 1,2,0 Isr: 2,1,0
> Topic: __consumer_offsets   Partition: 3Leader: -1  
> Replicas: 2,1,0 Isr:
> {noformat}
> {noformat}
> [2018-03-20 12:42:32,180] WARN [Controller 2]: Partition [agent.metadata,5] 
> failed to complete preferred replica leader election. Leader is -1 
> (kafka.controller.KafkaController)
> {noformat}
> {noformat}
> [2018-03-20 11:02:32,099] TRACE 

[jira] [Commented] (KAFKA-6798) Kafka leader rebalance failures

2018-04-18 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442861#comment-16442861
 ] 

Manikumar commented on KAFKA-6798:
--

Yes, the rebalances are due to zookeeper disconnects. Since it is happening on all 
brokers at the same time, it may be due to some network configuration/timeouts in 
your environment.  Can you check the zookeeper logs for issues?
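
If the disconnects line up with GC pauses or network blips, one knob to
experiment with is the broker's ZooKeeper session timeout; an illustrative
server.properties fragment (the value is an assumption to tune, not a
recommendation):

{noformat}
# default is 6000 ms in these versions; a larger value rides out short pauses
zookeeper.session.timeout.ms=18000
{noformat}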

> Kafka leader rebalance failures
> ---
>
> Key: KAFKA-6798
> URL: https://issues.apache.org/jira/browse/KAFKA-6798
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1, 1.0.1
>Reporter: Riley Zimmerman
>Priority: Critical
>
> I am running 3 Kafka (version 0.10.2.1 and more recently moved to 1.0.1) with 
> 3 Zookeeper (v3.4.9) as statefulsets in a kubernetes v1.9.1 deployment.  My 
> partitions are replication factor 3.  My main workload involves a kafka 
> streams consumer/producer (storing offsets in kafka) and a second kafka 
> consumer storing offsets in zookeeper (only commits every 30 seconds).  There 
> are ~200,000 kafka messages going through each per minute.  The log.retention 
> settings are all 4 hours.  I have auto.leader.rebalance.enabled.  
> I am randomly having failures during the rebalances.  The result is that 
> partitions for both topics and consumer_offsets go out of sync and the 
> partition leader becomes -1.  After 4 hours there is another (auto?) 
> rebalance and sometimes it sorts itself out.  Sometimes it runs for weeks 
> without problems, other times it happens multiple times in a few days.  It 
> appears to happen earlier in test runs if it is going to happen.   
> {noformat}
> Topic:__consumer_offsetsPartitionCount:50   ReplicationFactor:3   
>   
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
> Topic: __consumer_offsets   Partition: 0Leader: -1  
> Replicas: 2,0,1 Isr:
> Topic: __consumer_offsets   Partition: 1Leader: 0   
> Replicas: 0,1,2 Isr: 1,2,0
> Topic: __consumer_offsets   Partition: 2Leader: 1   
> Replicas: 1,2,0 Isr: 2,1,0
> Topic: __consumer_offsets   Partition: 3Leader: -1  
> Replicas: 2,1,0 Isr:
> {noformat}
> {noformat}
> [2018-03-20 12:42:32,180] WARN [Controller 2]: Partition [agent.metadata,5] 
> failed to complete preferred replica leader election. Leader is -1 
> (kafka.controller.KafkaController)
> {noformat}
> {noformat}
> [2018-03-20 11:02:32,099] TRACE Controller 2 epoch 27 started leader election 
> for partition [__consumer_offsets,30] (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 encountered error while 
> electing leader for partition [__consumer_offsets,30] due to: Preferred 
> replica 2 for partition [__consumer_offsets,30] is either not alive or not in 
> the isr. Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}]. 
> (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 initiated state change 
> for partition [__consumer_offsets,30] from OnlinePartition to OnlinePartition 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing 
> leader for partition [__consumer_offsets,30] due to: Preferred replica 2 for 
> partition [__consumer_offsets,30] is either not alive or not in the isr. 
> Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}].
>   at 
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:362)
>   at 
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:202)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:141)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:140)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>   at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:140)
>   at 
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:662)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1230)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply(KafkaController.scala:1225)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply(KafkaController.scala:1225)
>   at kafka.utils.CoreUtils

[jira] [Commented] (KAFKA-3365) Add a documentation field for types and update doc generation

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442773#comment-16442773
 ] 

ASF GitHub Bot commented on KAFKA-3365:
---

hachikuji closed pull request #4735: KAFKA-3365 Add a documentation field for 
types and update doc generation
URL: https://github.com/apache/kafka/pull/4735
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 5e4c35643c2..4f2eb15051e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -634,6 +634,13 @@ project(':core') {
     standardOutput = new File(generatedDocsDir, "protocol_errors.html").newOutputStream()
   }
 
+  task genProtocolTypesDocs(type: JavaExec) {
+    classpath = sourceSets.main.runtimeClasspath
+    main = 'org.apache.kafka.common.protocol.types.Type'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, "protocol_types.html").newOutputStream()
+  }
+
   task genProtocolApiKeyDocs(type: JavaExec) {
     classpath = sourceSets.main.runtimeClasspath
     main = 'org.apache.kafka.common.protocol.ApiKeys'
@@ -697,7 +704,7 @@ project(':core') {
     standardOutput = new File(generatedDocsDir, "producer_metrics.html").newOutputStream()
   }
 
-  task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
+  task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolTypesDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
                                'genAdminClientConfigDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs',
                                'genKafkaConfigDocs', 'genTopicConfigDocs',
                                ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index 4213ecd08f0..6609dfd5196 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.common.protocol.types;
 
+import org.apache.kafka.common.protocol.types.Type.DocumentedType;
+
 import java.nio.ByteBuffer;
 
 /**
  * Represents a type for an array of a particular type
  */
-public class ArrayOf extends Type {
+public class ArrayOf extends DocumentedType {
+
+    private static final String ARRAY_TYPE_NAME = "ARRAY";
 
     private final Type type;
     private final boolean nullable;
@@ -93,7 +97,7 @@ public Type type() {
 
     @Override
     public String toString() {
-        return "ARRAY(" + type + ")";
+        return ARRAY_TYPE_NAME + "(" + type + ")";
     }
 
     @Override
@@ -110,4 +114,18 @@ public String toString() {
             throw new SchemaException("Not an Object[].");
         }
     }
+
+    @Override
+    public String typeName() {
+        return ARRAY_TYPE_NAME;
+    }
+
+    @Override
+    public String documentation() {
+        return "Represents a sequence of objects of a given type T. " +
+            "Type T can be either a primitive type (e.g. " + STRING + ") or a structure. " +
+            "First, the length N is given as an " + INT32 + ". Then N instances of type T follow. " +
+            "A null array is represented with a length of -1. " +
+            "In protocol documentation an array of T instances is referred to as [T].";
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index faa1540b499..cbcd4491d37 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -193,6 +193,4 @@ public void visit(Schema schema) {}
         public void visit(ArrayOf array) {}
         public void visit(Type field) {}
     }
-
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 57d31f459fb..85916d57d2d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -63,6 +63,29 @@ public boolean isNullable() {
         return false;
     }
 
+    /**
+     * A Type that can return its description for documentation purposes.
+     */
+    public static abstract class DocumentedType extends Type {
+
+        /**
+         * Short nam

[jira] [Resolved] (KAFKA-3365) Add a documentation field for types and update doc generation

2018-04-18 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-3365.

Resolution: Fixed

> Add a documentation field for types and update doc generation
> -
>
> Key: KAFKA-3365
> URL: https://issues.apache.org/jira/browse/KAFKA-3365
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Andras Beni
>Priority: Major
>
> Currently the Type class does not allow a documentation field. This means we 
> can't auto-generate a high-level documentation summary for each type in the 
> protocol. Adding this field and updating the generated output would be 
> valuable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6798) Kafka leader rebalance failures

2018-04-18 Thread Riley Zimmerman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442765#comment-16442765
 ] 

Riley Zimmerman commented on KAFKA-6798:


Looking at the verbosegc, I do see large mark/sweeps every few hours.  However, 
they are not regular and do not match up with the zookeeper timeouts.  
The worst time I see in PMAT is 1884ms "GC Completed". Used Memory goes 
from ~1GB to ~430MB after the big mark/sweeps.  
{noformat}
cat gc.out | grep mark
{noformat}
 

> Kafka leader rebalance failures
> ---
>
> Key: KAFKA-6798
> URL: https://issues.apache.org/jira/browse/KAFKA-6798
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1, 1.0.1
>Reporter: Riley Zimmerman
>Priority: Critical
>
> I am running 3 Kafka brokers (version 0.10.2.1, more recently moved to 1.0.1) with 
> 3 Zookeeper nodes (v3.4.9) as statefulsets in a kubernetes v1.9.1 deployment.  My 
> partitions are replication factor 3.  My main workload involves a kafka 
> streams consumer/producer (storing offsets in kafka) and a second kafka 
> consumer storing offsets in zookeeper (only commits every 30 seconds).  There 
> are ~200,000 kafka messages going through each per minute.  The log.retention 
> settings are all 4 hours.  I have auto.leader.rebalance.enabled.  
> I am randomly having failures during the rebalances.  The result is that 
> partitions for both topics and consumer_offsets go out of sync and the 
> partition leader becomes -1.  After 4 hours there is another (auto?) 
> rebalance and sometimes it sorts itself out.  Sometimes it runs for weeks 
> without problems, other times it happens multiple times in a few days.  It 
> appears to happen earlier in test runs if it is going to happen.   
> {noformat}
> Topic:__consumer_offsetsPartitionCount:50   ReplicationFactor:3   
>   
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
> Topic: __consumer_offsets   Partition: 0Leader: -1  
> Replicas: 2,0,1 Isr:
> Topic: __consumer_offsets   Partition: 1Leader: 0   
> Replicas: 0,1,2 Isr: 1,2,0
> Topic: __consumer_offsets   Partition: 2Leader: 1   
> Replicas: 1,2,0 Isr: 2,1,0
> Topic: __consumer_offsets   Partition: 3Leader: -1  
> Replicas: 2,1,0 Isr:
> {noformat}
> {noformat}
> [2018-03-20 12:42:32,180] WARN [Controller 2]: Partition [agent.metadata,5] 
> failed to complete preferred replica leader election. Leader is -1 
> (kafka.controller.KafkaController)
> {noformat}
> {noformat}
> [2018-03-20 11:02:32,099] TRACE Controller 2 epoch 27 started leader election 
> for partition [__consumer_offsets,30] (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 encountered error while 
> electing leader for partition [__consumer_offsets,30] due to: Preferred 
> replica 2 for partition [__consumer_offsets,30] is either not alive or not in 
> the isr. Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}]. 
> (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 initiated state change 
> for partition [__consumer_offsets,30] from OnlinePartition to OnlinePartition 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing 
> leader for partition [__consumer_offsets,30] due to: Preferred replica 2 for 
> partition [__consumer_offsets,30] is either not alive or not in the isr. 
> Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}].
>   at 
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:362)
>   at 
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:202)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:141)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:140)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>   at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:140)
>   at 
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:662)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1230)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply(KafkaController.scala:1225)
>   at 
> kafka.controller.KafkaController$$anonfun$ka

[jira] [Commented] (KAFKA-6798) Kafka leader rebalance failures

2018-04-18 Thread Riley Zimmerman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442683#comment-16442683
 ] 

Riley Zimmerman commented on KAFKA-6798:


I was just about to post that I saw you recommend a 30s timeout in 
[https://www.slideshare.net/HadoopSummit/apache-kafka-best-practices].  I will 
try increasing that.  I'm also considering increasing num.replica.fetchers (1) 
and num.network.threads (2).  I see the num.network.threads default is 3; I 
must have carried over 2 from a past setup.  


My log (data) retention is 4 hours.  Would a 4 hour log retention cause 
something to happen every 4 hours?  The data is coming in constantly each 
minute at the same rate.  The disk space usage is fairly level (no drop every 4 
hours).  

From my reading on things that could cause this, it sounds like the rebalance 
is a result of the zookeeper state change (disconnect), correct?  All 3 kafka 
brokers show the same pattern at the same time.  It's interesting that the 
describe on the topics shows them going out of ISR:3 ~10 seconds before I see 
the zookeeper disconnect messages in the logs.  Not sure if that's a cause or a 
side-effect.  

I'll check the verbosegc data next too.  Possibly a big GC is blocking things 
for too long.  
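
For concreteness, a hedged sketch of the broker override being discussed (30000 ms mirrors the 30s figure from the linked deck, and the 6000 ms default is visible as sessionTimeout=6000 elsewhere in this thread; the value is illustrative, not a recommendation beyond that):
{noformat}
# server.properties (illustrative value)
zookeeper.session.timeout.ms=30000
{noformat}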

> Kafka leader rebalance failures
> ---
>
> Key: KAFKA-6798
> URL: https://issues.apache.org/jira/browse/KAFKA-6798
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1, 1.0.1
>Reporter: Riley Zimmerman
>Priority: Critical
>
> I am running 3 Kafka brokers (version 0.10.2.1, more recently moved to 1.0.1) with 
> 3 Zookeeper nodes (v3.4.9) as statefulsets in a kubernetes v1.9.1 deployment.  My 
> partitions are replication factor 3.  My main workload involves a kafka 
> streams consumer/producer (storing offsets in kafka) and a second kafka 
> consumer storing offsets in zookeeper (only commits every 30 seconds).  There 
> are ~200,000 kafka messages going through each per minute.  The log.retention 
> settings are all 4 hours.  I have auto.leader.rebalance.enabled.  
> I am randomly having failures during the rebalances.  The result is that 
> partitions for both topics and consumer_offsets go out of sync and the 
> partition leader becomes -1.  After 4 hours there is another (auto?) 
> rebalance and sometimes it sorts itself out.  Sometimes it runs for weeks 
> without problems, other times it happens multiple times in a few days.  It 
> appears to happen earlier in test runs if it is going to happen.   
> {noformat}
> Topic:__consumer_offsetsPartitionCount:50   ReplicationFactor:3   
>   
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
> Topic: __consumer_offsets   Partition: 0Leader: -1  
> Replicas: 2,0,1 Isr:
> Topic: __consumer_offsets   Partition: 1Leader: 0   
> Replicas: 0,1,2 Isr: 1,2,0
> Topic: __consumer_offsets   Partition: 2Leader: 1   
> Replicas: 1,2,0 Isr: 2,1,0
> Topic: __consumer_offsets   Partition: 3Leader: -1  
> Replicas: 2,1,0 Isr:
> {noformat}
> {noformat}
> [2018-03-20 12:42:32,180] WARN [Controller 2]: Partition [agent.metadata,5] 
> failed to complete preferred replica leader election. Leader is -1 
> (kafka.controller.KafkaController)
> {noformat}
> {noformat}
> [2018-03-20 11:02:32,099] TRACE Controller 2 epoch 27 started leader election 
> for partition [__consumer_offsets,30] (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 encountered error while 
> electing leader for partition [__consumer_offsets,30] due to: Preferred 
> replica 2 for partition [__consumer_offsets,30] is either not alive or not in 
> the isr. Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}]. 
> (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 initiated state change 
> for partition [__consumer_offsets,30] from OnlinePartition to OnlinePartition 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing 
> leader for partition [__consumer_offsets,30] due to: Preferred replica 2 for 
> partition [__consumer_offsets,30] is either not alive or not in the isr. 
> Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}].
>   at 
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:362)
>   at 
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:202)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:141)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:140)
>   

[jira] [Commented] (KAFKA-6798) Kafka leader rebalance failures

2018-04-18 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442656#comment-16442656
 ] 

Manikumar commented on KAFKA-6798:
--

It does look like a zk session timeout issue. Not sure why it is happening 
every four hours. Are you observing the same pattern on all broker nodes? Maybe 
a network or GC issue; we can check the GC logs. Can you try increasing the zk 
session timeout and check?

> Kafka leader rebalance failures
> ---
>
> Key: KAFKA-6798
> URL: https://issues.apache.org/jira/browse/KAFKA-6798
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1, 1.0.1
>Reporter: Riley Zimmerman
>Priority: Critical
>
> I am running 3 Kafka brokers (version 0.10.2.1, more recently moved to 1.0.1) with 
> 3 Zookeeper nodes (v3.4.9) as statefulsets in a kubernetes v1.9.1 deployment.  My 
> partitions are replication factor 3.  My main workload involves a kafka 
> streams consumer/producer (storing offsets in kafka) and a second kafka 
> consumer storing offsets in zookeeper (only commits every 30 seconds).  There 
> are ~200,000 kafka messages going through each per minute.  The log.retention 
> settings are all 4 hours.  I have auto.leader.rebalance.enabled.  
> I am randomly having failures during the rebalances.  The result is that 
> partitions for both topics and consumer_offsets go out of sync and the 
> partition leader becomes -1.  After 4 hours there is another (auto?) 
> rebalance and sometimes it sorts itself out.  Sometimes it runs for weeks 
> without problems, other times it happens multiple times in a few days.  It 
> appears to happen earlier in test runs if it is going to happen.   
> {noformat}
> Topic:__consumer_offsetsPartitionCount:50   ReplicationFactor:3   
>   
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
> Topic: __consumer_offsets   Partition: 0Leader: -1  
> Replicas: 2,0,1 Isr:
> Topic: __consumer_offsets   Partition: 1Leader: 0   
> Replicas: 0,1,2 Isr: 1,2,0
> Topic: __consumer_offsets   Partition: 2Leader: 1   
> Replicas: 1,2,0 Isr: 2,1,0
> Topic: __consumer_offsets   Partition: 3Leader: -1  
> Replicas: 2,1,0 Isr:
> {noformat}
> {noformat}
> [2018-03-20 12:42:32,180] WARN [Controller 2]: Partition [agent.metadata,5] 
> failed to complete preferred replica leader election. Leader is -1 
> (kafka.controller.KafkaController)
> {noformat}
> {noformat}
> [2018-03-20 11:02:32,099] TRACE Controller 2 epoch 27 started leader election 
> for partition [__consumer_offsets,30] (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 encountered error while 
> electing leader for partition [__consumer_offsets,30] due to: Preferred 
> replica 2 for partition [__consumer_offsets,30] is either not alive or not in 
> the isr. Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}]. 
> (state.change.logger)
> [2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 initiated state change 
> for partition [__consumer_offsets,30] from OnlinePartition to OnlinePartition 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing 
> leader for partition [__consumer_offsets,30] due to: Preferred replica 2 for 
> partition [__consumer_offsets,30] is either not alive or not in the isr. 
> Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}].
>   at 
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:362)
>   at 
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:202)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:141)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:140)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>   at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:140)
>   at 
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:662)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1230)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply(KafkaController.scala:1225)
>   at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply(KafkaController.scala

[jira] [Created] (KAFKA-6801) Restrict Consumer to fetch data from secure port only, and deny from non-secure port.

2018-04-18 Thread VinayKumar (JIRA)
VinayKumar created KAFKA-6801:
-

 Summary: Restrict Consumer to fetch data from secure port only, 
and deny from non-secure port.
 Key: KAFKA-6801
 URL: https://issues.apache.org/jira/browse/KAFKA-6801
 Project: Kafka
  Issue Type: Task
  Components: admin, config, consumer, security
Affects Versions: 0.10.2.1
Reporter: VinayKumar


I have listeners configured with 2 ports as below:  (9092 -> Plaintext, 9093 -> 
SASL_PLAIN)
listeners=PLAINTEXT://:9092, SASL_PLAIN://:9093

For a topic, I want to restrict Consumers to consuming data from port 9093 only; 
consuming data from port 9092 should be denied.

I've gone through the ACL concept, but haven't seen an option to restrict a 
Consumer from pulling data from the non-secure port (in this case, 9092).

Can someone please let me know if this is configurable and whether my 
requirement can be fulfilled? Please provide the necessary info.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
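
An editor's illustration only, not a resolution from this issue: ACLs in Kafka carry no port or listener dimension, but a listener that is not declared is not reachable at all, so dropping the plaintext entry closes port 9092 for every client, not just consumers:
{noformat}
# server.properties sketch: expose only the SASL listener
listeners=SASL_PLAIN://:9093
{noformat}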


[jira] [Commented] (KAFKA-6798) Kafka leader rebalance failures

2018-04-18 Thread Riley Zimmerman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442546#comment-16442546
 ] 

Riley Zimmerman commented on KAFKA-6798:


I have set up a script monitoring the liveness and readiness probe status from 
kubernetes for the 3 kafka and zookeeper pods.  At no time in the test did they 
go out of ready state or get restarted, including when the rebalance failures 
happened. 

I also have a script performing a `describe` on my main topic once every 5 
seconds, running from each of the 3 kafka brokers.  The command takes 3~6 
seconds to complete, so I'm getting a view of the topic roughly every 10 
seconds.  I can 
see when the ISR goes out of sync.  From the describe I can see that at ~Sat 
Apr 14 16:09:45 UTC the 6 partitions are all in ISR 0,1,2 (or 0,2,1...).  At 
~Sat Apr 14 16:09:52 UTC they are no longer all in sync based on the describe.  
Isr: 0,2
Inside the kafka server logs, the errors start at the beginning of minute Sat 
Apr 14 16:10, after the describe has shown they are out of sync.  
{noformat}
[2018-04-14 12:10:03,391] ERROR [KafkaApi-0] Error when handling request 
{controller_id=2,controller_epoch=1,partition_states=[{topic=metric.json,partition=7,controller_epoch=1,leader=0,leader_epoch=1,isr=[0,2],zk_version=1,replicas=[1,0,2]},
 ... 
,live_brokers=[{id=0,end_points=[{port=9092,host=10.1.42.8,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=2,end_points=[{port=9092,host=10.1.104.11,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
 (kafka.server.KafkaApis)
[2018-04-14 12:10:10,484] ERROR [ReplicaFetcherThread-0-1], Error for partition 
[metric.protobuf,0] to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread) 
[2018-04-14 12:10:12,108] ERROR [ReplicaFetcherThread-0-0], Current offset 
6201038 for partition [metric.protobuf,1] out of range; reset offset to 6200065 
(kafka.server.ReplicaFetcherThread){noformat}
 

I have a script running on my kubernetes master watching the 3 zookeepers by 
running `kubectl exec -it server-zookeeper-${1} /opt/stat.sh` each second.   
/opt/stat.sh is doing an `echo stat | nc `. My max zk clients is 60, 
and I'm never above 20 in the stats.  I'm not seeing anything odd there; the 
connection always succeeds each second and the stats return successfully.  

However, most importantly I see these messages for zookeeper:
{noformat}
| grep -i zoo | grep -i connect{noformat}
{noformat}
[2018-04-14 16:10:03,672] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-04-14 16:10:03,974] INFO Opening socket connection to server 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181. 
Will not attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,982] INFO Socket connection established to 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181, 
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] WARN Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Unable to reconnect to ZooKeeper service, 
session 0x262c0a0b1752762 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,983] INFO Initiating client connection, 
connectString=amserver-zookeeper:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@ac73a732 (org.apache.zookeeper.ZooKeeper)
[2018-04-14 16:10:04,166] INFO Opening socket connection to server 
amserver-zookeeper-1.amserver-zookeeper.default.svc.cluster.local/10.1.42.11:2181.
 Will not attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:04,166] INFO Socket connection established to 
server-zookeeper-1.server-zookeeper.default.svc.cluster.local/10.1.42.11:2181, 
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:04,169] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
{noformat}
These appear to happen (basically) every 4 hours:
{noformat}
| grep 'INFO Client session timed out, have not heard from server in'
[2018-04-14 12:10:00,365] INFO Client session timed out, have not heard from 
server in 10038ms for sessionid 0x62c09e25d2, closing socket connection and 
attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-04-14 16:10:03,570] INFO Client session timed out, have not heard from 
server in 11751ms for sessionid 0x262c0a0b1752762, closing socket connection 
and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-04-14 20:10:07,232] INFO Client session timed out, have not heard from 
server in 13073ms for sessionid 0x162c09e25d93120, closing socket connection 
and attempting reconnect (org.apach

[jira] [Commented] (KAFKA-5830) kafka-configs.sh should allow deletion of all configs for an entity

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442491#comment-16442491
 ] 

ASF GitHub Bot commented on KAFKA-5830:
---

mimaison closed pull request #3785: KAFKA-5830: kafka-configs.sh should allow 
deletion of all configs for…
URL: https://github.com/apache/kafka/pull/3785
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 37b4e4c..bc1f7e0e73a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -54,7 +54,7 @@ object ConfigCommand extends Config {
 
 val opts = new ConfigCommandOptions(args)
 
-if(args.length == 0)
+if (args.isEmpty)
   CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config 
for a topic, client, user or broker")
 
 opts.checkArgs()
@@ -81,6 +81,7 @@ object ConfigCommand extends Config {
   private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, 
utils: AdminUtilities = AdminUtils) {
 val configsToBeAdded = parseConfigsToBeAdded(opts)
 val configsToBeDeleted = parseConfigsToBeDeleted(opts)
+val deleteAllConfigs = opts.options.has(opts.deleteAllConfigs)
 val entity = parseEntity(opts)
 val entityType = entity.root.entityType
 val entityName = entity.fullSanitizedName
@@ -99,6 +100,9 @@ object ConfigCommand extends Config {
 configs ++= configsToBeAdded
 configsToBeDeleted.foreach(configs.remove(_))
 
+if (deleteAllConfigs)
+  configs.clear()
+
 utils.changeConfigs(zkUtils, entityType, entityName, configs)
 
 println(s"Completed Updating config for entity: $entity.")
@@ -285,12 +289,12 @@ object ConfigCommand extends Config {
 .withRequiredArg
 .describedAs("urls")
 .ofType(classOf[String])
-val alterOpt = parser.accepts("alter", "Alter the configuration for the 
entity.")
-val describeOpt = parser.accepts("describe", "List configs for the given 
entity.")
-val entityType = parser.accepts("entity-type", "Type of entity 
(topics/clients/users/brokers)")
+val alterOpt = parser.accepts("alter", "Alter the configuration for the 
given entity.")
+val describeOpt = parser.accepts("describe", "List configurations for the 
given entity.")
+val entityType = parser.accepts("entity-type", "Type of entity 
(topics/clients/users/brokers).")
 .withRequiredArg
 .ofType(classOf[String])
-val entityName = parser.accepts("entity-name", "Name of entity (topic 
name/client id/user principal name/broker id)")
+val entityName = parser.accepts("entity-name", "Name of entity (topic 
name/client id/user principal name/broker id).")
 .withRequiredArg
 .ofType(classOf[String])
 val entityDefault = parser.accepts("entity-default", "Default entity name 
for clients/users (applies to corresponding entity type in command line)")
@@ -304,28 +308,29 @@ object ConfigCommand extends Config {
 s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may 
be specified together to update config for clients of a specific user.")
 .withRequiredArg
 .ofType(classOf[String])
-val deleteConfig = parser.accepts("delete-config", "config keys to remove 
'k1,k2'")
+val deleteConfig = parser.accepts("delete-config", "Configuration keys to 
remove for the given entity: 'k1,k2'.")
 .withRequiredArg
 .ofType(classOf[String])
 .withValuesSeparatedBy(',')
+val deleteAllConfigs = parser.accepts("delete-all-configs", "Delete all 
configurations for the given entity.")
 val helpOpt = parser.accepts("help", "Print usage information.")
-val forceOpt = parser.accepts("force", "Suppress console prompts")
+val forceOpt = parser.accepts("force", "Suppress console prompts.")
 val options = parser.parse(args : _*)
 
-val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, 
entityName, addConfig, deleteConfig, helpOpt)
+val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, 
entityName, addConfig, deleteConfig, helpOpt, deleteAllConfigs)
 
 def checkArgs() {
   // should have exactly one action
   val actions = Seq(alterOpt, describeOpt).count(options.has _)
-  if(actions != 1)
+  if (actions != 1)
 CommandLineUtils.printUsageAndDie(parser, "Command must include 
exactly one action: --describe, --alter")
 
   // check required args
   CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOp
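
Assuming the --delete-all-configs option added in the diff above, an invocation would look something like this (the topic name is illustrative):
{noformat}
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic --delete-all-configs
{noformat}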

[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2018-04-18 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442490#comment-16442490
 ] 

Manikumar commented on KAFKA-6335:
--

[~Sonia] Can you give more details? Are you seeing a test failure or an issue 
with SimpleAclAuthorizer? Can you also confirm the Kafka version? 

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> permission for operations: Read from hosts: *, User:11 has Allow permission 
> for operations: Read from hosts: *, User:38 has Allow permission for 
> 

[jira] [Commented] (KAFKA-5483) Shutdown of scheduler should come after LogManager

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442485#comment-16442485
 ] 

ASF GitHub Bot commented on KAFKA-5483:
---

mimaison closed pull request #3452: KAFKA-5483: Shutdown of scheduler should 
come after LogManager
URL: https://github.com/apache/kafka/pull/3452
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a87750d949..a67276959dd 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -590,8 +590,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 if (requestHandlerPool != null)
   CoreUtils.swallow(requestHandlerPool.shutdown())
 
-CoreUtils.swallow(kafkaScheduler.shutdown())
-
 if (apis != null)
   CoreUtils.swallow(apis.close())
 CoreUtils.swallow(authorizer.foreach(_.close()))
@@ -608,6 +606,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 if (logManager != null)
   CoreUtils.swallow(logManager.shutdown())
 
+CoreUtils.swallow(kafkaScheduler.shutdown())
+
 if (kafkaController != null)
   CoreUtils.swallow(kafkaController.shutdown())
 if (zkUtils != null)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Shutdown of scheduler should come after LogManager
> --
>
> Key: KAFKA-5483
> URL: https://issues.apache.org/jira/browse/KAFKA-5483
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Mickael Maison
>Priority: Major
>
> It seems like we shutdown the scheduler used by LogManager before shutting 
> down LogManager itself. This can lead to an IllegalStateException
> {code}
> "[2017-06-06 18:10:19,025] ERROR [ReplicaFetcherThread-14-111], Error due to  
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [akiraPricedProduct.global,10] offset 191893
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:170)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.lang.IllegalStateException: Kafka scheduler is not running.
> at kafka.utils.KafkaScheduler.ensureRunning(KafkaScheduler.scala:132)
> at kafka.utils.KafkaScheduler.schedule(KafkaScheduler.scala:106)
> at kafka.log.Log.roll(Log.scala:794)
> at kafka.log.Log.maybeRoll(Log.scala:744)
> at kafka.log.Log.append(Log.scala:405)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processF

[jira] [Commented] (KAFKA-6800) Update documentation for SASL/PLAIN and SCRAM to use callbacks

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442469#comment-16442469
 ] 

ASF GitHub Bot commented on KAFKA-6800:
---

rajinisivaram opened a new pull request #4890: KAFKA-6800: Update SASL/PLAIN 
and SCRAM docs to use KIP-86 callbacks
URL: https://github.com/apache/kafka/pull/4890
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update documentation for SASL/PLAIN and SCRAM to use callbacks
> --
>
> Key: KAFKA-6800
> URL: https://issues.apache.org/jira/browse/KAFKA-6800
> Project: Kafka
>  Issue Type: Task
>  Components: documentation, security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> Refer to custom callbacks introduced in KIP-86 in SASL documentation instead 
> of replacing login module. Also include 
> `org.apache.kafka.common.security.plain` and 
> `org.apache.kafka.common.security.scram` in javadocs since these are now part 
> of the public API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2018-04-18 Thread Sonia Garudi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442400#comment-16442400
 ] 

Sonia Garudi commented on KAFKA-6335:
-

Hi, I encountered the same error. Is there any update on this?

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> permission for operations: Read from hosts: *, User:11 has Allow permission 
> for operations: Read from hosts: *, User:38 has Allow permission for 
> operations: Read from hosts: *, User:8 has Allow permission for operations:

[jira] [Commented] (KAFKA-5813) Unexpected unclean leader election due to leader/controller's unusual event handling order

2018-04-18 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442343#comment-16442343
 ] 

Manikumar commented on KAFKA-5813:
--

This might have been fixed by the async ZK controller changes.

> Unexpected unclean leader election due to leader/controller's unusual event 
> handling order 
> ---
>
> Key: KAFKA-5813
> URL: https://issues.apache.org/jira/browse/KAFKA-5813
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.1
>Reporter: Allen Wang
>Priority: Minor
>
> We experienced an unexpected unclean leader election after a network glitch 
> happened on the leader of a partition. We have replication factor 2.
> Here is the sequence of event gathered from various logs:
> 1. ZK session timeout happens for leader of partition 
> 2. New ZK session is established for leader 
> 3. Leader removes the follower from ISR (which might be caused by replication 
> delay due to the network problem) and updates the ISR in ZK 
> 4. Controller processes the BrokerChangeListener event happened at step 1 
> where the leader seems to be offline 
> 5. Because the ISR in ZK is already updated by leader to remove the follower, 
> controller makes an unclean leader election 
> 6. Controller processes the second BrokerChangeListener event happened at 
> step 2 to mark the broker online again
> It seems to me that step 4 happens too late. If it happened right after step 
> 1, it would be a clean leader election, and hopefully the producer would 
> immediately switch to the new leader, thus avoiding a consumer offset reset. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6800) Update documentation for SASL/PLAIN and SCRAM to use callbacks

2018-04-18 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6800:
-

 Summary: Update documentation for SASL/PLAIN and SCRAM to use 
callbacks
 Key: KAFKA-6800
 URL: https://issues.apache.org/jira/browse/KAFKA-6800
 Project: Kafka
  Issue Type: Task
  Components: documentation, security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 1.2.0


Refer to custom callbacks introduced in KIP-86 in SASL documentation instead of 
replacing login module. Also include `org.apache.kafka.common.security.plain` 
and `org.apache.kafka.common.security.scram` in javadocs since these are now 
part of the public API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
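
A hedged sketch of the KIP-86 style configuration the updated documentation refers to (the handler class is a placeholder; the listener and mechanism prefix depend on the deployment):
{noformat}
listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler
{noformat}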


[jira] [Resolved] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2018-04-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1716.
--
Resolution: Auto Closed

Closing inactive issue. The Scala consumers have been deprecated and no further 
work is planned; please upgrade to the Java consumer whenever possible.

> hang during shutdown of ZookeeperConsumerConnector
> --
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Sean Fay
>Assignee: Neha Narkhede
>Priority: Major
> Attachments: after-shutdown.log, before-shutdown.log, 
> kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}-- Parking to wait for: 
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at 
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at 
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at 
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}-- Parking to wait for: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at 
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at 
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at 
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at kafka/utils/Utils$.inLock(Utils.scala:538)
> at 
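
The wedge described above, as a minimal generic Java sketch (illustrative names, not Kafka's classes): a fetcher thread blocks on put() into a full bounded queue while the shutdown thread awaits that thread's latch, so neither can make progress unless the blocked thread is interrupted or the queue is drained.
{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class ShutdownWedgeSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> chunks = new LinkedBlockingQueue<>(1);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                chunks.put("chunk-1"); // fills the bounded queue
                chunks.put("chunk-2"); // blocks: the consumer already stopped
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown(); // signal completion either way
            }
        });
        fetcher.start();

        Thread.sleep(100);  // let the fetcher block on the second put()
        // Without this interrupt, the await() below would wedge forever,
        // which is the hang reported in this issue.
        fetcher.interrupt();
        shutdownLatch.await();
    }
}
{code}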

[jira] [Resolved] (KAFKA-5287) Messages getting repeated in kafka

2018-04-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5287.
--
Resolution: Cannot Reproduce

Closing inactive issue. Please reopen with more details if you think the 
issue still exists. 

> Messages getting repeated in kafka
> --
>
> Key: KAFKA-5287
> URL: https://issues.apache.org/jira/browse/KAFKA-5287
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
> Environment: Hardware specification(8 Cores , 16 GB RAM,1 TB Harddisk)
>Reporter: Abhimanyu Nagrath
>Priority: Major
>
> I have a topic with 200 partitions containing a total of 3 million messages. 
> It took 5 days to completely process all the messages, and as soon as they 
> were processed (i.e. kafka-consumer-groups.sh showed 0 lag in all partitions 
> of the topic) I stopped the consumer. But after 6 hrs it was again showing a 
> lag of 2 million messages, which I found were duplicate messages. This happens 
> very frequently. My offsets are stored on the Kafka broker itself. 
> My server configuration is:
> broker.id=1
> delete.topic.enable=true
> #listeners=PLAINTEXT://:9092
> #advertised.listeners=PLAINTEXT://your.host.name:9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/kafka/data/logs
> num.partitions=1
> num.recovery.threads.per.data.dir=5
> log.flush.interval.messages=1
> #log.flush.interval.ms=1000
> log.retention.hours=480
> log.retention.bytes=1073741824
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> zookeeper.connect=:2181
> zookeeper.connection.timeout.ms=6000
> Is there anything in the configuration that I am missing? 
> Any help is appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6772) Broker should load credentials from ZK before requests are allowed

2018-04-18 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6772.
---
Resolution: Fixed
  Reviewer: Jun Rao

> Broker should load credentials from ZK before requests are allowed
> --
>
> Key: KAFKA-6772
> URL: https://issues.apache.org/jira/browse/KAFKA-6772
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 1.0.0, 1.1.0, 1.0.1
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.0.2, 1.2.0, 1.1.1
>
>
> It is currently possible for clients to get an AuthenticationException during 
> start-up if the brokers have not yet loaded credentials from ZK. This 
> definitely affects SCRAM, but it may also affect delegation tokens.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5262) Can't find some consumer group information

2018-04-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5262.
--
Resolution: Cannot Reproduce

Closing inactive issue. Please reopen with more details if you think the 
issue still exists. 


> Can't  find  some  consumer group   information
> ---
>
> Key: KAFKA-5262
> URL: https://issues.apache.org/jira/browse/KAFKA-5262
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 0.10.1.0
>Reporter: miaozhiyong
>Priority: Major
>
> The Kafka client uses the broker to connect to Kafka. I have installed two 
> kafka-manager instances.  The consumer doesn't display in kafka-manager, and 
> it doesn't work with the command line:
> kafka-consumer-groups.sh --new-consumer --bootstrap-server
> but the client is OK. Where does the consumer store the lag?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3476) -Xloggc is not recognised by IBM java

2018-04-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3476.
--
Resolution: Won't Fix

Closing this, as we can export the GC values and performance opts.  Please 
reopen if you think otherwise.

>  -Xloggc is not recognised by IBM java
> --
>
> Key: KAFKA-3476
> URL: https://issues.apache.org/jira/browse/KAFKA-3476
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 0.9.0.0
>Reporter: Khirod Patra
>Priority: Major
>
> Getting the below error on an AIX server.
> NOTE: the java version is:
> --
> java version "1.8.0"
> Java(TM) SE Runtime Environment (build pap6480-20150129_02)
> IBM J9 VM (build 2.8, JRE 1.8.0 AIX ppc64-64 Compressed References 
> 20150116_231420 (JIT enabled, AOT enabled)
> J9VM - R28_Java8_GA_20150116_2030_B231420
> JIT  - tr.r14.java_20150109_82886.02
> GC   - R28_Java8_GA_20150116_2030_B231420_CMPRSS
> J9CL - 20150116_231420)
> JCL - 20150123_01 based on Oracle jdk8u31-b12
> Error :
> ---
> kafka-run-class.sh -name zookeeper -loggc  
> org.apache.zookeeper.server.quorum.QuorumPeerMain 
> ../config/zookeeper.properties
> 
> <verbosegc xmlns="http://www.ibm.com/j9/verbosegc" 
> version="R28_Java8_GA_20150116_2030_B231420_CMPRSS">
> JVMJ9VM007E Command-line option unrecognised: 
> -Xloggc:/home/test_user/containers/kafka_2.11-0.9.0.0/bin/../logs/zookeeper-gc.log
> 
> Error: Could not create the Java Virtual Machine.
> Error: A fatal exception has occurred. Program will exit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
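
A hedged illustration of the workaround named in the resolution (the path is illustrative, and -Xverbosegclog is IBM J9's analogue of -Xloggc):
{noformat}
export KAFKA_GC_LOG_OPTS="-Xverbosegclog:/var/log/kafka/zookeeper-gc.log"
export KAFKA_JVM_PERFORMANCE_OPTS="-server"
{noformat}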


[jira] [Commented] (KAFKA-6772) Broker should load credentials from ZK before requests are allowed

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442175#comment-16442175
 ] 

ASF GitHub Bot commented on KAFKA-6772:
---

rajinisivaram closed pull request #4867: KAFKA-6772: Load credentials from ZK 
before accepting connections
URL: https://github.com/apache/kafka/pull/4867
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e4fdb089cd1..c0bc5939c08 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -76,12 +76,24 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private var stoppedProcessingRequests = false
 
   /**
-   * Start the socket server
+   * Start the socket server. Acceptors for all the listeners are started. Processors
+   * are started if `startupProcessors` is true. If not, processors are only started when
+   * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors
+   * is used to delay processing client connections until server is fully initialized, e.g.
+   * to ensure that all credentials have been loaded before authentications are performed.
+   * Acceptors are always started during `startup` so that the bound port is known when this
+   * method completes even when ephemeral ports are used. Incoming connections on this server
+   * are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
+   *
+   * @param startupProcessors Flag indicating whether `Processor`s must be started.
    */
-  def startup() {
+  def startup(startupProcessors: Boolean = true) {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
       createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
+      if (startupProcessors) {
+        startProcessors()
+      }
     }
 
     newGauge("NetworkProcessorAvgIdlePercent",
@@ -110,6 +122,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     info("Started " + acceptors.size + " acceptor threads")
   }
 
+  /**
+   * Starts processors of all the acceptors of this server if they have not already been started.
+   * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]]
+   * was invoked with `startupProcessors=false`.
+   */
+  def startProcessors(): Unit = synchronized {
+    acceptors.values.asScala.foreach { _.startProcessors() }
+    info(s"Started processors for ${acceptors.size} acceptors")
+  }
+
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
   private def createAcceptorAndProcessors(processorsPerListener: Int,
@@ -196,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
     info(s"Adding listeners for endpoints $listenersAdded")
     createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
+    startProcessors()
   }
 
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
@@ -307,13 +330,25 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
+  private val processorsStarted = new AtomicBoolean
 
   private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized {
-    newProcessors.foreach { processor =>
+    processors ++= newProcessors
+    if (processorsStarted.get)
+      startProcessors(newProcessors)
+  }
+
+  private[network] def startProcessors(): Unit = synchronized {
+    if (!processorsStarted.getAndSet(true)) {
+      startProcessors(processors)
+    }
+  }
+
+  private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
+    processors.foreach { processor =>
       KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor).start()
     }
-    processors ++= newProcessors
   }
 
   private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
@@ -328,7 +363,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   override def shutdown(): Unit = {
     super.shutdown()
-    processors.foreach(_.shutdown())
+    synchronized {
+      processors.foreach(_.shutdown())
+    }
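
For readers skimming the diff, here is a minimal sketch (not taken from the PR; `loadCredentials` is a hypothetical stand-in for the broker's credential-loading step) of the startup ordering this change enables:

{code}
import kafka.network.SocketServer

// Sketch only: bind acceptors first so the bound ports are known, load
// credentials, then start processors so that no client connection is
// authenticated before the credentials exist.
def startNetworkLayer(socketServer: SocketServer, loadCredentials: () => Unit): Unit = {
  socketServer.startup(startupProcessors = false) // acceptors bind; nothing is processed yet
  loadCredentials()                               // e.g. load SCRAM credentials from ZooKeeper
  socketServer.startProcessors()                  // begin processing client connections
}
{code}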

[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-04-18 Thread David Lopez (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442121#comment-16442121
 ] 

David Lopez commented on KAFKA-5413:


Hi, I have the same problem and I'm running 0.11.0.2.

This is what appears in log-cleaner.log:

 

[2018-04-18 10:00:32,918] INFO Cleaner 3: Cleaning log __consumer_offsets-13 (cleaning prior to Tue Mar 06 12:04:48 CET 2018, discarding tombstones prior to Tue Dec 19 19:33:28 CET 2017)... (kafka.log.LogCleaner)
[2018-04-18 10:00:32,919] INFO Cleaner 3: Cleaning segment 0 in log __consumer_offsets-13 (largest timestamp Tue Dec 12 09:06:39 CET 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
[2018-04-18 10:00:32,954] ERROR [kafka-log-cleaner-thread-3]: Error due to (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: largest offset in message set can not be safely converted to relative offset.
    at scala.Predef$.require(Predef.scala:224)
    at kafka.log.LogSegment.append(LogSegment.scala:121)
    at kafka.log.Cleaner.cleanInto(LogCleaner.scala:547)
    at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:443)
    at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:385)
    at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.log.Cleaner.doClean(LogCleaner.scala:384)
    at kafka.log.Cleaner.clean(LogCleaner.scala:361)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:256)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:236)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-04-18 10:00:32,954] INFO [kafka-log-cleaner-thread-3]: Stopped (kafka.log.LogCleaner)

Any suggestions?

 

Thanks!!

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line 
> [here|https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
> in the Kafka source, where the difference can actually be larger than MAXINT 
> since both baseOffset and offset are of type long. It was introdu
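
To make the failing requirement concrete, here is a small sketch modelled on the check in LogSegment (the helper name and signature are illustrative, not the exact Kafka API): offsets within a segment are stored relative to the segment's base offset as a 4-byte value, so the spread between largest offset and base offset must fit in an Int.

{code}
// Illustrative version of the guard behind the IllegalArgumentException.
def canConvertToRelativeOffset(baseOffset: Long, largestOffset: Long): Boolean =
  largestOffset - baseOffset <= Int.MaxValue

canConvertToRelativeOffset(2147343575L, 2147422683L) // true: the spread is small
canConvertToRelativeOffset(0L, 3000000000L)          // false: require(...) throws
{code}

Cleaning a high-offset segment "into 0", as the log above shows, is exactly what creates such an oversized spread.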

[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442058#comment-16442058
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax closed pull request #4880:  KAFKA-6054: Update Kafka Streams metadata to 
version 3
URL: https://github.com/apache/kafka/pull/4880
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index f8daf2fdddc..5b0e6496c2e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') {
   }
 }
 
+project(':streams:upgrade-system-tests-11') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-11"
+
+  dependencies {
+    testCompile libs.kafkaStreams_11
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index effe763ac45..a6ef5dddeec 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -67,6 +67,7 @@ versions += [
   kafka_0102: "0.10.2.1",
   kafka_0110: "0.11.0.2",
   kafka_10: "1.0.1",
+  kafka_11: "1.1.0",
   lz4: "1.4.1",
   metrics: "2.2.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@@ -115,6 +116,7 @@ libs += [
   kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
   kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
   kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
+  kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index 03136849fd5..2a7977cfc93 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,5 +15,6 @@
 
 include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
-        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
-        'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
+        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
+        'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
+        'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 819bebd43b6..65b1da6dede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -172,6 +172,31 @@
      */
     public static final String UPGRADE_FROM_0100 = "0.10.0";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
+     */
+    public static final String UPGRADE_FROM_0101 = "0.10.1";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
+     */
+    public static final String UPGRADE_FROM_0102 = "0.10.2";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
+     */
+    public static final String UPGRADE_FROM_0110 = "0.11.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
+     */
+    public static final String UPGRADE_FROM_10 = "1.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
+     */
+    public static final String UPGRADE_FROM_11 = "1.1";
+
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
@@ -347,8 +372,9 @@
 
     /** {@code upgrade.from} */
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
-    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
-        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+    public static fina
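
As a usage note, here is a sketch (application id and bootstrap server are placeholders) of how a Streams application would set the new constant during the first rolling bounce of an upgrade; in the standard two-bounce procedure the setting is removed again before the second bounce:

{code}
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")    // placeholder
// First rolling bounce: tell upgraded instances which (old) metadata
// version the rest of the group still speaks.
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_11)
{code}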

[jira] [Resolved] (KAFKA-6107) SCRAM user add appears to fail if Kafka has never been started

2018-04-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6107.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

This was fixed as part of the KafkaZkClient rework.

> SCRAM user add appears to fail if Kafka has never been started
> --
>
> Key: KAFKA-6107
> URL: https://issues.apache.org/jira/browse/KAFKA-6107
> Project: Kafka
>  Issue Type: Bug
>  Components: tools, zkclient
>Affects Versions: 0.11.0.0
>Reporter: Dustin Cote
>Priority: Minor
> Fix For: 1.1.0
>
>
> When trying to add a SCRAM user in ZooKeeper without ever having started 
> Kafka, the kafka-configs tool does not handle it well. This is a common use 
> case: bootstrapping a new cluster that uses SCRAM for inter-broker 
> communication generally hits this problem. Today, the workaround is to start 
> Kafka, add the user, then restart Kafka. Here's how to 
> reproduce:
> 1) Start ZooKeeper
> 2) Run 
> {code}
> bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 
> 'SCRAM-SHA-256=[iterations=8192,password=broker_pwd],SCRAM-SHA-512=[password=broker_pwd]'
>  --entity-type users --entity-name broker
> {code}
> This will result in:
> {code}
> bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 
> 'SCRAM-SHA-256=[iterations=8192,password=broker_pwd],SCRAM-SHA-512=[password=broker_pwd]'
>  --entity-type users --entity-name broker
> Error while executing config command 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /config/changes/config_change_
> org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /config/changes/config_change_
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1001)
>   at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:528)
>   at 
> org.I0Itec.zkclient.ZkClient.createPersistentSequential(ZkClient.java:444)
>   at kafka.utils.ZkPath.createPersistentSequential(ZkUtils.scala:1045)
>   at kafka.utils.ZkUtils.createSequentialPersistentPath(ZkUtils.scala:527)
>   at 
> kafka.admin.AdminUtils$.kafka$admin$AdminUtils$$changeEntityConfig(AdminUtils.scala:600)
>   at 
> kafka.admin.AdminUtils$.changeUserOrUserClientIdConfig(AdminUtils.scala:551)
>   at kafka.admin.AdminUtilities$class.changeConfigs(AdminUtils.scala:63)
>   at kafka.admin.AdminUtils$.changeConfigs(AdminUtils.scala:72)
>   at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:101)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:68)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException: 
> KeeperErrorCode = NoNode for /config/changes/config_change_
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>   at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
>   at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:100)
>   at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:531)
>   at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:528)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:991)
>   ... 11 more
> {code}
> The command doesn't appear to fail, yet it throws an exception. The script's 
> return code is still 0 and the user is created in ZooKeeper, but this 
> behavior should be cleaned up since it's misleading.
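
For context, the shape of the failure and of the fix, as an illustrative raw-ZooKeeper sketch (this is not the KafkaZkClient API): the sequential node under /config/changes can only be created once its parent znodes exist, which a never-started cluster does not yet have.

{code}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
import org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE

// Illustrative only: ensure parent znodes exist before appending the
// sequential config-change notification, so a cluster whose brokers have
// never started does not hit NoNode.
def writeConfigChange(zk: ZooKeeper, payload: Array[Byte]): String = {
  Seq("/config", "/config/changes").foreach { path =>
    try zk.create(path, Array.emptyByteArray, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
    catch { case _: KeeperException.NodeExistsException => () } // parent already present
  }
  zk.create("/config/changes/config_change_", payload, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)
}
{code}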



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-04-18 Thread Michael Noll (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442006#comment-16442006
 ] 

Michael Noll commented on KAFKA-6555:
-

Any updates here [~asurana]?

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the 
> StreamTask is in RUNNING state. The idea is to make them queryable even in 
> the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration 
> can be huge, and making the data inaccessible during that time is downtime 
> that many applications cannot afford.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the 
> replica task has to restore the remaining state from the changelog topic 
> before it can become RUNNING. The time taken for this depends on how far the 
> replica is lagging behind. During this restoration the state store for that 
> partition is currently not queryable, resulting in downtime for the 
> partition. We can make the state store partition queryable for the data 
> already present in the store.
>  # When there is no replica or standby task, the active task is started on 
> one of the existing nodes. That node has to rebuild the entire state from the 
> changelog topic, which can take a long time depending on how big the 
> changelog topic is, and keeping the state store unqueryable during this time 
> means downtime for the partition.
> This is an important improvement, as it could directly improve the 
> availability of microservices built with Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are 
> welcome.
>  
>  
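
For illustration, this is roughly what callers must do today while a store is restoring (store name and key/value types are placeholders); the proposal would let such lookups serve the already-restored data instead of throwing:

{code}
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// Illustrative retry loop: store(...) throws InvalidStateStoreException
// while the owning task is still restoring from the changelog.
def waitUntilQueryable(streams: KafkaStreams, storeName: String): ReadOnlyKeyValueStore[String, java.lang.Long] = {
  var store: ReadOnlyKeyValueStore[String, java.lang.Long] = null
  while (store == null) {
    try store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, java.lang.Long]())
    catch { case _: InvalidStateStoreException => Thread.sleep(100) } // still restoring
  }
  store
}
{code}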



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)