[jira] [Resolved] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups
[ https://issues.apache.org/jira/browse/KAFKA-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-7369. Resolution: Fixed Fix Version/s: 2.1.0 2.0.1 > Retry when possible in AdminClient.listConsumerGroups > - > > Key: KAFKA-7369 > URL: https://issues.apache.org/jira/browse/KAFKA-7369 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > Currently we do not retry ListGroups requests when they fail due to retriable > errors. For example, this is causing some instability in > `kafka.admin.ListConsumerGroupTest.testListConsumerGroups`. > {code} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorLoadInProgressException: Error > listing groups on localhost:43001 (id: 0 rack: null) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:132) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups
[ https://issues.apache.org/jira/browse/KAFKA-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599455#comment-16599455 ] ASF GitHub Bot commented on KAFKA-7369: --- hachikuji closed pull request #5595: KAFKA-7369; Handle retriable errors in AdminClient list groups API URL: https://github.com/apache/kafka/pull/5595 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8ddb0c08627..904cd0601e5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2567,8 +2567,11 @@ private void maybeAddConsumerGroup(ListGroupsResponse.Group group) { void handleResponse(AbstractResponse abstractResponse) { final ListGroupsResponse response = (ListGroupsResponse) abstractResponse; synchronized (results) { -if (response.error() != Errors.NONE) { - results.addError(response.error().exception(), node); +Errors error = response.error(); +if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) { +throw error.exception(); +} else if (error != Errors.NONE) { +results.addError(error.exception(), node); } else { for (ListGroupsResponse.Group group : response.groups()) { maybeAddConsumerGroup(group); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index b108803590f..af6f7212e8c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -62,6 +62,7 @@ /** * Possible error codes: * + * COORDINATOR_LOADING_IN_PROGRESS (14) * COORDINATOR_NOT_AVAILABLE (15) * AUTHORIZATION_FAILED (29) */ diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 0245cbd3695..c0dc542b159 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -36,7 +36,6 @@ import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.AuthenticationException; -import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; @@ -46,6 +45,7 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; @@ -863,9 +863,11 @@ public void testListConsumerGroups() throws Exception { Node node0 = new Node(0, "localhost", 8121); Node node1 = new Node(1, "localhost", 8122); Node node2 = new Node(2, "localhost", 8123); +Node node3 = new Node(3, "localhost", 8124); nodes.put(0, node0); nodes.put(1, node1); nodes.put(2, node2); +nodes.put(3, node3); final Cluster cluster = new Cluster( "mockClusterId", @@ -902,13 +904,19 @@ public void testListConsumerGroups() throws Exception { )), 
node0); +// handle retriable errors env.kafkaClient().prepareResponseFrom( new ListGroupsResponse( Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyList() ), node1); - +
[jira] [Commented] (KAFKA-7370) Enhance FileConfigProvider to read a directory
[ https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599394#comment-16599394 ] ASF GitHub Bot commented on KAFKA-7370: --- rayokota opened a new pull request #5596: KAFKA-7370: Enhance FileConfigProvider to read a dir URL: https://github.com/apache/kafka/pull/5596 This is a backward compatible enhancement to augment FileConfigProvider to read from a directory, where the file names are the keys and the corresponding file contents are the values. The former functionality of reading a Properties file is retained. This will allow for easier integration with secret management systems where each secret is often written to an individual file, such as in some Docker and Kubernetes setups. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enhance FileConfigProvider to read a directory > -- > > Key: KAFKA-7370 > URL: https://issues.apache.org/jira/browse/KAFKA-7370 > Project: Kafka > Issue Type: Improvement > Components: config >Affects Versions: 2.0.0 >Reporter: Robert Yokota >Assignee: Robert Yokota >Priority: Minor > > Currently FileConfigProvider can read a Properties file as a set of key-value > pairs. 
This enhancement is to augment FileConfigProvider so that it can also > read a directory, where the file names are the keys and the corresponding > file contents are the values. > This will allow for easier integration with secret management systems where > each secret is often an individual file, such as in Docker and Kubernetes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7370) Enhance FileConfigProvider to read a directory
Robert Yokota created KAFKA-7370: Summary: Enhance FileConfigProvider to read a directory Key: KAFKA-7370 URL: https://issues.apache.org/jira/browse/KAFKA-7370 Project: Kafka Issue Type: Improvement Components: config Affects Versions: 2.0.0 Reporter: Robert Yokota Assignee: Robert Yokota Currently FileConfigProvider can read a Properties file as a set of key-value pairs. This enhancement is to augment FileConfigProvider so that it can also read a directory, where the file names are the keys and the corresponding file contents are the values. This will allow for easier integration with secret management systems where each secret is often an individual file, such as in Docker and Kubernetes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups
[ https://issues.apache.org/jira/browse/KAFKA-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599210#comment-16599210 ] ASF GitHub Bot commented on KAFKA-7369: --- hachikuji opened a new pull request #5595: KAFKA-7369; Handle retriable errors in AdminClient list groups API URL: https://github.com/apache/kafka/pull/5595 We should retry when possible if ListGroups fails due to a retriable error (e.g. coordinator loading). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Retry when possible in AdminClient.listConsumerGroups > - > > Key: KAFKA-7369 > URL: https://issues.apache.org/jira/browse/KAFKA-7369 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Currently we do not retry ListGroups requests when they fail due to retriable > errors. For example, this is causing some instability in > `kafka.admin.ListConsumerGroupTest.testListConsumerGroups`. 
> {code} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorLoadInProgressException: Error > listing groups on localhost:43001 (id: 0 rack: null) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:132) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7369) Retry when possible in AdminClient.listConsumerGroups
Jason Gustafson created KAFKA-7369: -- Summary: Retry when possible in AdminClient.listConsumerGroups Key: KAFKA-7369 URL: https://issues.apache.org/jira/browse/KAFKA-7369 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Currently we do not retry ListGroups requests when they fail due to retriable errors. For example, this is causing some instability in `kafka.admin.ListConsumerGroupTest.testListConsumerGroups`. {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorLoadInProgressException: Error listing groups on localhost:43001 (id: 0 rack: null) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:132) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7287) Set open ACL permissions for old consumer znode path
[ https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-7287. Resolution: Fixed Fix Version/s: 2.1.0 2.0.1 1.1.2 Also merged the PR to 1.1 branch. > Set open ACL permissions for old consumer znode path > > > Key: KAFKA-7287 > URL: https://issues.apache.org/jira/browse/KAFKA-7287 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Old consumer znode path should have open ACL permissions in kerberized > environment. This got missed in kafkaZkClient changes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7287) Set open ACL permissions for old consumer znode path
[ https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599161#comment-16599161 ] ASF GitHub Bot commented on KAFKA-7287: --- junrao closed pull request #5585: KAFKA-7287: Set open ACL for old consumer znode path URL: https://github.com/apache/kafka/pull/5585 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 03f4a05f1e4..cff32a2415f 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -407,8 +407,13 @@ object PreferredReplicaElectionZNode { }.map(_.toSet).getOrElse(Set.empty) } +//old consumer path znode +object ConsumerPathZNode { + def path = "/consumers" +} + object ConsumerOffset { - def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offsets/${topic}/${partition}" + def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}" def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8) def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong) } @@ -536,7 +541,7 @@ object ZkData { // These are persistent ZK paths that should exist on kafka broker startup. val PersistentZkPaths = Seq( -"/consumers", // old consumer path +ConsumerPathZNode.path, // old consumer path BrokerIdsZNode.path, TopicsZNode.path, ConfigEntityChangeNotificationZNode.path, @@ -558,7 +563,8 @@ object ZkData { } def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = { -if (isSecure) { +//Old Consumer path is kept open as different consumers will write under this node. 
+if (!ConsumerPathZNode.path.equals(path) && isSecure) { val acls = new ArrayBuffer[ACL] acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala if (!sensitivePath(path)) diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 033ca67143b..a80c4074e84 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -19,10 +19,10 @@ package kafka.security.auth import kafka.admin.ZkSecurityMigrator import kafka.utils.{Logging, TestUtils, ZkUtils} -import kafka.zk.ZooKeeperTestHarness +import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.security.JaasUtils -import org.apache.zookeeper.data.{ACL} +import org.apache.zookeeper.data.{ACL, Stat} import org.junit.Assert._ import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ @@ -299,4 +299,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { } } } + + @Test + def testConsumerOffsetPathAcls(): Unit = { +zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path) + +val consumerPathAcls = zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat()) +assertTrue("old consumer znode path acls are not open", consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure)) + } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Set open ACL permissions for old consumer znode path > > > Key: KAFKA-7287 > URL: https://issues.apache.org/jira/browse/KAFKA-7287 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > > Old consumer znode path should have open ACL permissions in kerberized > environment. This got missed in kafkaZkClient changes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7368) Support joining Windowed KTables
John Roesler created KAFKA-7368: --- Summary: Support joining Windowed KTables Key: KAFKA-7368 URL: https://issues.apache.org/jira/browse/KAFKA-7368 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, there is no good way to join two `KTable<Windowed<K>, V>`, aka windowed KTables. They are KTables, so they have a `join` operator available, but it currently will use a regular KeyValue store instead of a Windowed store, so it will grow without bound as new windows enter. One option is to convert both KTables into KStream, and join them (which is a windowed join), and then convert them back into KTables for further processing, but this is an awkward way to accomplish an apparently straightforward task. It should instead be possible to directly support it, but the trick will be to make it impossible to accidentally use a window store for normal (aka non-windowed) KTables. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7367) Verify that Streams never creates RocksDB stores unless they are needed
[ https://issues.apache.org/jira/browse/KAFKA-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-7367: Labels: newbie (was: ) > Verify that Streams never creates RocksDB stores unless they are needed > --- > > Key: KAFKA-7367 > URL: https://issues.apache.org/jira/browse/KAFKA-7367 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: newbie > > We have gotten some reports of Streams creating RocksDB stores unnecessarily > for stateless processes. > We can and should verify this doesn't happen by creating integration tests > for *every* stateless operator that verify that after processing, the state > directory is still empty. > These tests could potentially be backported as far as we care to so that we > can identify and fix potential unnecessary stores in past versions as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7367) Verify that Streams never creates RocksDB stores unless they are needed
John Roesler created KAFKA-7367: --- Summary: Verify that Streams never creates RocksDB stores unless they are needed Key: KAFKA-7367 URL: https://issues.apache.org/jira/browse/KAFKA-7367 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler We have gotten some reports of Streams creating RocksDB stores unnecessarily for stateless processes. We can and should verify this doesn't happen by creating integration tests for *every* stateless operator that verify that after processing, the state directory is still empty. These tests could potentially be backported as far as we care to so that we can identify and fix potential unnecessary stores in past versions as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time
[ https://issues.apache.org/jira/browse/KAFKA-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599108#comment-16599108 ] John Roesler commented on KAFKA-3596: - This should be fixed by a few changes coming in 2.1: * window segments are no longer bounded by number, but by size. This means that if you were to get events "from the future", they would no longer cause still-live windows to be dropped * we have revised the processing and stream-time model: ** We pull records one-at-a-time from the inputs in the order of their timestamp. This should eliminate the cases where we process an event with a much-advanced timestamp from queue A, while there are events with smaller timestamps still to process from queue B. It won't eliminate cases where queue A alone has out-of-order events. ** Stream time itself is now computed to be the non-decreasing maximum of observed timestamps. ** Together, these changes mean that "future events" are no longer possible, only "late events". > Kafka Streams: Window expiration needs to consider more than event time > --- > > Key: KAFKA-3596 > URL: https://issues.apache.org/jira/browse/KAFKA-3596 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Henry Cai >Priority: Minor > Labels: architecture > Fix For: 2.1.0 > > > Currently in Kafka Streams, the way the windows are expired in RocksDB is > triggered by new event insertion. When a window is created at T0 with 10 > minutes retention, when we saw a new record coming with event timestamp T0 + > 10 +1, we will expire that window (remove it) out of RocksDB. > In the real world, it's very easy to see event coming with future timestamp > (or out-of-order events coming with big time gaps between events), this way > of retiring a window based on one event's event timestamp is dangerous. I > think at least we need to consider both the event's event time and > server/stream time elapse. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time
[ https://issues.apache.org/jira/browse/KAFKA-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-3596: Fix Version/s: 2.1.0 > Kafka Streams: Window expiration needs to consider more than event time > --- > > Key: KAFKA-3596 > URL: https://issues.apache.org/jira/browse/KAFKA-3596 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Henry Cai >Priority: Minor > Labels: architecture > Fix For: 2.1.0 > > > Currently in Kafka Streams, the way the windows are expired in RocksDB is > triggered by new event insertion. When a window is created at T0 with 10 > minutes retention, when we saw a new record coming with event timestamp T0 + > 10 +1, we will expire that window (remove it) out of RocksDB. > In the real world, it's very easy to see event coming with future timestamp > (or out-of-order events coming with big time gaps between events), this way > of retiring a window based on one event's event timestamp is dangerous. I > think at least we need to consider both the event's event time and > server/stream time elapse. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4988) JVM crash when running on Alpine Linux
[ https://issues.apache.org/jira/browse/KAFKA-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-4988. - Resolution: Won't Fix I think this issue is out of our hands. If you think there is something for us to do, please feel free to reopen the ticket and comment. Thanks, -John > JVM crash when running on Alpine Linux > -- > > Key: KAFKA-4988 > URL: https://issues.apache.org/jira/browse/KAFKA-4988 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Vincent Rischmann >Priority: Minor > > I'm developing my Kafka Streams application using Docker and I run my jars > using the official openjdk:8-jre-alpine image. > I'm just starting to use windowing and now the JVM crashes because of an > issue with RocksDB I think. > It's trivial to fix on my part, just use the debian jessie based image. > However, it would be cool if alpine was supported too since its docker images > are quite a bit less heavy > {quote} > Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: > /tmp/librocksdbjni3285995384052305662.so: Error loading shared library > ld-linux-x86-64.so.2: No such file or directory (needed by > /tmp/librocksdbjni3285995384052305662.so) > at java.lang.ClassLoader$NativeLibrary.load(Native Method) > at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941) > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824) > at java.lang.Runtime.load0(Runtime.java:809) > at java.lang.System.load(System.java:1086) > at > org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78) > at > org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56) > at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64) > at org.rocksdb.RocksDB.(RocksDB.java:35) > at org.rocksdb.Options.(Options.java:22) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115) > at > 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:148) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f60f34ce088, pid=1, tid=0x7f60f3705ab0 > # > # JRE version: OpenJDK Runtime Environment (8.0_121-b13) (build 1.8.0_121-b13) > # Java VM: OpenJDK 64-Bit Server VM (25.121-b13 mixed mode linux-amd64 > compressed oops) > # Derivati
[jira] [Commented] (KAFKA-6033) Kafka Streams does not work with musl-libc (alpine linux)
[ https://issues.apache.org/jira/browse/KAFKA-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599098#comment-16599098 ] John Roesler commented on KAFKA-6033: - Also, I've just found https://issues.apache.org/jira/browse/KAFKA-4988, which has some more concrete recommendations. It might be worth a look. > Kafka Streams does not work with musl-libc (alpine linux) > - > > Key: KAFKA-6033 > URL: https://issues.apache.org/jira/browse/KAFKA-6033 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.11.0.0 > Environment: Alpine 3.6 >Reporter: Jeffrey Zampieron >Priority: Major > > Using the released version of kafka fails on alpine based images b/c of > rocksdb using the jni and failing to load. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6033) Kafka Streams does not work with musl-libc (alpine linux)
[ https://issues.apache.org/jira/browse/KAFKA-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-6033. - Resolution: Won't Fix This is unfortunately out of our hands. If you think I'm wrong about this, please reopen the ticket. Thanks, -John > Kafka Streams does not work with musl-libc (alpine linux) > - > > Key: KAFKA-6033 > URL: https://issues.apache.org/jira/browse/KAFKA-6033 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.11.0.0 > Environment: Alpine 3.6 >Reporter: Jeffrey Zampieron >Priority: Major > > Using the released version of kafka fails on alpine based images b/c of > rocksdb using the jni and failing to load. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4544) Add system tests for delegation token based authentication
[ https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599038#comment-16599038 ] Manikumar commented on KAFKA-4544: -- [~asasvari] If you are interested, please take over the JIRA. Currently I am working on KAFKA-6945. Thanks. > Add system tests for delegation token based authentication > -- > > Key: KAFKA-4544 > URL: https://issues.apache.org/jira/browse/KAFKA-4544 > Project: Kafka > Issue Type: Sub-task > Components: security >Reporter: Ashish Singh >Assignee: Manikumar >Priority: Major > > Add system tests for delegation token based authentication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately
[ https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599003#comment-16599003 ] Jun Rao commented on KAFKA-7366: [~ijuma], it might be debatable. However, if all other topic level configs take effect immediately, it seems that we should make that consistent for all configs. > topic level segment.bytes and segment.ms not taking effect immediately > -- > > Key: KAFKA-7366 > URL: https://issues.apache.org/jira/browse/KAFKA-7366 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 2.0.0 >Reporter: Jun Rao >Priority: Major > > It used to be that topic level configs such as segment.bytes takes effect > immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect > after the active segment has rolled. The relevant part of KAFKA-6324 is that > in Log.maybeRoll, the checking of the segment rolling is moved to > LogSegment.shouldRoll(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately
[ https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598993#comment-16598993 ] Ismael Juma commented on KAFKA-7366: [~rsivaram] noticed this while working on dynamic configs as well. We discussed it then and I'm not sure what was the conclusion. I guess you're saying we should change it back [~junrao]? > topic level segment.bytes and segment.ms not taking effect immediately > -- > > Key: KAFKA-7366 > URL: https://issues.apache.org/jira/browse/KAFKA-7366 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 2.0.0 >Reporter: Jun Rao >Priority: Major > > It used to be that topic level configs such as segment.bytes takes effect > immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect > after the active segment has rolled. The relevant part of KAFKA-6324 is that > in Log.maybeRoll, the checking of the segment rolling is moved to > LogSegment.shouldRoll(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7214) Mystic FATAL error
[ https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598992#comment-16598992 ] John Roesler commented on KAFKA-7214: - Hi [~habdank], I've commented on https://issues.apache.org/jira/browse/KAFKA-6777. If the problem is long GC pauses, but the JVM never actually runs out of memory, then there would be no OOME to catch. Can you let us know what kind of GC pauses you're observing? As Guozhang said, if any pause is longer than any of the heartbeat intervals (or other keepalive configs like the poll interval), then it will cause problems. The only solution to long GC pauses is to reshape the computation, which it seems like you have done. Also, regarding: > The keyword in all those errors is: KSTREAM-SOURCE-X This is just the name of a source node in your streams topology. Such names will appear in all kinds of logs, and don't indicate that your problem is related to this ticket. Maybe we can relocate this discussion to a new ticket? If you do start a new ticket, can you indicate what behavior you observe? Such as, "my application crashes" or "my application rebalances frequently". Thanks, -John > Mystic FATAL error > -- > > Key: KAFKA-7214 > URL: https://issues.apache.org/jira/browse/KAFKA-7214 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.3, 1.1.1 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > > Dears, > Very often at startup of the streaming application I got exception: > {code} > Exception caught in process. 
taskId=0_1, processor=KSTREAM-SOURCE-00, > topic=my_instance_medium_topic, partition=1, offset=198900203; > [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212), > > org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347), > > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420), > > org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339), > > org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648), > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513), > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482), > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)] > in thread > my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62 > {code} > and then (without shutdown request from my side): > {code} > 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread > [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62] > State transition from PENDING_SHUTDOWN to DEAD. > {code} > What is this? > How to correctly handle it? > Thanks in advance for help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately
[ https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7366: --- Affects Version/s: 2.0.0 > topic level segment.bytes and segment.ms not taking effect immediately > -- > > Key: KAFKA-7366 > URL: https://issues.apache.org/jira/browse/KAFKA-7366 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 2.0.0 >Reporter: Jun Rao >Priority: Major > > It used to be that topic level configs such as segment.bytes takes effect > immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect > after the active segment has rolled. The relevant part of KAFKA-6324 is that > in Log.maybeRoll, the checking of the segment rolling is moved to > LogSegment.shouldRoll(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7364) kafka periodically run into high cpu usage with ssl writing
[ https://issues.apache.org/jira/browse/KAFKA-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598988#comment-16598988 ] Ismael Juma commented on KAFKA-7364: If you have 20k producers all connecting at the same time, then it's not unexpected that it would be costly. OpenSSL may be better, if someone wants to test that, it would be useful. Once connections are established, then the handshake cost is no longer an issue, of course. > kafka periodically run into high cpu usage with ssl writing > --- > > Key: KAFKA-7364 > URL: https://issues.apache.org/jira/browse/KAFKA-7364 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.0 >Reporter: Yu Yang >Priority: Major > Attachments: Screen Shot 2018-08-30 at 10.57.32 PM.png > > > while testing ssl writing to kafka, we found that kafka often run into high > cpu usage due to inefficiency in jdk ssl implementation. > In detail, we use a test cluster of 12 d2.8xlarge instances that uses kafka > 2.0.0, jdk-10.0.2, and hosts only one topic that have ~20k producers write > to through ssl channel. We observed that the network threads often get 100% > cpu usage after enabling ssl writing to kafka. To improve kafka's > throughput, we have "num.network.threads=32" for the broker. Even with 32 > network threads, we see the broker cpu usage jump right after ssl writing is > enabled. The broker's cpu usage would drop immediately when we disabled ssl > writing. > !Screen Shot 2018-08-30 at 10.57.32 PM.png|height=360! > When the broker's cpu usage is high, 'perf top' shows that kafka is busy with > executing code in libsunec.so. The following is a sample stack track that > we get when the broker's cpu usage was high. This seems to be related to > inefficiency in jdk ssl related implementation. Switching to use > https://github.com/netty/netty-tcnative to handle ssl handshake can be > helpful. 
> {code} > Thread 77562: (state = IN_NATIVE) > - sun.security.ec.ECDSASignature.verifySignedDigest(byte[], byte[], byte[], > byte[]) @bci=0 (Compiled frame; information may be imprecise) > - sun.security.ec.ECDSASignature.engineVerify(byte[]) @bci=70, line=321 > (Compiled frame) > - java.security.Signature$Delegate.engineVerify(byte[]) @bci=9, line=1222 > (Compiled frame) > - java.security.Signature.verify(byte[]) @bci=10, line=655 (Compiled frame) > - sun.security.x509.X509CertImpl.verify(java.security.PublicKey, > java.lang.String) @bci=136, line=444 (Compiled frame) > - > sun.security.provider.certpath.BasicChecker.verifySignature(java.security.cert.X509Certificate) > @bci=48, line=166 (Compiled frame) > - > sun.security.provider.certpath.BasicChecker.check(java.security.cert.Certificate, > java.util.Collection) @bci=24, line=147 (Compiled frame) > - > sun.security.provider.certpath.PKIXMasterCertPathValidator.validate(java.security.cert.CertPath, > java.util.List, java.util.List) @bci=316, line=125 (Compiled frame) > - > sun.security.provider.certpath.PKIXCertPathValidator.validate(java.security.cert.TrustAnchor, > sun.security.provider.certpath.PKIX$ValidatorParams) @bci=390, line=233 > (Compiled frame) > - > sun.security.provider.certpath.PKIXCertPathValidator.validate(sun.security.provider.certpath.PKIX$ValidatorParams) > @bci=217, line=141 (Compiled frame) > - > sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(java.security.cert.CertPath, > java.security.cert.CertPathParameters) @bci=7, line=80 (Compiled frame) > - java.security.cert.CertPathValidator.validate(java.security.cert.CertPath, > java.security.cert.CertPathParameters) @bci=6, line=292 (Compiled frame) > - > sun.security.validator.PKIXValidator.doValidate(java.security.cert.X509Certificate[], > java.security.cert.PKIXBuilderParameters) @bci=34, line=357 (Compiled frame) > - > sun.security.validator.PKIXValidator.engineValidate(java.security.cert.X509Certificate[], > 
java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) > @bci=232, line=259 (Compiled frame) > - > sun.security.validator.Validator.validate(java.security.cert.X509Certificate[], > java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) > @bci=6, line=260 (Compiled frame) > - > sun.security.ssl.X509TrustManagerImpl.validate(sun.security.validator.Validator, > java.security.cert.X509Certificate[], java.security.AlgorithmConstraints, > java.lang.String) @bci=10, line=324 (Compiled frame) > - > sun.security.ssl.X509TrustManagerImpl.checkTrusted(java.security.cert.X509Certificate[], > java.lang.String, javax.net.ssl.SSLEngine, boolean) @bci=179, line=279 > (Compiled frame) > - > sun.security.ssl.X509TrustManagerImpl.checkClientTrusted(java.security.cert.X509Ce
[jira] [Created] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately
Jun Rao created KAFKA-7366: -- Summary: topic level segment.bytes and segment.ms not taking effect immediately Key: KAFKA-7366 URL: https://issues.apache.org/jira/browse/KAFKA-7366 Project: Kafka Issue Type: Bug Affects Versions: 1.1.0 Reporter: Jun Rao It used to be that topic level configs such as segment.bytes take effect immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect after the active segment has rolled. The relevant part of KAFKA-6324 is that in Log.maybeRoll, the checking of the segment rolling is moved to LogSegment.shouldRoll(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7364) kafka periodically run into high cpu usage with ssl writing
[ https://issues.apache.org/jira/browse/KAFKA-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598981#comment-16598981 ] Ismael Juma commented on KAFKA-7364: Is this specific to ECDSA or all signature types? > kafka periodically run into high cpu usage with ssl writing > --- > > Key: KAFKA-7364 > URL: https://issues.apache.org/jira/browse/KAFKA-7364 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.0 >Reporter: Yu Yang >Priority: Major > Attachments: Screen Shot 2018-08-30 at 10.57.32 PM.png > > > while testing ssl writing to kafka, we found that kafka often run into high > cpu usage due to inefficiency in jdk ssl implementation. > In detail, we use a test cluster of 12 d2.8xlarge instances that uses kafka > 2.0.0, jdk-10.0.2, and hosts only one topic that have ~20k producers write > to through ssl channel. We observed that the network threads often get 100% > cpu usage after enabling ssl writing to kafka. To improve kafka's > throughput, we have "num.network.threads=32" for the broker. Even with 32 > network threads, we see the broker cpu usage jump right after ssl writing is > enabled. The broker's cpu usage would drop immediately when we disabled ssl > writing. > !Screen Shot 2018-08-30 at 10.57.32 PM.png|height=360! > When the broker's cpu usage is high, 'perf top' shows that kafka is busy with > executing code in libsunec.so. The following is a sample stack track that > we get when the broker's cpu usage was high. This seems to be related to > inefficiency in jdk ssl related implementation. Switching to use > https://github.com/netty/netty-tcnative to handle ssl handshake can be > helpful. 
> {code} > Thread 77562: (state = IN_NATIVE) > - sun.security.ec.ECDSASignature.verifySignedDigest(byte[], byte[], byte[], > byte[]) @bci=0 (Compiled frame; information may be imprecise) > - sun.security.ec.ECDSASignature.engineVerify(byte[]) @bci=70, line=321 > (Compiled frame) > - java.security.Signature$Delegate.engineVerify(byte[]) @bci=9, line=1222 > (Compiled frame) > - java.security.Signature.verify(byte[]) @bci=10, line=655 (Compiled frame) > - sun.security.x509.X509CertImpl.verify(java.security.PublicKey, > java.lang.String) @bci=136, line=444 (Compiled frame) > - > sun.security.provider.certpath.BasicChecker.verifySignature(java.security.cert.X509Certificate) > @bci=48, line=166 (Compiled frame) > - > sun.security.provider.certpath.BasicChecker.check(java.security.cert.Certificate, > java.util.Collection) @bci=24, line=147 (Compiled frame) > - > sun.security.provider.certpath.PKIXMasterCertPathValidator.validate(java.security.cert.CertPath, > java.util.List, java.util.List) @bci=316, line=125 (Compiled frame) > - > sun.security.provider.certpath.PKIXCertPathValidator.validate(java.security.cert.TrustAnchor, > sun.security.provider.certpath.PKIX$ValidatorParams) @bci=390, line=233 > (Compiled frame) > - > sun.security.provider.certpath.PKIXCertPathValidator.validate(sun.security.provider.certpath.PKIX$ValidatorParams) > @bci=217, line=141 (Compiled frame) > - > sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(java.security.cert.CertPath, > java.security.cert.CertPathParameters) @bci=7, line=80 (Compiled frame) > - java.security.cert.CertPathValidator.validate(java.security.cert.CertPath, > java.security.cert.CertPathParameters) @bci=6, line=292 (Compiled frame) > - > sun.security.validator.PKIXValidator.doValidate(java.security.cert.X509Certificate[], > java.security.cert.PKIXBuilderParameters) @bci=34, line=357 (Compiled frame) > - > sun.security.validator.PKIXValidator.engineValidate(java.security.cert.X509Certificate[], > 
java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) > @bci=232, line=259 (Compiled frame) > - > sun.security.validator.Validator.validate(java.security.cert.X509Certificate[], > java.util.Collection, java.security.AlgorithmConstraints, java.lang.Object) > @bci=6, line=260 (Compiled frame) > - > sun.security.ssl.X509TrustManagerImpl.validate(sun.security.validator.Validator, > java.security.cert.X509Certificate[], java.security.AlgorithmConstraints, > java.lang.String) @bci=10, line=324 (Compiled frame) > - > sun.security.ssl.X509TrustManagerImpl.checkTrusted(java.security.cert.X509Certificate[], > java.lang.String, javax.net.ssl.SSLEngine, boolean) @bci=179, line=279 > (Compiled frame) > - > sun.security.ssl.X509TrustManagerImpl.checkClientTrusted(java.security.cert.X509Certificate[], > java.lang.String, javax.net.ssl.SSLEngine) @bci=5, line=130 (Compiled frame) > - > sun.security.ssl.ServerHandshaker.clientCertificate(sun.security.ssl.HandshakeMessage$CertificateMsg) > @bci=190, line=1966 (C
[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598974#comment-16598974 ] John Roesler commented on KAFKA-6777: - Hi [~habdank], It's unfortunately a common behavior with JVM applications that when they are memory-constrained they never actually crash, but instead disappear into gc-pause oblivion. For practical purposes, we don't have any visibility into when GC pauses occur, how long they are, or even what our resident memory footprint is. This is all by design of the JVM. However, if we are catching and swallowing OOME, or really any subclass of Error, it would not be good. Error is by definition not recoverable and should be caught only to gracefully exit. I've taken a quick perusal of the code, and most of the `catch (Throwable t)` instances I see are logged and/or propagated. Some usages (such as in KafkaAdminClient.AdminClientRunnable) are suspicious. I'm unclear on whether you are saying that when Kafka runs out of memory, it # shuts down, but hides the reason # or continues running The latter seems unlikely, since if the JVM is truly out of memory, then catching and swallowing the OOME would only work for so long; it seems like eventually some operation would attempt to allocate memory outside of a catch block and still crash the app. Can you elaborate on the reason you think that the culprit is a swallowed OOME instead of just normal GC hell? Is there a specific code path that you think is responsible for catching and swallowing OOMEs? 
Thanks, -John > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > Attachments: screenshot-1.png > > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598948#comment-16598948 ] Dong Lin commented on KAFKA-7278: - [~niob] The stacktrace in that Jira seems similar to the issue fixed here. So there is good chance that we have fixed that issue as well. > replaceSegments() should not call asyncDeleteSegment() for segments which > have been removed from segments list > -- > > Key: KAFKA-7278 > URL: https://issues.apache.org/jira/browse/KAFKA-7278 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every > segment listed in the `oldSegments`. oldSegments should be constructed from > Log.segments and only contain segments listed in Log.segments. > However, Log.segments may be modified between the time oldSegments is > determined to the time Log.replaceSegments() is called. If there are > concurrent async deletion of the same log segment file, Log.replaceSegments() > will call asyncDeleteSegment() for a segment that does not exist and Kafka > server may shutdown the log directory due to NoSuchFileException. > This is likely the root cause of > https://issues.apache.org/jira/browse/KAFKA-6188. > Given the understanding of the problem, we should be able to fix the issue by > only deleting segment if the segment can be found in Log.segments. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?
[ https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598941#comment-16598941 ] John Roesler commented on KAFKA-7363: - I'm not sure what other information we can provide. Is it ok with you if we just resolve this ticket with "Information Provided"? Or do you have further questions? Thanks, -John > How num.stream.threads in streaming application influence memory consumption? > - > > Key: KAFKA-7363 > URL: https://issues.apache.org/jira/browse/KAFKA-7363 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Seweryn Habdank-Wojewodzki >Priority: Major > > Dears, > How option _num.stream.threads_ in streaming application influence memory > consumption? > I see that by increasing num.stream.threads my application needs more memory. > This is obvious, but it is not obvious how much I need to give it. Trial and > error method does not work, as it seems to be highly dependent on forced > throughput. > I mean: higher load more memory is needed. > Thanks for help and regards, > Seweryn. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?
[ https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598938#comment-16598938 ] John Roesler commented on KAFKA-7363: - Hi Seweryn, It's a little hard to say. For one thing, extra threads have some overhead of their own, but I agree with you that the bulk of the extra memory would come from the extra throughput you're able to drive through the application. I haven't done any analysis of this before, so just reasoning about this (as opposed to speaking from experience): In the maximum case, doubling your thread count would double your memory usage. This is for an "ideal" CPU-bound process. In reality, there are shared resources, such as network and disk, that should prevent you from reaching this bound. In the minimum case, if the app is already saturating some other resource, like network, disk, or even memory, then increasing the thread count would not add an appreciable amount of memory. The reason is that if the app is saturating, say, the network already, then more threads doesn't change that fact, and you still can't increase the throughput. As far as a concrete answer to your question, I think you're unfortunately the only one with enough visibility to predict the memory load. It would be very dependent on your machines, network, the number of topics and partitions, the size of your records in each partition, what exactly your Streams app does, and even your broker configuration. However, I'd propose the following experimental strategy to try and get a handle on it: 1. start with one thread. Observe all the main resources (CPU, network i/o, disk i/o), but especially memory. For memory, pay particular attention to the memory used immediately after GC. You might want to turn on GC logging to help with this. 1b. observe these metrics for long enough for a stable trend to emerge. This might be hours or even a day. 2. add one more thread. Continue observing all the resources. 
As I said, in the ideal case, this should double your throughput and hence double your memory usage. Looking at how much all the extra metrics increase when you add the second thread should help you start building a model of the increase you should expect for each extra thread. 3. continue the experiment, adding one thread each time. At some point, you'll notice that the throughput/memory increase drops off when you add an extra thread. This means that you've saturated one or more other resource. The metrics for those resources should corroborate this. Note that, if nothing else, the CPU should become saturated once the number of threads is equal to the number of cores. Increasing the thread count much beyond this shouldn't help much. I hope this helps! > How num.stream.threads in streaming application influence memory consumption? > - > > Key: KAFKA-7363 > URL: https://issues.apache.org/jira/browse/KAFKA-7363 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Seweryn Habdank-Wojewodzki >Priority: Major > > Dears, > How option _num.stream.threads_ in streaming application influence memory > consumption? > I see that by increasing num.stream.threads my application needs more memory. > This is obvious, but it is not obvious how much I need to give it. Try and > error method does not work, as it seems to be highly dependen on forced > throughput. > I mean: higher load more memory is needed. > Thanks for help and regards, > Seweryn. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?
[ https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-7363: Component/s: streams > How num.stream.threads in streaming application influence memory consumption? > - > > Key: KAFKA-7363 > URL: https://issues.apache.org/jira/browse/KAFKA-7363 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Seweryn Habdank-Wojewodzki >Priority: Major > > Dears, > How option _num.stream.threads_ in streaming application influence memory > consumption? > I see that by increasing num.stream.threads my application needs more memory. > This is obvious, but it is not obvious how much I need to give it. Try and > error method does not work, as it seems to be highly dependen on forced > throughput. > I mean: higher load more memory is needed. > Thanks for help and regards, > Seweryn. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-1880) Add support for checking binary/source compatibility
[ https://issues.apache.org/jira/browse/KAFKA-1880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598830#comment-16598830 ] Viktor Somogyi commented on KAFKA-1880: --- [~granthenke] if you don't mind I've reassigned this to continue your work as something similar has come up regarding KIP-336. > Add support for checking binary/source compatibility > > > Key: KAFKA-1880 > URL: https://issues.apache.org/jira/browse/KAFKA-1880 > Project: Kafka > Issue Type: New Feature >Reporter: Ashish Singh >Assignee: Viktor Somogyi >Priority: Major > Attachments: compatibilityReport-only-incompatible.html, > compatibilityReport.html > > > Recent discussions around compatibility shows how important compatibility is > to users. Kafka should leverage a tool to find, report, and avoid > incompatibility issues in public methods. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-1880) Add support for checking binary/source compatibility
[ https://issues.apache.org/jira/browse/KAFKA-1880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi reassigned KAFKA-1880: - Assignee: Viktor Somogyi (was: Grant Henke) > Add support for checking binary/source compatibility > > > Key: KAFKA-1880 > URL: https://issues.apache.org/jira/browse/KAFKA-1880 > Project: Kafka > Issue Type: New Feature >Reporter: Ashish Singh >Assignee: Viktor Somogyi >Priority: Major > Attachments: compatibilityReport-only-incompatible.html, > compatibilityReport.html > > > Recent discussions around compatibility shows how important compatibility is > to users. Kafka should leverage a tool to find, report, and avoid > incompatibility issues in public methods. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591 ] M. Manna edited comment on KAFKA-7365 at 8/31/18 12:06 PM: --- [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{Also, please check the description of CommitFailedException which might help you understand this issue.}} _{{Commit cannot be completed since the group has already }}{{rebalanced and assigned the partitions to another member. This means that the time }}{{between subsequent calls to poll() was longer than the configured max.poll.interval.ms, }}{{which typically implies that the poll loop is spending too much time message processing. }}{{You can address this either by increasing the session timeout or by reducing the maximum }}{{size of batches returned in poll() with max.poll.records.}}_ was (Author: manme...@gmail.com): [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? 
I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} > max.poll.records setting in Kafka Consumer is not working > - > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Kashyap Ivaturi >Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the times until I get a big bunch of records which > takes longer time to process and I encounter CommitFailed exception for the > last set of records even though they were processed. While i'am able to > reconnect back its picking some messages that I had already processed. I > don't want this to happen as its creating duplicates in target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue , I would > like to have a control on how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening. > I expected that the consumer will only process 1 message because of my > 'max.poll.records' setting but the consumer has processed all the 4 messages > in single poll. Any idea why did it not consider 'max.poll.records' setting > or is some other setting overriding this setting?. Appreciate your help or > guidance in troubleshooting this issue. 
> Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 > fetch.min.bytes = 1 > [group.id|https://group.id/] = empestor > [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 > interceptor.classes = null > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > max.partition.fetch.bytes = 1048576 > [max.poll.interval.ms|https://max.poll.interval.ms/] = 30 > max.poll.records = 1 > [metadata.max.age.ms|https://metadata.max.age.ms/] =
[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591 ] M. Manna edited comment on KAFKA-7365 at 8/31/18 12:06 PM: --- [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{Also, please check the description of CommitFailedException which might help you understand this issue.}} _{{Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.}}_ was (Author: manme...@gmail.com): [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? 
I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{Also, please check the description of CommitFailedException which might help you understand this issue.}} _{{Commit cannot be completed since the group has already }}{{rebalanced and assigned the partitions to another member. This means that the time }}{{between subsequent calls to poll() was longer than the configured max.poll.interval.ms, }}{{which typically implies that the poll loop is spending too much time message processing. }}{{You can address this either by increasing the session timeout or by reducing the maximum }}{{size of batches returned in poll() with max.poll.records.}}_ > max.poll.records setting in Kafka Consumer is not working > - > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Kashyap Ivaturi >Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the times until I get a big bunch of records which > takes longer time to process and I encounter CommitFailed exception for the > last set of records even though they were processed. While i'am able to > reconnect back its picking some messages that I had already processed. I > don't want this to happen as its creating duplicates in target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue , I would > like to have a control on how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening. 
> I expected that the consumer will only process 1 message because of my > 'max.poll.records' setting but the consumer has processed all the 4 messages > in single poll. Any idea why did it not consider 'max.poll.records' setting > or is some other setting overriding this setting?. Appreciate your help or > guidance in troubleshooting this issue. > Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 > enable.auto.commit = false > exclude.internal.topics = t
[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591 ] M. Manna edited comment on KAFKA-7365 at 8/31/18 12:04 PM: --- [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} was (Author: manme...@gmail.com): [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? 
I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{If you can take a look at this part of code (Fetcher.java) - it might be what you are experiencing:}} {{// this case shouldn't usually happen because we only send one fetch at a time per partition,}} {{// but it might conceivably happen in some rare cases (such as partition leader changes).}} {{// we have to copy to a new list because the old one may be immutable}} {{List> newRecords = new ArrayList<>(records.size() + currentRecords.size());}} {{newRecords.addAll(currentRecords);}} {{newRecords.addAll(records);}} {{fetched.put(partition, newRecords);}} > max.poll.records setting in Kafka Consumer is not working > - > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Kashyap Ivaturi >Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the times until I get a big bunch of records which > takes longer time to process and I encounter CommitFailed exception for the > last set of records even though they were processed. While i'am able to > reconnect back its picking some messages that I had already processed. I > don't want this to happen as its creating duplicates in target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue , I would > like to have a control on how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening. 
> I expected that the consumer will only process 1 message because of my > 'max.poll.records' setting but the consumer has processed all the 4 messages > in single poll. Any idea why did it not consider 'max.poll.records' setting > or is some other setting overriding this setting?. Appreciate your help or > guidance in troubleshooting this issue. > Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 > fetch.min.bytes = 1 > [group.id|https://group.id/] = empestor > [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 > interceptor.classes = null > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > max.partition.fetch.bytes = 1048576 > [max.poll.interval.ms|https://max.poll.interval.ms/] = 30 > max.poll.records = 1 > [metadata.max.age.ms|https://metadata.max.age.ms/] = 30 > metric.reporters = [] > me
[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591 ] M. Manna edited comment on KAFKA-7365 at 8/31/18 11:58 AM: --- [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{If you can take a look at this part of code (Fetcher.java) - it might be what you are experiencing:}} {{// this case shouldn't usually happen because we only send one fetch at a time per partition,}} {{// but it might conceivably happen in some rare cases (such as partition leader changes).}} {{// we have to copy to a new list because the old one may be immutable}} {{List> newRecords = new ArrayList<>(records.size() + currentRecords.size());}} {{newRecords.addAll(currentRecords);}} {{newRecords.addAll(records);}} {{fetched.put(partition, newRecords);}} was (Author: manme...@gmail.com): [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? 
I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{If you can take a look at this part of code (Fetcher.java) - it might be what you are experiencing:}} > max.poll.records setting in Kafka Consumer is not working > - > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Kashyap Ivaturi >Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the times until I get a big bunch of records which > takes longer time to process and I encounter CommitFailed exception for the > last set of records even though they were processed. While i'am able to > reconnect back its picking some messages that I had already processed. I > don't want this to happen as its creating duplicates in target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue , I would > like to have a control on how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening. > I expected that the consumer will only process 1 message because of my > 'max.poll.records' setting but the consumer has processed all the 4 messages > in single poll. Any idea why did it not consider 'max.poll.records' setting > or is some other setting overriding this setting?. Appreciate your help or > guidance in troubleshooting this issue. 
> Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 > fetch.min.bytes = 1 > [group.id|https://group.id/] = empestor > [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 > interceptor.classes = null > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > max.partition.fetch.bytes = 1048576 > [max.poll.interval.ms|https://max.poll.interval.ms/] = 30 > max.p
[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591 ] M. Manna edited comment on KAFKA-7365 at 8/31/18 11:57 AM: --- [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{If you can take a look at this part of code (Fetcher.java) - it might be what you are experiencing:}} was (Author: manme...@gmail.com): [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? 
I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{If you can take a look at this part of code (Fetcher.java) - it might be what you are experiencing:}} {{ // this case shouldn't usually happen because we only send one fetch at a time per partition,}} {{ // but it might conceivably happen in some rare cases (such as partition leader changes).}} {{ // we have to copy to a new list because the old one may be immutable}} {{ List> newRecords = new ArrayList<>(records.size() + currentRecords.size());}} {{ newRecords.addAll(currentRecords);}} {{ newRecords.addAll(records);}} {{ fetched.put(partition, newRecords);}} > max.poll.records setting in Kafka Consumer is not working > - > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Kashyap Ivaturi >Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the times until I get a big bunch of records which > takes longer time to process and I encounter CommitFailed exception for the > last set of records even though they were processed. While i'am able to > reconnect back its picking some messages that I had already processed. I > don't want this to happen as its creating duplicates in target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue , I would > like to have a control on how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening. 
> I expected that the consumer will only process 1 message because of my > 'max.poll.records' setting but the consumer has processed all the 4 messages > in single poll. Any idea why did it not consider 'max.poll.records' setting > or is some other setting overriding this setting?. Appreciate your help or > guidance in troubleshooting this issue. > Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 > fetch.min.bytes = 1 > [group.id|https://group.id/] = empestor > [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 > interceptor.classes = null > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.de
[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591 ] M. Manna edited comment on KAFKA-7365 at 8/31/18 11:55 AM: --- [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. {{Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway).}} {{If you can take a look at this part of code (Fetcher.java) - it might be what you are experiencing:}} {{ // this case shouldn't usually happen because we only send one fetch at a time per partition,}} {{ // but it might conceivably happen in some rare cases (such as partition leader changes).}} {{ // we have to copy to a new list because the old one may be immutable}} {{ List> newRecords = new ArrayList<>(records.size() + currentRecords.size());}} {{ newRecords.addAll(currentRecords);}} {{ newRecords.addAll(records);}} {{ fetched.put(partition, newRecords);}} was (Author: manme...@gmail.com): [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? 
I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway). > max.poll.records setting in Kafka Consumer is not working > - > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Kashyap Ivaturi >Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the time until I get a big bunch of records which > takes a longer time to process, and I encounter a CommitFailed exception for the > last set of records even though they were processed. While I am able to > reconnect, it picks up some messages that I had already processed. I > don't want this to happen as it's creating duplicates in the target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue, I would > like to have control over how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening to. > I expected that the consumer would only process 1 message because of my > 'max.poll.records' setting, but the consumer has processed all 4 messages > in a single poll. Any idea why it did not consider the 'max.poll.records' setting, > or is some other setting overriding this setting? Appreciate your help or > guidance in troubleshooting this issue. 
> Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 > fetch.min.bytes = 1 > [group.id|https://group.id/] = empestor > [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 > interceptor.classes = null > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > max.partition.fetch.bytes = 10485
[jira] [Commented] (KAFKA-6950) Add mechanism to delay response to failed client authentication
[ https://issues.apache.org/jira/browse/KAFKA-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598593#comment-16598593 ] ASF GitHub Bot commented on KAFKA-6950: --- rajinisivaram closed pull request #5082: KAFKA-6950: Delay response to failed client authentication to prevent potential DoS issues (KIP-306) URL: https://github.com/apache/kafka/pull/5082 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java index f6458c6f22d..7a05eba03f2 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java @@ -40,6 +40,10 @@ public AuthenticationException(String message) { super(message); } +public AuthenticationException(Throwable cause) { +super(cause); +} + public AuthenticationException(String message, Throwable cause) { super(message, cause); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java index 4e2e7273a68..33c2e908516 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -37,6 +37,14 @@ */ void authenticate() throws AuthenticationException, IOException; +/** + * Perform any processing related to authentication failure. This is invoked when the channel is about to be closed + * because of an {@link AuthenticationException} thrown from a prior {@link #authenticate()} call. 
+ * @throws IOException if read/write fails due to an I/O error + */ +default void handleAuthenticationFailure() throws IOException { +} + /** * Returns Principal using PrincipalBuilder */ @@ -46,5 +54,4 @@ * returns true if authentication is complete otherwise returns false; */ boolean complete(); - } diff --git a/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java new file mode 100644 index 000..8474426c609 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/DelayedResponseAuthenticationException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.errors.AuthenticationException; + +public class DelayedResponseAuthenticationException extends AuthenticationException { +private static final long serialVersionUID = 1L; + +public DelayedResponseAuthenticationException(Throwable cause) { +super(cause); +} +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 1839729f2e7..17dc6a33ef2 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -120,15 +120,22 @@ public KafkaPrincipal principal() { * authentication. For SASL, authentication is performed by {@link Authenticator#authenticate()}. */ public void prepare() throws AuthenticationException, IOException { +boolean authenticating = false; try { if (!transportLayer.ready()) transportLayer.handshake(); -if (transportLayer.ready() && !authenticator.complete()) +if (transportLayer.ready() && !authenticator.complete()) { +authenticating = true; authenticator
[jira] [Commented] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591 ] M. Manna commented on KAFKA-7365: - [~kivaturi] I am not sure how your heartbeat interval (and session timeout) has been harmonised as per the long processing delay. Also, max.poll.records is a tested config in PlainTextConsumerTest.scala file which works as expected. Do you know roughly how much delay (max) you need to process the message? e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=2 (and adjust session.timeout.ms to be ~3x more than that) and see if you are still encountering the issue? I might have overlooked something, but from the code I don't see any reason why this will be overridden (will check this anyway). > max.poll.records setting in Kafka Consumer is not working > - > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Kashyap Ivaturi >Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the times until I get a big bunch of records which > takes longer time to process and I encounter CommitFailed exception for the > last set of records even though they were processed. While i'am able to > reconnect back its picking some messages that I had already processed. I > don't want this to happen as its creating duplicates in target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue , I would > like to have a control on how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening. 
> I expected that the consumer will only process 1 message because of my > 'max.poll.records' setting but the consumer has processed all the 4 messages > in single poll. Any idea why did it not consider 'max.poll.records' setting > or is some other setting overriding this setting?. Appreciate your help or > guidance in troubleshooting this issue. > Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 > fetch.min.bytes = 1 > [group.id|https://group.id/] = empestor > [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 > interceptor.classes = null > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > max.partition.fetch.bytes = 1048576 > [max.poll.interval.ms|https://max.poll.interval.ms/] = 30 > max.poll.records = 1 > [metadata.max.age.ms|https://metadata.max.age.ms/] = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > [metrics.sample.window.ms|https://metrics.sample.window.ms/] = 3 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > [reconnect.backoff.max.ms|https://reconnect.backoff.max.ms/] = 1000 > [reconnect.backoff.ms|https://reconnect.backoff.ms/] = 50 > [request.timeout.ms|https://request.timeout.ms/] = 4 > [retry.backoff.ms|https://retry.backoff.ms/] = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = 
/usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = SSL > send.buffer.bytes = 131072 > [session.timeout.ms|https://session.timeout.ms/] = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = [hidden] > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = > /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks > ssl.keystore.password = [hidden] > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation =
[jira] [Commented] (KAFKA-4544) Add system tests for delegation token based authentication
[ https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598557#comment-16598557 ] Attila Sasvari commented on KAFKA-4544: --- Do you have the capacity to work on this [~omkreddy]? If there's anything I can do to help, please let me know. > Add system tests for delegation token based authentication > -- > > Key: KAFKA-4544 > URL: https://issues.apache.org/jira/browse/KAFKA-4544 > Project: Kafka > Issue Type: Sub-task > Components: security >Reporter: Ashish Singh >Assignee: Manikumar >Priority: Major > > Add system tests for delegation token based authentication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598540#comment-16598540 ] Christoph Schmidt commented on KAFKA-7278: -- This ticket has raised questions in the comments of ancient KAFKA-1194 (which knows three old PRs) - does it by chance fix that issue, too? Root cause over there is that rename-while-still-open blows up under windows, with the only available workaround being to completely disable the log cleaner. > replaceSegments() should not call asyncDeleteSegment() for segments which > have been removed from segments list > -- > > Key: KAFKA-7278 > URL: https://issues.apache.org/jira/browse/KAFKA-7278 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every > segment listed in the `oldSegments`. oldSegments should be constructed from > Log.segments and only contain segments listed in Log.segments. > However, Log.segments may be modified between the time oldSegments is > determined to the time Log.replaceSegments() is called. If there are > concurrent async deletion of the same log segment file, Log.replaceSegments() > will call asyncDeleteSegment() for a segment that does not exist and Kafka > server may shutdown the log directory due to NoSuchFileException. > This is likely the root cause of > https://issues.apache.org/jira/browse/KAFKA-6188. > Given the understanding of the problem, we should be able to fix the issue by > only deleting segment if the segment can be found in Log.segments. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
Kashyap Ivaturi created KAFKA-7365: -- Summary: max.poll.records setting in Kafka Consumer is not working Key: KAFKA-7365 URL: https://issues.apache.org/jira/browse/KAFKA-7365 Project: Kafka Issue Type: Bug Components: consumer Reporter: Kashyap Ivaturi Hi, I have a requirement where I consume messages one by one, each message has additional processing that I should do and then manually commit the offset. Things work well most of the times until I get a big bunch of records which takes longer time to process and I encounter CommitFailed exception for the last set of records even though they were processed. While I'm able to reconnect back, it's picking up some messages that I had already processed. I don't want this to happen as it's creating duplicates in target systems that I integrate with while processing the message. I decided that even though there are more messages in the queue , I would like to have a control on how many records I can process when polled. I tried to replicate a scenario where I have started the consumer by setting 'max.poll.records' to '1' and then pushed 4 messages into the Topic the consumer is listening. I expected that the consumer will only process 1 message because of my 'max.poll.records' setting but the consumer has processed all the 4 messages in single poll. Any idea why it did not consider 'max.poll.records' setting or is some other setting overriding this setting? Appreciate your help or guidance in troubleshooting this issue. 
Here is the log of my Consumer config when it starts: 2018-08-28 08:29:47.873 INFO 91121 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 auto.offset.reset = earliest bootstrap.servers = [messaging-rtp3.cisco.com:9093] check.crcs = true [client.id|https://client.id/] = [connections.max.idle.ms|https://connections.max.idle.ms/] = 54 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 fetch.min.bytes = 1 [group.id|https://group.id/] = empestor [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 interceptor.classes = null internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 [max.poll.interval.ms|https://max.poll.interval.ms/] = 30 max.poll.records = 1 [metadata.max.age.ms|https://metadata.max.age.ms/] = 30 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO [metrics.sample.window.ms|https://metrics.sample.window.ms/] = 3 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 [reconnect.backoff.max.ms|https://reconnect.backoff.max.ms/] = 1000 [reconnect.backoff.ms|https://reconnect.backoff.ms/] = 50 [request.timeout.ms|https://request.timeout.ms/] = 4 [retry.backoff.ms|https://retry.backoff.ms/] = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 6 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = SSL send.buffer.bytes = 131072 [session.timeout.ms|https://session.timeout.ms/] = 1 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null 
ssl.key.password = [hidden] ssl.keymanager.algorithm = SunX509 ssl.keystore.location = /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = /kafka/certs/empestor/certificates/kafka.client.truststore.jks ssl.truststore.password = [hidden] ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2018-08-28 08:29:48.079 INFO 91121 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)