[jira] [Resolved] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups

2018-08-31 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7369.

   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.1

> Retry when possible in AdminClient.listConsumerGroups
> -
>
> Key: KAFKA-7369
> URL: https://issues.apache.org/jira/browse/KAFKA-7369
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> Currently we do not retry ListGroups requests when they fail due to retriable 
> errors. For example, this is causing some instability in 
> `kafka.admin.ListConsumerGroupTest.testListConsumerGroups`.
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: Error 
> listing groups on localhost:43001 (id: 0 rack: null)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:132)
> {code}





[jira] [Commented] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599455#comment-16599455
 ] 

ASF GitHub Bot commented on KAFKA-7369:
---

hachikuji closed pull request #5595: KAFKA-7369; Handle retriable errors in 
AdminClient list groups API
URL: https://github.com/apache/kafka/pull/5595
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8ddb0c08627..904cd0601e5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2567,8 +2567,11 @@ private void maybeAddConsumerGroup(ListGroupsResponse.Group group) {
                     void handleResponse(AbstractResponse abstractResponse) {
                         final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
                         synchronized (results) {
-                            if (response.error() != Errors.NONE) {
-                                results.addError(response.error().exception(), node);
+                            Errors error = response.error();
+                            if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
+                                throw error.exception();
+                            } else if (error != Errors.NONE) {
+                                results.addError(error.exception(), node);
                             } else {
                                 for (ListGroupsResponse.Group group : response.groups()) {
                                     maybeAddConsumerGroup(group);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index b108803590f..af6f7212e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,6 +62,7 @@
 /**
  * Possible error codes:
  *
+ * COORDINATOR_LOADING_IN_PROGRESS (14)
  * COORDINATOR_NOT_AVAILABLE (15)
  * AUTHORIZATION_FAILED (29)
  */
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0245cbd3695..c0dc542b159 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -36,7 +36,6 @@
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -46,6 +45,7 @@
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
@@ -863,9 +863,11 @@ public void testListConsumerGroups() throws Exception {
         Node node0 = new Node(0, "localhost", 8121);
         Node node1 = new Node(1, "localhost", 8122);
         Node node2 = new Node(2, "localhost", 8123);
+        Node node3 = new Node(3, "localhost", 8124);
         nodes.put(0, node0);
         nodes.put(1, node1);
         nodes.put(2, node2);
+        nodes.put(3, node3);
 
         final Cluster cluster = new Cluster(
                 "mockClusterId",
@@ -902,13 +904,19 @@ public void testListConsumerGroups() throws Exception {
                 )),
                 node0);
 
+        // handle retriable errors
         env.kafkaClient().prepareResponseFrom(
                 new ListGroupsResponse(
                         Errors.COORDINATOR_NOT_AVAILABLE,
                         Collections.emptyList()
                 ),
                 node1);
-
+

[jira] [Commented] (KAFKA-7370) Enhance FileConfigProvider to read a directory

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599394#comment-16599394
 ] 

ASF GitHub Bot commented on KAFKA-7370:
---

rayokota opened a new pull request #5596: KAFKA-7370: Enhance 
FileConfigProvider to read a dir
URL: https://github.com/apache/kafka/pull/5596
 
 
   This is a backward compatible enhancement to augment FileConfigProvider
   to read from a directory, where the file names are the keys and the
   corresponding file contents are the values.  The former functionality of
   reading a Properties file is retained.
   
   This will allow for easier integration with secret management systems
   where each secret is often written to an individual file, such as in
   some Docker and Kubernetes setups.
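   For illustration, the proposed mapping can be sketched in a few lines of plain
   Java. This is not the PR's implementation, only a minimal standalone sketch of
   the behaviour described above; the /run/secrets path is just an example of a
   typical Docker secrets mount.

{code}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class DirectorySecretsReader {

    // For a directory: each regular file's name becomes the key and its contents the value.
    public static Map<String, String> read(Path dir) throws IOException {
        Map<String, String> secrets = new HashMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    secrets.put(file.getFileName().toString(),
                                new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
                }
            }
        }
        return secrets;
    }

    public static void main(String[] args) throws IOException {
        // Example: a Docker/Kubernetes-style secrets directory (placeholder path).
        read(Paths.get("/run/secrets"))
            .forEach((k, v) -> System.out.println(k + " -> <" + v.length() + " chars>"));
    }
}
{code}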
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Enhance FileConfigProvider to read a directory
> --
>
> Key: KAFKA-7370
> URL: https://issues.apache.org/jira/browse/KAFKA-7370
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 2.0.0
>Reporter: Robert Yokota
>Assignee: Robert Yokota
>Priority: Minor
>
> Currently FileConfigProvider can read a Properties file as a set of key-value 
> pairs.  This enhancement is to augment FileConfigProvider so that it can also 
> read a directory, where the file names are the keys and the corresponding 
> file contents are the values.
> This will allow for easier integration with secret management systems where 
> each secret is often an individual file, such as in Docker and Kubernetes.





[jira] [Created] (KAFKA-7370) Enhance FileConfigProvider to read a directory

2018-08-31 Thread Robert Yokota (JIRA)
Robert Yokota created KAFKA-7370:


 Summary: Enhance FileConfigProvider to read a directory
 Key: KAFKA-7370
 URL: https://issues.apache.org/jira/browse/KAFKA-7370
 Project: Kafka
  Issue Type: Improvement
  Components: config
Affects Versions: 2.0.0
Reporter: Robert Yokota
Assignee: Robert Yokota


Currently FileConfigProvider can read a Properties file as a set of key-value 
pairs.  This enhancement is to augment FileConfigProvider so that it can also 
read a directory, where the file names are the keys and the corresponding file 
contents are the values.

This will allow for easier integration with secret management systems where 
each secret is often an individual file, such as in Docker and Kubernetes.





[jira] [Commented] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599210#comment-16599210
 ] 

ASF GitHub Bot commented on KAFKA-7369:
---

hachikuji opened a new pull request #5595: KAFKA-7369; Handle retriable errors 
in AdminClient list groups API
URL: https://github.com/apache/kafka/pull/5595
 
 
   We should retry when possible if ListGroups fails due to a retriable error 
(e.g. coordinator loading).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Retry when possible in AdminClient.listConsumerGroups
> -
>
> Key: KAFKA-7369
> URL: https://issues.apache.org/jira/browse/KAFKA-7369
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Currently we do not retry ListGroups requests when they fail due to retriable 
> errors. For example, this is causing some instability in 
> `kafka.admin.ListConsumerGroupTest.testListConsumerGroups`.
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: Error 
> listing groups on localhost:43001 (id: 0 rack: null)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:132)
> {code}





[jira] [Created] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups

2018-08-31 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7369:
--

 Summary: Retry when possible in AdminClient.listConsumerGroups
 Key: KAFKA-7369
 URL: https://issues.apache.org/jira/browse/KAFKA-7369
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Currently we do not retry ListGroups requests when they fail due to retriable 
errors. For example, this is causing some instability in 
`kafka.admin.ListConsumerGroupTest.testListConsumerGroups`.

{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.CoordinatorLoadInProgressException: Error 
listing groups on localhost:43001 (id: 0 rack: null)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:132)
{code}
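Until the AdminClient retries these retriable errors itself, a caller can work
around the instability with a simple retry loop. The following is an
illustrative sketch only (bootstrap address, attempt count, and backoff are
arbitrary placeholders), not the fix that was merged:

{code}
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.errors.RetriableException;

public class ListGroupsWithRetry {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            for (int attempt = 0; attempt < 5; attempt++) {
                try {
                    Collection<ConsumerGroupListing> groups = admin.listConsumerGroups().all().get();
                    groups.forEach(g -> System.out.println(g.groupId()));
                    return;
                } catch (ExecutionException e) {
                    // e.g. CoordinatorLoadInProgressException while the group coordinator is loading
                    if (e.getCause() instanceof RetriableException) {
                        Thread.sleep(1000L);  // simple fixed backoff for the sketch
                    } else {
                        throw e;
                    }
                }
            }
        }
    }
}
{code}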





[jira] [Resolved] (KAFKA-7287) Set open ACL permissions for old consumer znode path

2018-08-31 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7287.

   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.1
   1.1.2

Also merged the PR to 1.1 branch.

> Set open ACL permissions for old consumer znode path
> 
>
> Key: KAFKA-7287
> URL: https://issues.apache.org/jira/browse/KAFKA-7287
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> The old consumer znode path should have open ACL permissions in a kerberized 
> environment. This was missed in the KafkaZkClient changes.
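For context, "open" here means the znode carries the world:anyone ACL with all
permissions. A minimal, illustrative sketch of checking this with the plain
ZooKeeper client follows; the connect string is a placeholder and this is not
the project's test code:

{code}
import java.util.List;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public class ConsumerPathAclCheck {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        try {
            // Read the ACLs on the old consumer path.
            List<ACL> acls = zk.getACL("/consumers", new Stat());
            // An "open" node carries world:anyone with all permissions.
            boolean open = acls.equals(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            System.out.println("/consumers ACLs: " + acls + " (open=" + open + ")");
        } finally {
            zk.close();
        }
    }
}
{code}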





[jira] [Commented] (KAFKA-7287) Set open ACL permissions for old consumer znode path

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599161#comment-16599161
 ] 

ASF GitHub Bot commented on KAFKA-7287:
---

junrao closed pull request #5585: KAFKA-7287: Set open ACL for old consumer 
znode path 
URL: https://github.com/apache/kafka/pull/5585
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 03f4a05f1e4..cff32a2415f 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -407,8 +407,13 @@ object PreferredReplicaElectionZNode {
   }.map(_.toSet).getOrElse(Set.empty)
 }
 
+//old consumer path znode
+object ConsumerPathZNode {
+  def path = "/consumers"
+}
+
 object ConsumerOffset {
-  def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offsets/${topic}/${partition}"
+  def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}"
   def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
   def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
 }
@@ -536,7 +541,7 @@ object ZkData {
 
   // These are persistent ZK paths that should exist on kafka broker startup.
   val PersistentZkPaths = Seq(
-    "/consumers", // old consumer path
+    ConsumerPathZNode.path, // old consumer path
     BrokerIdsZNode.path,
     TopicsZNode.path,
     ConfigEntityChangeNotificationZNode.path,
@@ -558,7 +563,8 @@ object ZkData {
   }
 
   def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
-    if (isSecure) {
+    //Old Consumer path is kept open as different consumers will write under this node.
+    if (!ConsumerPathZNode.path.equals(path) && isSecure) {
       val acls = new ArrayBuffer[ACL]
       acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
       if (!sensitivePath(path))
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 033ca67143b..a80c4074e84 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -19,10 +19,10 @@ package kafka.security.auth
 
 import kafka.admin.ZkSecurityMigrator
 import kafka.utils.{Logging, TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL}
+import org.apache.zookeeper.data.{ACL, Stat}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
@@ -299,4 +299,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       }
     }
   }
+
+  @Test
+  def testConsumerOffsetPathAcls(): Unit = {
+    zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path)
+
+    val consumerPathAcls = zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat())
+    assertTrue("old consumer znode path acls are not open", consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure))
+  }
 }


 




> Set open ACL permissions for old consumer znode path
> 
>
> Key: KAFKA-7287
> URL: https://issues.apache.org/jira/browse/KAFKA-7287
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
>
> The old consumer znode path should have open ACL permissions in a kerberized 
> environment. This was missed in the KafkaZkClient changes.





[jira] [Created] (KAFKA-7368) Support joining Windowed KTables

2018-08-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-7368:
---

 Summary: Support joining Windowed KTables
 Key: KAFKA-7368
 URL: https://issues.apache.org/jira/browse/KAFKA-7368
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, there is no good way to join two `KTable<Windowed<K>, V>`, aka 
windowed KTables.

They are KTables, so they have a `join` operator available, but it currently 
uses a regular KeyValue store instead of a Windowed store, so it will grow 
without bound as new windows enter.

One option is to convert both KTables into KStreams, join them (which is a 
windowed join), and then convert them back into KTables for further processing, 
but this is an awkward way to accomplish an apparently straightforward task. A 
rough sketch of that workaround is shown below.
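The sketch is heavily simplified: plain String keys stand in for Windowed<K>
keys, default serdes are assumed, and the topic names and ValueJoiner are
hypothetical.

{code}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class WindowedTableJoinWorkaround {

    // Default serdes are assumed to be configured; windowed-key serdes and error
    // handling are omitted to keep the sketch short.
    static void build(StreamsBuilder builder) {
        KTable<String, Long> left = builder.table("left-agg-topic");   // hypothetical topic
        KTable<String, Long> right = builder.table("right-agg-topic"); // hypothetical topic

        // 1) Turn both tables back into streams and do a windowed stream-stream join.
        KStream<String, Long> joined = left.toStream().join(
            right.toStream(),
            (l, r) -> l + r,                              // example ValueJoiner
            JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));

        // 2) Convert the join result back into a KTable for further table-style processing.
        KTable<String, Long> joinedTable = joined.groupByKey().reduce((oldValue, newValue) -> newValue);

        joinedTable.toStream().to("joined-output");       // hypothetical output topic
    }
}
{code}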

It should instead be possible to directly support it, but the trick will be to 
make it impossible to accidentally use a window store for normal (aka 
non-windowed) KTables.





[jira] [Updated] (KAFKA-7367) Verify that Streams never creates RocksDB stores unless they are needed

2018-08-31 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7367:

Labels: newbie  (was: )

> Verify that Streams never creates RocksDB stores unless they are needed
> ---
>
> Key: KAFKA-7367
> URL: https://issues.apache.org/jira/browse/KAFKA-7367
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: newbie
>
> We have gotten some reports of Streams creating RocksDB stores unnecessarily 
> for stateless processes.
> We can and should verify this doesn't happen by creating integration tests 
> for *every* stateless operator that verify that after processing, the state 
> directory is still empty.
> These tests could potentially be backported as far as we care to so that we 
> can identify and fix potential unnecessary stores in past versions as well.





[jira] [Created] (KAFKA-7367) Verify that Streams never creates RocksDB stores unless they are needed

2018-08-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-7367:
---

 Summary: Verify that Streams never creates RocksDB stores unless 
they are needed
 Key: KAFKA-7367
 URL: https://issues.apache.org/jira/browse/KAFKA-7367
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


We have gotten some reports of Streams creating RocksDB stores unnecessarily 
for stateless processes.

We can and should verify this doesn't happen by creating integration tests for 
*every* stateless operator that verify that after processing, the state 
directory is still empty.

These tests could potentially be backported as far as we care to so that we can 
identify and fix potential unnecessary stores in past versions as well.
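As a rough sketch of the kind of check such a test could make (not the
project's actual test code; topic names, the app id, and the temp-dir handling
are illustrative assumptions):

{code}
import java.io.File;
import java.nio.file.Files;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class StatelessNoStoreCheck {
    public static void main(String[] args) throws Exception {
        File stateDir = Files.createTempDirectory("stateless-check").toFile();

        // A topology that uses stateless operators only.
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
               .filter((k, v) -> v != null)
               .mapValues(String::toUpperCase)
               .to("output");
        Topology topology = builder.build();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // required config; the driver does not connect
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getAbsolutePath());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input", "key", "value"));
        }

        // Expectation the ticket wants to verify: no store (in particular no RocksDB
        // store directory) was materialized under the configured state dir.
        System.out.println("state dir contents: " + java.util.Arrays.toString(stateDir.list()));
    }
}
{code}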





[jira] [Commented] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time

2018-08-31 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599108#comment-16599108
 ] 

John Roesler commented on KAFKA-3596:
-

This should be fixed by a few changes coming in 2.1:
 * window segments are no longer bounded by number, but by size. This means 
that if you were to get events "from the future", they would no longer cause 
still-live windows to be dropped
 * we have revised the processing and stream-time model:
 ** We pull records one-at-a-time from the inputs in the order of their 
timestamp. This should eliminate the cases where we process an event with a 
much-advanced timestamp from queue A, while there are events with smaller 
timestamps still to process from queue B. It won't eliminate cases where queue 
A alone has out-of-order events.
 ** Stream time itself is now computed as the non-decreasing maximum of 
observed timestamps (a minimal sketch of this rule follows after this list).
 ** Together, these changes mean that "future events" are no longer possible, 
only "late events".

> Kafka Streams: Window expiration needs to consider more than event time
> ---
>
> Key: KAFKA-3596
> URL: https://issues.apache.org/jira/browse/KAFKA-3596
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Henry Cai
>Priority: Minor
>  Labels: architecture
> Fix For: 2.1.0
>
>
> Currently in Kafka Streams, window expiration in RocksDB is triggered by new 
> event insertion.  When a window is created at T0 with 10 minutes retention and 
> we then see a new record with event timestamp T0 + 10 + 1, we expire that 
> window (remove it) from RocksDB.
> In the real world, it's very easy to see events coming with future timestamps 
> (or out-of-order events with big time gaps between them), so this way of 
> retiring a window based on a single event's timestamp is dangerous.  I think 
> at least we need to consider both the event's event time and the elapsed 
> server/stream time.





[jira] [Updated] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time

2018-08-31 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-3596:

Fix Version/s: 2.1.0

> Kafka Streams: Window expiration needs to consider more than event time
> ---
>
> Key: KAFKA-3596
> URL: https://issues.apache.org/jira/browse/KAFKA-3596
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Henry Cai
>Priority: Minor
>  Labels: architecture
> Fix For: 2.1.0
>
>
> Currently in Kafka Streams, window expiration in RocksDB is triggered by new 
> event insertion.  When a window is created at T0 with 10 minutes retention and 
> we then see a new record with event timestamp T0 + 10 + 1, we expire that 
> window (remove it) from RocksDB.
> In the real world, it's very easy to see events coming with future timestamps 
> (or out-of-order events with big time gaps between them), so this way of 
> retiring a window based on a single event's timestamp is dangerous.  I think 
> at least we need to consider both the event's event time and the elapsed 
> server/stream time.





[jira] [Resolved] (KAFKA-4988) JVM crash when running on Alpine Linux

2018-08-31 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-4988.
-
Resolution: Won't Fix

I think this issue is out of our hands.

If you think there is something for us to do, please feel free to reopen the 
ticket and comment.

Thanks,

-John

> JVM crash when running on Alpine Linux
> --
>
> Key: KAFKA-4988
> URL: https://issues.apache.org/jira/browse/KAFKA-4988
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Vincent Rischmann
>Priority: Minor
>
> I'm developing my Kafka Streams application using Docker and I run my jars 
> using the official openjdk:8-jre-alpine image.
> I'm just starting to use windowing and now the JVM crashes, I think because 
> of an issue with RocksDB.
> It's trivial to fix on my part: just use the Debian jessie based image. 
> However, it would be cool if Alpine were supported too, since its Docker 
> images are quite a bit lighter.
> {quote}
> Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: 
> /tmp/librocksdbjni3285995384052305662.so: Error loading shared library 
> ld-linux-x86-64.so.2: No such file or directory (needed by 
> /tmp/librocksdbjni3285995384052305662.so)
>   at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>   at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
>   at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
>   at java.lang.Runtime.load0(Runtime.java:809)
>   at java.lang.System.load(System.java:1086)
>   at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>   at 
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>   at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
>   at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
>   at org.rocksdb.Options.<clinit>(Options.java:22)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:148)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f60f34ce088, pid=1, tid=0x7f60f3705ab0
> #
> # JRE version: OpenJDK Runtime Environment (8.0_121-b13) (build 1.8.0_121-b13)
> # Java VM: OpenJDK 64-Bit Server VM (25.121-b13 mixed mode linux-amd64 
> compressed oops)
> # Derivati

[jira] [Commented] (KAFKA-6033) Kafka Streams does not work with musl-libc (alpine linux)

2018-08-31 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599098#comment-16599098
 ] 

John Roesler commented on KAFKA-6033:
-

Also, I've just found https://issues.apache.org/jira/browse/KAFKA-4988, which 
has some more concrete recommendations. It might be worth a look.

> Kafka Streams does not work with musl-libc (alpine linux)
> -
>
> Key: KAFKA-6033
> URL: https://issues.apache.org/jira/browse/KAFKA-6033
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Alpine 3.6
>Reporter: Jeffrey Zampieron
>Priority: Major
>
> Using the released version of Kafka fails on Alpine-based images because 
> RocksDB uses JNI and fails to load.





[jira] [Resolved] (KAFKA-6033) Kafka Streams does not work with musl-libc (alpine linux)

2018-08-31 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-6033.
-
Resolution: Won't Fix

This is unfortunately out of our hands.

If you think I'm wrong about this, please reopen the ticket.

Thanks,

-John

> Kafka Streams does not work with musl-libc (alpine linux)
> -
>
> Key: KAFKA-6033
> URL: https://issues.apache.org/jira/browse/KAFKA-6033
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Alpine 3.6
>Reporter: Jeffrey Zampieron
>Priority: Major
>
> Using the released version of Kafka fails on Alpine-based images because 
> RocksDB uses JNI and fails to load.





[jira] [Commented] (KAFKA-4544) Add system tests for delegation token based authentication

2018-08-31 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599038#comment-16599038
 ] 

Manikumar commented on KAFKA-4544:
--

[~asasvari] If you are interested, please take over the JIRA. Currently I am 
working on KAFKA-6945. Thanks.

> Add system tests for delegation token based authentication
> --
>
> Key: KAFKA-4544
> URL: https://issues.apache.org/jira/browse/KAFKA-4544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ashish Singh
>Assignee: Manikumar
>Priority: Major
>
> Add system tests for delegation token based authentication.





[jira] [Commented] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately

2018-08-31 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599003#comment-16599003
 ] 

Jun Rao commented on KAFKA-7366:


[~ijuma], it might be debatable. However, if all other topic level configs take 
effect immediately, it seems that we should make that consistent for all 
configs.

> topic level segment.bytes and segment.ms not taking effect immediately
> --
>
> Key: KAFKA-7366
> URL: https://issues.apache.org/jira/browse/KAFKA-7366
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Jun Rao
>Priority: Major
>
> It used to be that topic-level configs such as segment.bytes took effect 
> immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect 
> after the active segment has rolled. The relevant part of KAFKA-6324 is that 
> in Log.maybeRoll, the check for segment rolling was moved to 
> LogSegment.shouldRoll().





[jira] [Commented] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately

2018-08-31 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598993#comment-16598993
 ] 

Ismael Juma commented on KAFKA-7366:


[~rsivaram] noticed this while working on dynamic configs as well. We discussed 
it then and I'm not sure what the conclusion was. I guess you're saying we 
should change it back, [~junrao]?

> topic level segment.bytes and segment.ms not taking effect immediately
> --
>
> Key: KAFKA-7366
> URL: https://issues.apache.org/jira/browse/KAFKA-7366
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Jun Rao
>Priority: Major
>
> It used to be that topic-level configs such as segment.bytes took effect 
> immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect 
> after the active segment has rolled. The relevant part of KAFKA-6324 is that 
> in Log.maybeRoll, the check for segment rolling was moved to 
> LogSegment.shouldRoll().





[jira] [Commented] (KAFKA-7214) Mystic FATAL error

2018-08-31 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598992#comment-16598992
 ] 

John Roesler commented on KAFKA-7214:
-

Hi [~habdank],

I've commented on https://issues.apache.org/jira/browse/KAFKA-6777.

If the problem is long GC pauses, but the JVM never actually runs out of 
memory, then there would be no OOME to catch. Can you let us know what kind of 
GC pauses you're observing? As Guozhang said, if any pause is longer than any 
of the heartbeat intervals (or other keepalive configs like the poll interval), 
then it will cause problems.

The only solution to long GC pauses is to reshape the computation, which it 
seems like you have done.

 

Also, regarding:

> The keyword in all those errors is: KSTREAM-SOURCE-X

This is just the name of a source node in your streams topology. Such names 
will appear in all kinds of logs and don't indicate that your problem is 
related to this ticket.

Maybe we can relocate this discussion to a new ticket?

 

If you do start a new ticket, can you indicate what behavior you observe? Such 
as, "my application crashes" or "my application rebalances frequently".

 

Thanks,

-John

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I get an exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How to correctly handle it?
> Thanks in advance for help.





[jira] [Updated] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately

2018-08-31 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7366:
---
Affects Version/s: 2.0.0

> topic level segment.bytes and segment.ms not taking effect immediately
> --
>
> Key: KAFKA-7366
> URL: https://issues.apache.org/jira/browse/KAFKA-7366
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Jun Rao
>Priority: Major
>
> It used to be that topic-level configs such as segment.bytes took effect 
> immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect 
> after the active segment has rolled. The relevant part of KAFKA-6324 is that 
> in Log.maybeRoll, the check for segment rolling was moved to 
> LogSegment.shouldRoll().





[jira] [Commented] (KAFKA-7364) kafka periodically run into high cpu usage with ssl writing

2018-08-31 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598988#comment-16598988
 ] 

Ismael Juma commented on KAFKA-7364:


If you have 20k producers all connecting at the same time, then it's not 
unexpected that it would be costly. OpenSSL may be better; if someone wants to 
test that, it would be useful. Once connections are established, the handshake 
cost is no longer an issue, of course.

> kafka periodically run into high cpu usage with ssl writing
> ---
>
> Key: KAFKA-7364
> URL: https://issues.apache.org/jira/browse/KAFKA-7364
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Yu Yang
>Priority: Major
> Attachments: Screen Shot 2018-08-30 at 10.57.32 PM.png
>
>
> While testing SSL writing to Kafka, we found that Kafka often runs into high 
> CPU usage due to inefficiency in the JDK SSL implementation.
> In detail, we use a test cluster of 12 d2.8xlarge instances that runs Kafka 
> 2.0.0 and jdk-10.0.2, and hosts only one topic that ~20k producers write to 
> through an SSL channel. We observed that the network threads often hit 100% 
> CPU usage after enabling SSL writing to Kafka. To improve Kafka's throughput, 
> we have "num.network.threads=32" for the broker. Even with 32 network threads, 
> we see the broker CPU usage jump right after SSL writing is enabled. The 
> broker's CPU usage drops immediately when we disable SSL writing.
>  !Screen Shot 2018-08-30 at 10.57.32 PM.png|height=360! 
> When the broker's CPU usage is high, 'perf top' shows that Kafka is busy 
> executing code in libsunec.so. The following is a sample stack trace that we 
> got when the broker's CPU usage was high. This seems to be related to 
> inefficiency in the JDK SSL implementation. Switching to 
> https://github.com/netty/netty-tcnative to handle the SSL handshake could be 
> helpful.
> {code}
> Thread 77562: (state = IN_NATIVE)
>  - sun.security.ec.ECDSASignature.verifySignedDigest(byte[], byte[], byte[], 
> byte[]) @bci=0 (Compiled frame; information may be imprecise)
>  - sun.security.ec.ECDSASignature.engineVerify(byte[]) @bci=70, line=321 
> (Compiled frame)
>  - java.security.Signature$Delegate.engineVerify(byte[]) @bci=9, line=1222 
> (Compiled frame)
>  - java.security.Signature.verify(byte[]) @bci=10, line=655 (Compiled frame)
>  - sun.security.x509.X509CertImpl.verify(java.security.PublicKey, 
> java.lang.String) @bci=136, line=444 (Compiled frame)
>  - 
> sun.security.provider.certpath.BasicChecker.verifySignature(java.security.cert.X509Certificate)
>  @bci=48, line=166 (Compiled frame)
>  - 
> sun.security.provider.certpath.BasicChecker.check(java.security.cert.Certificate,
>  java.util.Collection) @bci=24, line=147 (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXMasterCertPathValidator.validate(java.security.cert.CertPath,
>  java.util.List, java.util.List) @bci=316, line=125 (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXCertPathValidator.validate(java.security.cert.TrustAnchor,
>  sun.security.provider.certpath.PKIX$ValidatorParams) @bci=390, line=233 
> (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXCertPathValidator.validate(sun.security.provider.certpath.PKIX$ValidatorParams)
>  @bci=217, line=141 (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(java.security.cert.CertPath,
>  java.security.cert.CertPathParameters) @bci=7, line=80 (Compiled frame)
>  - java.security.cert.CertPathValidator.validate(java.security.cert.CertPath, 
> java.security.cert.CertPathParameters) @bci=6, line=292 (Compiled frame)
>  - 
> sun.security.validator.PKIXValidator.doValidate(java.security.cert.X509Certificate[],
>  java.security.cert.PKIXBuilderParameters) @bci=34, line=357 (Compiled frame)
>  - 
> sun.security.validator.PKIXValidator.engineValidate(java.security.cert.X509Certificate[],
>  java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) 
> @bci=232, line=259 (Compiled frame)
>  - 
> sun.security.validator.Validator.validate(java.security.cert.X509Certificate[],
>  java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) 
> @bci=6, line=260 (Compiled frame)
>  - 
> sun.security.ssl.X509TrustManagerImpl.validate(sun.security.validator.Validator,
>  java.security.cert.X509Certificate[], java.security.AlgorithmConstraints, 
> java.lang.String) @bci=10, line=324 (Compiled frame)
>  - 
> sun.security.ssl.X509TrustManagerImpl.checkTrusted(java.security.cert.X509Certificate[],
>  java.lang.String, javax.net.ssl.SSLEngine, boolean) @bci=179, line=279 
> (Compiled frame)
>  - 
> sun.security.ssl.X509TrustManagerImpl.checkClientTrusted(java.security.cert.X509Ce

[jira] [Created] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately

2018-08-31 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7366:
--

 Summary: topic level segment.bytes and segment.ms not taking 
effect immediately
 Key: KAFKA-7366
 URL: https://issues.apache.org/jira/browse/KAFKA-7366
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Jun Rao


It used to be that topic-level configs such as segment.bytes took effect 
immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect 
after the active segment has rolled. The relevant part of KAFKA-6324 is that in 
Log.maybeRoll, the check for segment rolling was moved to 
LogSegment.shouldRoll().
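For reference, a minimal sketch of changing such a topic-level config with the
AdminClient of that era is shown below; the topic name, broker address, and new
value are placeholders. Per this ticket, the new value may only take effect once
the active segment rolls.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlterSegmentMs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Note: this legacy (non-incremental) alterConfigs call replaces the topic's
            // full set of overrides, so include any existing overrides you want to keep.
            Config newConfig = new Config(Collections.singleton(new ConfigEntry("segment.ms", "600000")));
            admin.alterConfigs(Collections.singletonMap(topic, newConfig)).all().get();
        }
    }
}
{code}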





[jira] [Commented] (KAFKA-7364) kafka periodically run into high cpu usage with ssl writing

2018-08-31 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598981#comment-16598981
 ] 

Ismael Juma commented on KAFKA-7364:


Is this specific to ECDSA or all signature types?
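One illustrative way to check that locally is to print the signature algorithm
of each certificate in the broker keystore. This is only a sketch; the keystore
path, type, and password are placeholders:

{code}
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.Enumeration;

public class PrintSigAlg {
    public static void main(String[] args) throws Exception {
        KeyStore ks = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/path/to/kafka.server.keystore.jks")) {
            ks.load(in, "changeit".toCharArray());  // placeholder password
        }
        for (Enumeration<String> aliases = ks.aliases(); aliases.hasMoreElements(); ) {
            String alias = aliases.nextElement();
            X509Certificate cert = (X509Certificate) ks.getCertificate(alias);
            if (cert != null)
                System.out.println(alias + ": " + cert.getSigAlgName()); // e.g. SHA256withECDSA vs SHA256withRSA
        }
    }
}
{code}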

> kafka periodically run into high cpu usage with ssl writing
> ---
>
> Key: KAFKA-7364
> URL: https://issues.apache.org/jira/browse/KAFKA-7364
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Yu Yang
>Priority: Major
> Attachments: Screen Shot 2018-08-30 at 10.57.32 PM.png
>
>
> While testing SSL writing to Kafka, we found that Kafka often runs into high 
> CPU usage due to inefficiency in the JDK SSL implementation.
> In detail, we use a test cluster of 12 d2.8xlarge instances that runs Kafka 
> 2.0.0 and jdk-10.0.2, and hosts only one topic that ~20k producers write to 
> through an SSL channel. We observed that the network threads often hit 100% 
> CPU usage after enabling SSL writing to Kafka. To improve Kafka's throughput, 
> we have "num.network.threads=32" for the broker. Even with 32 network threads, 
> we see the broker CPU usage jump right after SSL writing is enabled. The 
> broker's CPU usage drops immediately when we disable SSL writing.
>  !Screen Shot 2018-08-30 at 10.57.32 PM.png|height=360! 
> When the broker's CPU usage is high, 'perf top' shows that Kafka is busy 
> executing code in libsunec.so. The following is a sample stack trace that we 
> got when the broker's CPU usage was high. This seems to be related to 
> inefficiency in the JDK SSL implementation. Switching to 
> https://github.com/netty/netty-tcnative to handle the SSL handshake could be 
> helpful.
> {code}
> Thread 77562: (state = IN_NATIVE)
>  - sun.security.ec.ECDSASignature.verifySignedDigest(byte[], byte[], byte[], 
> byte[]) @bci=0 (Compiled frame; information may be imprecise)
>  - sun.security.ec.ECDSASignature.engineVerify(byte[]) @bci=70, line=321 
> (Compiled frame)
>  - java.security.Signature$Delegate.engineVerify(byte[]) @bci=9, line=1222 
> (Compiled frame)
>  - java.security.Signature.verify(byte[]) @bci=10, line=655 (Compiled frame)
>  - sun.security.x509.X509CertImpl.verify(java.security.PublicKey, 
> java.lang.String) @bci=136, line=444 (Compiled frame)
>  - 
> sun.security.provider.certpath.BasicChecker.verifySignature(java.security.cert.X509Certificate)
>  @bci=48, line=166 (Compiled frame)
>  - 
> sun.security.provider.certpath.BasicChecker.check(java.security.cert.Certificate,
>  java.util.Collection) @bci=24, line=147 (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXMasterCertPathValidator.validate(java.security.cert.CertPath,
>  java.util.List, java.util.List) @bci=316, line=125 (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXCertPathValidator.validate(java.security.cert.TrustAnchor,
>  sun.security.provider.certpath.PKIX$ValidatorParams) @bci=390, line=233 
> (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXCertPathValidator.validate(sun.security.provider.certpath.PKIX$ValidatorParams)
>  @bci=217, line=141 (Compiled frame)
>  - 
> sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(java.security.cert.CertPath,
>  java.security.cert.CertPathParameters) @bci=7, line=80 (Compiled frame)
>  - java.security.cert.CertPathValidator.validate(java.security.cert.CertPath, 
> java.security.cert.CertPathParameters) @bci=6, line=292 (Compiled frame)
>  - 
> sun.security.validator.PKIXValidator.doValidate(java.security.cert.X509Certificate[],
>  java.security.cert.PKIXBuilderParameters) @bci=34, line=357 (Compiled frame)
>  - 
> sun.security.validator.PKIXValidator.engineValidate(java.security.cert.X509Certificate[],
>  java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) 
> @bci=232, line=259 (Compiled frame)
>  - 
> sun.security.validator.Validator.validate(java.security.cert.X509Certificate[],
>  java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) 
> @bci=6, line=260 (Compiled frame)
>  - 
> sun.security.ssl.X509TrustManagerImpl.validate(sun.security.validator.Validator,
>  java.security.cert.X509Certificate[], java.security.AlgorithmConstraints, 
> java.lang.String) @bci=10, line=324 (Compiled frame)
>  - 
> sun.security.ssl.X509TrustManagerImpl.checkTrusted(java.security.cert.X509Certificate[],
>  java.lang.String, javax.net.ssl.SSLEngine, boolean) @bci=179, line=279 
> (Compiled frame)
>  - 
> sun.security.ssl.X509TrustManagerImpl.checkClientTrusted(java.security.cert.X509Certificate[],
>  java.lang.String, javax.net.ssl.SSLEngine) @bci=5, line=130 (Compiled frame)
>  - 
> sun.security.ssl.ServerHandshaker.clientCertificate(sun.security.ssl.HandshakeMessage$CertificateMsg)
>  @bci=190, line=1966 (C

[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-08-31 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598974#comment-16598974
 ] 

John Roesler commented on KAFKA-6777:
-

Hi [~habdank],

It's unfortunately a common behavior with JVM applications that when they are 
memory-constrained they never actually crash, but instead disappear into 
gc-pause oblivion. For practical purposes, we don't have any visibility into 
when GC pauses occur, how long they are, or even what our resident memory 
footprint is. This is all by design of the JVM.

However, if we are catching and swallowing OOME, or really any subclass of 
Error, it would not be good. Error is by definition not recoverable and should 
be caught only to gracefully exit.

I've taken a quick perusal of the code, and most of the `catch (Throwable t)` 
instances I see are logged and/or propagated. Some usages (such as in 
KafkaAdminClient.AdminClientRunnable) are suspicious.
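For illustration, the pattern described above looks roughly like this (a hedged
sketch, not a quote of Kafka's code): Throwable may be caught for logging, but
an Error such as OutOfMemoryError is rethrown so the process dies visibly
instead of lingering.

{code}
public class WorkerLoop implements Runnable {
    @Override
    public void run() {
        try {
            doWork();
        } catch (Throwable t) {
            System.err.println("Unexpected failure in worker loop: " + t);
            if (t instanceof Error) {
                // Errors are not recoverable; propagate so the JVM terminates loudly.
                throw (Error) t;
            }
            // Exceptions can be handled or retried here as appropriate.
        }
    }

    private void doWork() {
        // application-specific work (placeholder)
    }
}
{code}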

 

I'm unclear on whether you are saying that when Kafka runs out of memory, it 
 # shuts down, but hides the reason
 # or continues running

The latter seems unlikely, since if the JVM is truly out of memory, then 
catching and swallowing the OOME would only work for so long; it seems like 
eventually some operation would attempt to allocate memory outside of a catch 
block and still crash the app.

 

Can you elaborate on the reason you think that the culprit is a swallowed OOME 
instead of just normal GC hell?

Is there a specific code path that you think is responsible for catching and 
swallowing OOMEs?

Thanks,

-John

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>
> Dears,
> We have already run into problems related to Out Of Memory situations in the 
> Kafka Broker and streaming clients many times.
> The scenario is the following.
> When the Kafka Broker (or Streaming Client) is under load and has too little 
> memory, there are no errors in the server logs. One can see some cryptic 
> entries in the GC logs, but they are definitely not self-explanatory.
> The Kafka Broker (and Streaming Clients) keep running. Later we see in JMX 
> monitoring that the JVM spends more and more time in GC. In our case it grows 
> from e.g. 1% to 80%-90% of CPU time being used by GC.
> Next, the software collapses into zombie mode – the process does not end. In 
> such a case I would expect the process to crash (e.g. with SIGSEGV). Even 
> worse, Kafka treats such a zombie process as normal and still sends messages, 
> which are in fact getting lost, and the cluster does not exclude broken nodes. 
> The question is how to configure Kafka to really terminate the JVM rather than 
> remain in zombie mode, to give other nodes a chance to know that something is 
> dead.
> I would expect that in an Out Of Memory situation the JVM is terminated, if 
> not gracefully then at least by crashing the process.





[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-31 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598948#comment-16598948
 ] 

Dong Lin commented on KAFKA-7278:
-

[~niob] The stack trace in that Jira seems similar to the issue fixed here, so 
there is a good chance that we have fixed that issue as well.

 

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined and the time Log.replaceSegments() is called. If there is 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  





[jira] [Commented] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?

2018-08-31 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598941#comment-16598941
 ] 

John Roesler commented on KAFKA-7363:
-

I'm not sure what other information we can provide. Is it ok with you if we 
just resolve this ticket with "Information Provided"?

Or do you have further questions?

 

Thanks,

-John

> How num.stream.threads in streaming application influence memory consumption?
> -
>
> Key: KAFKA-7363
> URL: https://issues.apache.org/jira/browse/KAFKA-7363
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> How option _num.stream.threads_ in streaming application influence memory 
> consumption?
> I see that by increasing num.stream.threads my application needs more memory.
> This is obvious, but it is not obvious how much I need to give it. Try and 
> error method does not work, as it seems to be highly dependen on forced 
> throughput.
> I mean: higher load more memory is needed.
> Thanks for help and regards,
> Seweryn.





[jira] [Commented] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?

2018-08-31 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598938#comment-16598938
 ] 

John Roesler commented on KAFKA-7363:
-

Hi Seweryn,
 
It's a little hard to say. For one thing, extra threads have some overhead of 
their own, but I agree with you that the bulk of the extra memory would come 
from the extra throughput you're able to drive through the application.
 
I haven't done any analysis of this before, so just reasoning about this (as 
opposed to speaking from experience):
 
In the maximum case, doubling your thread count would double your memory usage. 
This is for an "ideal" CPU-bound process. In reality, there are shared 
resources, such as network and disk, that should prevent you from reaching this 
bound.
 
In the minimum case, if the app is already saturating some other resource, like 
network, disk, or even memory, then increasing the thread count would not add 
an appreciable amount of memory. The reason is that if the app is already 
saturating, say, the network, then adding more threads doesn't change that fact, 
and you still can't increase the throughput.
 
As for a concrete answer to your question, I think you're unfortunately the 
only one with enough visibility to predict the memory load. It would be very 
dependent on your machines, network, the number of topics and partitions, the 
size of your records in each partition, what exactly your Streams app does, and 
even your broker configuration.
 
However, I'd propose the following experimental strategy to try and get a 
handle on it:
1. start with one thread. Observe all the main resources (CPU, network i/o, 
disk i/o), but especially memory. For memory, pay particular attention to the 
memory used immediately after GC. You might want to turn on GC logging to help 
with this.
1b. observe these metrics for long enough for a stable trend to emerge. This 
might be hours or even a day.
2. add one more thread. Continue observing all the resources. As I said, in the 
ideal case, this should double your throughput and hence double your memory 
usage. Looking at how much all the extra metrics increase when you add the 
second thread should help you start building a model of the increase you should 
expect for each extra thread.
3. continue the experiment, adding one thread each time. At some point, you'll 
notice that the throughput/memory increase drops off when you add an extra 
thread. This means that you've saturated one or more other resources. The 
metrics for those resources should corroborate this.
 
Note that, if nothing else, the CPU should become saturated once the number of 
threads is equal to the number of cores. Increasing the thread count much 
beyond this shouldn't help much.
 
I hope this helps!
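
For concreteness, here is a minimal sketch of the kind of experiment described in steps 1-3 above, assuming a Kafka 2.0-style Streams client, a trivial pass-through topology, and placeholder broker/topic names that are not taken from this ticket:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ThreadCountExperiment {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "thread-count-experiment");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The knob under test: start at 1 and raise it by one per experiment run,
        // while watching CPU, network/disk I/O, and especially post-GC heap usage.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Integer.parseInt(args.length > 0 ? args[0] : "1"));

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // placeholder pass-through topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Running it with GC logging enabled (for example -verbose:gc) makes it easier to track the post-GC heap level mentioned in step 1.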

> How num.stream.threads in streaming application influence memory consumption?
> -
>
> Key: KAFKA-7363
> URL: https://issues.apache.org/jira/browse/KAFKA-7363
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> How does the option _num.stream.threads_ in a streaming application influence 
> memory consumption?
> I see that by increasing num.stream.threads my application needs more memory.
> This is obvious, but it is not obvious how much I need to give it. The trial and 
> error method does not work, as it seems to be highly dependent on the forced 
> throughput.
> I mean: the higher the load, the more memory is needed.
> Thanks for your help and regards,
> Seweryn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?

2018-08-31 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7363:

Component/s: streams

> How num.stream.threads in streaming application influence memory consumption?
> -
>
> Key: KAFKA-7363
> URL: https://issues.apache.org/jira/browse/KAFKA-7363
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> How does the option _num.stream.threads_ in a streaming application influence 
> memory consumption?
> I see that by increasing num.stream.threads my application needs more memory.
> This is obvious, but it is not obvious how much I need to give it. The trial and 
> error method does not work, as it seems to be highly dependent on the forced 
> throughput.
> I mean: the higher the load, the more memory is needed.
> Thanks for your help and regards,
> Seweryn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1880) Add support for checking binary/source compatibility

2018-08-31 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598830#comment-16598830
 ] 

Viktor Somogyi commented on KAFKA-1880:
---

[~granthenke] If you don't mind, I've reassigned this to continue your work, as 
something similar has come up regarding KIP-336.

> Add support for checking binary/source compatibility
> 
>
> Key: KAFKA-1880
> URL: https://issues.apache.org/jira/browse/KAFKA-1880
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ashish Singh
>Assignee: Viktor Somogyi
>Priority: Major
> Attachments: compatibilityReport-only-incompatible.html, 
> compatibilityReport.html
>
>
> Recent discussions around compatibility show how important compatibility is 
> to users. Kafka should leverage a tool to find, report, and avoid 
> incompatibility issues in public methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-1880) Add support for checking binary/source compatibility

2018-08-31 Thread Viktor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi reassigned KAFKA-1880:
-

Assignee: Viktor Somogyi  (was: Grant Henke)

> Add support for checking binary/source compatibility
> 
>
> Key: KAFKA-1880
> URL: https://issues.apache.org/jira/browse/KAFKA-1880
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ashish Singh
>Assignee: Viktor Somogyi
>Priority: Major
> Attachments: compatibilityReport-only-incompatible.html, 
> compatibilityReport.html
>
>
> Recent discussions around compatibility show how important compatibility is 
> to users. Kafka should leverage a tool to find, report, and avoid 
> incompatibility issues in public methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 12:06 PM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{Also, please check the description of CommitFailedException which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already }}{{rebalanced and 
assigned the partitions to another member. This means that the time }}{{between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
}}{{which typically implies that the poll loop is spending too much time 
message processing. }}{{You can address this either by increasing the session 
timeout or by reducing the maximum }}{{size of batches returned in poll() with 
max.poll.records.}}_
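
To make this concrete, here is a minimal sketch of the small-batch, manual-commit pattern being discussed. The topic, group id, and timeout values are illustrative assumptions (not taken from the reporter's setup), and it uses the poll(Duration) API of 2.0+ clients:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OneAtATimeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "one-at-a-time");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        // Must comfortably exceed the worst-case time needed to process one batch,
        // otherwise the group rebalances and the commit fails.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);        // slow, user-specific work goes here
                    consumer.commitSync();  // commit only after the record is processed
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println("processing offset " + record.offset());
    }
}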

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

 

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big bunch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
> 
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500
> fetch.min.bytes = 1
> [group.id|https://group.id/] = empestor
> [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> [max.poll.interval.ms|https://max.poll.interval.ms/] = 30
> max.poll.records = 1
> [metadata.max.age.ms|https://metadata.max.age.ms/] = 

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 12:06 PM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{Also, please check the description of CommitFailedException which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already rebalanced and 
assigned the partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with 
max.poll.records.}}_

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{Also, please check the description of CommitFailedException which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already }}{{rebalanced and 
assigned the partitions to another member. This means that the time }}{{between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
}}{{which typically implies that the poll loop is spending too much time 
message processing. }}{{You can address this either by increasing the session 
timeout or by reducing the maximum }}{{size of batches returned in poll() with 
max.poll.records.}}_

 

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big bunch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
> 
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 54
> enable.auto.commit = false
> exclude.internal.topics = t

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 12:04 PM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{If you can take a look at this part of code (Fetcher.java) - it might be what 
you are experiencing:}}

{{// this case shouldn't usually happen because we only send one fetch at a 
time per partition,}}
 {{// but it might conceivably happen in some rare cases (such as partition 
leader changes).}}
 {{// we have to copy to a new list because the old one may be immutable}}
  {{List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + 
currentRecords.size());}}
  {{newRecords.addAll(currentRecords);}}
  {{newRecords.addAll(records);}}
 {{fetched.put(partition, newRecords);}}

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big bunch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
> 
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500
> fetch.min.bytes = 1
> [group.id|https://group.id/] = empestor
> [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> [max.poll.interval.ms|https://max.poll.interval.ms/] = 30
> max.poll.records = 1
> [metadata.max.age.ms|https://metadata.max.age.ms/] = 30
> metric.reporters = []
> me

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 11:58 AM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{If you can take a look at this part of code (Fetcher.java) - it might be what 
you are experiencing:}}

{{// this case shouldn't usually happen because we only send one fetch at a 
time per partition,}}
 {{// but it might conceivably happen in some rare cases (such as partition 
leader changes).}}
 {{// we have to copy to a new list because the old one may be immutable}}
  {{List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + 
currentRecords.size());}}
  {{newRecords.addAll(currentRecords);}}
  {{newRecords.addAll(records);}}
 {{fetched.put(partition, newRecords);}}


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{If you can take a look at this part of code (Fetcher.java) - it might be what 
you are experiencing:}}

 

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big bunch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
> 
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500
> fetch.min.bytes = 1
> [group.id|https://group.id/] = empestor
> [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> [max.poll.interval.ms|https://max.poll.interval.ms/] = 30
> max.p

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 11:57 AM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{If you can take a look at this part of code (Fetcher.java) - it might be what 
you are experiencing:}}

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{If you can take a look at this part of code (Fetcher.java) - it might be what 
you are experiencing:}}

{{    // this case shouldn't usually happen because we 
only send one fetch at a time per partition,}}
{{    // but it might conceivably happen in some rare 
cases (such as partition leader changes).}}
{{    // we have to copy to a new list because the old 
one may be immutable}}
{{    List<ConsumerRecord<K, V>> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());}}
{{    newRecords.addAll(currentRecords);}}
{{    newRecords.addAll(records);}}
{{    fetched.put(partition, newRecords);}}

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big bunch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
> 
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500
> fetch.min.bytes = 1
> [group.id|https://group.id/] = empestor
> [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.de

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 11:55 AM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{If you can take a look at this part of code (Fetcher.java) - it might be what 
you are experiencing:}}

{{    // this case shouldn't usually happen because we 
only send one fetch at a time per partition,}}
{{    // but it might conceivably happen in some rare 
cases (such as partition leader changes).}}
{{    // we have to copy to a new list because the old 
one may be immutable}}
{{    List<ConsumerRecord<K, V>> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());}}
{{    newRecords.addAll(currentRecords);}}
{{    newRecords.addAll(records);}}
{{    fetched.put(partition, newRecords);}}


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

Do you know roughly how much delay (max) you need to process the message? e.g. 
20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and 
adjust session.timeout.ms to be ~3x more than that) and see if you are still 
encountering the issue? I might have overlooked something, but from the code I 
don't see any reason why this will be overridden (will check this anyway).

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big bunch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
> 
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500
> fetch.min.bytes = 1
> [group.id|https://group.id/] = empestor
> [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 10485

[jira] [Commented] (KAFKA-6950) Add mechanism to delay response to failed client authentication

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598593#comment-16598593
 ] 

ASF GitHub Bot commented on KAFKA-6950:
---

rajinisivaram closed pull request #5082: KAFKA-6950: Delay response to failed 
client authentication to prevent potential DoS issues (KIP-306)
URL: https://github.com/apache/kafka/pull/5082
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
index f6458c6f22d..7a05eba03f2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
@@ -40,6 +40,10 @@ public AuthenticationException(String message) {
 super(message);
 }
 
+public AuthenticationException(Throwable cause) {
+super(cause);
+}
+
 public AuthenticationException(String message, Throwable cause) {
 super(message, cause);
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 4e2e7273a68..33c2e908516 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -37,6 +37,14 @@
  */
 void authenticate() throws AuthenticationException, IOException;
 
+/**
+ * Perform any processing related to authentication failure. This is 
invoked when the channel is about to be closed
+ * because of an {@link AuthenticationException} thrown from a prior 
{@link #authenticate()} call.
+ * @throws IOException if read/write fails due to an I/O error
+ */
+default void handleAuthenticationFailure() throws IOException {
+}
+
 /**
  * Returns Principal using PrincipalBuilder
  */
@@ -46,5 +54,4 @@
  * returns true if authentication is complete otherwise returns false;
  */
 boolean complete();
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
 
b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
new file mode 100644
index 000..8474426c609
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.errors.AuthenticationException;
+
+public class DelayedResponseAuthenticationException extends 
AuthenticationException {
+private static final long serialVersionUID = 1L;
+
+public DelayedResponseAuthenticationException(Throwable cause) {
+super(cause);
+}
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 1839729f2e7..17dc6a33ef2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -120,15 +120,22 @@ public KafkaPrincipal principal() {
  * authentication. For SASL, authentication is performed by {@link 
Authenticator#authenticate()}.
  */
 public void prepare() throws AuthenticationException, IOException {
+boolean authenticating = false;
 try {
 if (!transportLayer.ready())
 transportLayer.handshake();
-if (transportLayer.ready() && !authenticator.complete())
+if (transportLayer.ready() && !authenticator.complete()) {
+authenticating = true;
 authenticator
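
The diff is truncated above, but the core idea of KIP-306 is to postpone the failure response instead of closing the connection immediately, which throttles brute-force authentication attempts. A rough sketch of that pattern (not the actual Kafka implementation; the class and method names below are made up) using a scheduled executor:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedAuthFailureCloser {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMs;

    DelayedAuthFailureCloser(long delayMs) {
        this.delayMs = delayMs;
    }

    // Instead of closing the channel as soon as authentication fails, schedule the
    // failure response and the close after a fixed delay.
    void onAuthenticationFailure(Runnable sendFailureResponseAndClose) {
        scheduler.schedule(sendFailureResponseAndClose, delayMs, TimeUnit.MILLISECONDS);
    }
}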

[jira] [Commented] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna commented on KAFKA-7365:
-

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to match the long processing delay. Also, max.poll.records is a 
config covered by tests in PlainTextConsumerTest.scala and works as expected there.

Do you know roughly how much delay (max) you need to process the message? e.g. 
20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and 
adjust session.timeout.ms to be ~3x more than that) and see if you are still 
encountering the issue? I might have overlooked something, but from the code I 
don't see any reason why this will be overridden (will check this anyway).

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big bunch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
> 
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500
> fetch.min.bytes = 1
> [group.id|https://group.id/] = empestor
> [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> [max.poll.interval.ms|https://max.poll.interval.ms/] = 30
> max.poll.records = 1
> [metadata.max.age.ms|https://metadata.max.age.ms/] = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> [metrics.sample.window.ms|https://metrics.sample.window.ms/] = 3
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> [reconnect.backoff.max.ms|https://reconnect.backoff.max.ms/] = 1000
> [reconnect.backoff.ms|https://reconnect.backoff.ms/] = 50
> [request.timeout.ms|https://request.timeout.ms/] = 4
> [retry.backoff.ms|https://retry.backoff.ms/] = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> [session.timeout.ms|https://session.timeout.ms/] = 1
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = 
> /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = 

[jira] [Commented] (KAFKA-4544) Add system tests for delegation token based authentication

2018-08-31 Thread Attila Sasvari (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598557#comment-16598557
 ] 

Attila Sasvari commented on KAFKA-4544:
---

Do you have the capacity to work on this, [~omkreddy]? If there's anything I can 
do to help, please let me know.

> Add system tests for delegation token based authentication
> --
>
> Key: KAFKA-4544
> URL: https://issues.apache.org/jira/browse/KAFKA-4544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ashish Singh
>Assignee: Manikumar
>Priority: Major
>
> Add system tests for delegation token based authentication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-31 Thread Christoph Schmidt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598540#comment-16598540
 ] 

Christoph Schmidt commented on KAFKA-7278:
--

This ticket has raised questions in the comments of the ancient KAFKA-1194 (which 
has three old PRs) - does it by chance fix that issue, too? The root cause over 
there is that rename-while-still-open blows up under Windows, with the only 
available workaround being to completely disable the log cleaner.

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined and the time Log.replaceSegments() is called. If there are 
> concurrent async deletions of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist, and the Kafka 
> server may shut down the log directory due to a NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by 
> only deleting a segment if it can still be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread Kashyap Ivaturi (JIRA)
Kashyap Ivaturi created KAFKA-7365:
--

 Summary: max.poll.records setting in Kafka Consumer is not working
 Key: KAFKA-7365
 URL: https://issues.apache.org/jira/browse/KAFKA-7365
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Kashyap Ivaturi


Hi,

I have a requirement where I consume messages one by one; each message needs 
additional processing, after which I manually commit the offset.

Things work well most of the time until I get a big bunch of records that takes 
longer to process, and I encounter a CommitFailed exception for the last set of 
records even though they were processed. While I am able to reconnect, the 
consumer picks up some messages that I had already processed. I don't want this 
to happen, as it creates duplicates in the target systems that I integrate with 
while processing the messages.

 

I decided that even though there are more messages in the queue, I would like 
to have control over how many records I process per poll.

I tried to replicate a scenario where I started the consumer with 
'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
consumer is listening to.

I expected that the consumer would only process 1 message because of my 
'max.poll.records' setting, but the consumer processed all 4 messages in a 
single poll. Any idea why it did not honor the 'max.poll.records' setting, or is 
some other setting overriding it? I appreciate your help or guidance in 
troubleshooting this issue.

Here is the log of my Consumer config when it starts:

 

2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 

[auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000

auto.offset.reset = earliest

bootstrap.servers = [messaging-rtp3.cisco.com:9093]

check.crcs = true

[client.id|https://client.id/] = 

[connections.max.idle.ms|https://connections.max.idle.ms/] = 54

enable.auto.commit = false

exclude.internal.topics = true

fetch.max.bytes = 52428800

[fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500

fetch.min.bytes = 1

[group.id|https://group.id/] = empestor

[heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000

interceptor.classes = null

internal.leave.group.on.close = true

isolation.level = read_uncommitted

key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer

max.partition.fetch.bytes = 1048576

[max.poll.interval.ms|https://max.poll.interval.ms/] = 30

max.poll.records = 1

[metadata.max.age.ms|https://metadata.max.age.ms/] = 30

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

[metrics.sample.window.ms|https://metrics.sample.window.ms/] = 3

partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]

receive.buffer.bytes = 65536

[reconnect.backoff.max.ms|https://reconnect.backoff.max.ms/] = 1000

[reconnect.backoff.ms|https://reconnect.backoff.ms/] = 50

[request.timeout.ms|https://request.timeout.ms/] = 4

[retry.backoff.ms|https://retry.backoff.ms/] = 100

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 6

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.mechanism = GSSAPI

security.protocol = SSL

send.buffer.bytes = 131072

[session.timeout.ms|https://session.timeout.ms/] = 1

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.endpoint.identification.algorithm = null

ssl.key.password = [hidden]

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = 
/kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks

ssl.keystore.password = [hidden]

ssl.keystore.type = JKS

ssl.protocol = TLS

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = 
/kafka/certs/empestor/certificates/kafka.client.truststore.jks

ssl.truststore.password = [hidden]

ssl.truststore.type = JKS

value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer

 

2018-08-28 08:29:48.079  INFO 91121 --- [           main] 
o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)