[jira] [Commented] (KAFKA-2617) Move protocol field default values to Protocol
[ https://issues.apache.org/jira/browse/KAFKA-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805568#comment-16805568 ] Guozhang Wang commented on KAFKA-2617: -- I'm closing this ticket as it is resolved by KAFKA-7609 now. > Move protocol field default values to Protocol > -- > > Key: KAFKA-2617 > URL: https://issues.apache.org/jira/browse/KAFKA-2617 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Jakub Nowak >Priority: Minor > Labels: newbie > > Right now the default values are scattered in the Request / Response classes, > and some duplicates already exist, like JoinGroupRequest.UNKNOWN_CONSUMER_ID > and OffsetCommitRequest.DEFAULT_CONSUMER_ID. We would like to move all > default values into org.apache.kafka.common.protocol.Protocol since > org.apache.kafka.common.requests depends on org.apache.kafka.common.protocol > anyway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails
[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-4600. -- Resolution: Fixed Fix Version/s: 0.11.0.0 > Consumer proceeds on when ConsumerRebalanceListener fails > - > > Key: KAFKA-4600 > URL: https://issues.apache.org/jira/browse/KAFKA-4600 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.1.1 >Reporter: Braedon Vickers >Priority: Major > Fix For: 0.11.0.0 > > > One of the use cases for a ConsumerRebalanceListener is to load state > necessary for processing a partition when it is assigned. However, when > ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. > the state isn't loaded), the error is logged and the consumer proceeds on as > if nothing happened, happily consuming messages from the new partition. When > the state is relied upon for correct processing, this can be very bad, e.g. > data loss can occur. > It would be better if the error was propagated up so it could be dealt with > normally. At the very least the assignment should fail so the consumer > doesn't see any messages from the new partitions, and the rebalance can be > reattempted.
[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails
[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805565#comment-16805565 ] Guozhang Wang commented on KAFKA-4600: -- This seems to be the same issue as KAFKA-5154 and has been fixed as of 0.11.0.0.
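As a sketch of the use case in the KAFKA-4600 description (an application that loads per-partition state on assignment), the guard below makes a failed load visible to the processing loop instead of letting it consume the partition as if nothing happened. It uses only the JDK: `GuardedStateLoader`, its methods, and the `topic-partition` string keys are hypothetical stand-ins; in a real consumer `onAssigned()`/`onRevoked()` would be called from `ConsumerRebalanceListener.onPartitionsAssigned()`/`onPartitionsRevoked()`. It does not change what the Kafka client itself does with callback exceptions.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical application-side guard: per-partition state is loaded when
 * partitions are assigned, and a partition whose state failed to load is
 * refused instead of being processed as if nothing happened.
 * Partitions are "topic-partition" strings here; a real consumer would use
 * TopicPartition and call onAssigned()/onRevoked() from its rebalance listener.
 */
class GuardedStateLoader {
    private final Function<String, Map<String, String>> loader; // hypothetical state source
    private final Map<String, Map<String, String>> stateByPartition = new HashMap<>();
    private final Map<String, RuntimeException> failures = new HashMap<>();

    GuardedStateLoader(Function<String, Map<String, String>> loader) {
        this.loader = loader;
    }

    /** Loads state for each newly assigned partition, remembering failures. */
    void onAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            try {
                stateByPartition.put(p, loader.apply(p));
                failures.remove(p);
            } catch (RuntimeException e) {
                stateByPartition.remove(p);
                failures.put(p, e);
            }
        }
    }

    /** Drops everything held for revoked partitions. */
    void onRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            stateByPartition.remove(p);
            failures.remove(p);
        }
    }

    /** Returns the state for a partition, or fails loudly if there is none. */
    Map<String, String> stateFor(String partition) {
        RuntimeException cause = failures.get(partition);
        if (cause != null) {
            throw new IllegalStateException("state for " + partition + " failed to load", cause);
        }
        Map<String, String> state = stateByPartition.get(partition);
        if (state == null) {
            throw new IllegalStateException("no state loaded for " + partition);
        }
        return state;
    }
}
```

The processing loop would call `stateFor()` with each record's partition before handling the record; whether to then stop, pause the partition, or re-subscribe is left to the application.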
[jira] [Resolved] (KAFKA-4799) session timeout during event processing shuts down stream
[ https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-4799. -- Resolution: Fixed Fix Version/s: 0.11.0.1 > session timeout during event processing shuts down stream > - > > Key: KAFKA-4799 > URL: https://issues.apache.org/jira/browse/KAFKA-4799 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: kafka streams client running on os x, with docker > machine running broker >Reporter: Jacob Gur >Priority: Critical > Fix For: 0.11.0.1 > > > I have a simple stream application like this: > {code:title=Part of my class|borderStyle=solid} > private IConsumerSubscription buildSubscriptionStream( > Class clazz, Consumer consumer, String group, > Function> > topicStreamFunc) > { > KStreamBuilder builder = new KStreamBuilder(); > KStream stream = topicStreamFunc.apply(builder); > stream.foreach((k, v) -> { > try { > T value = > _jsonObjectMapper.mapFromJsonString(v, clazz); > consumer.accept(value); > Logger.trace("Consumed message {}", value); > } catch (Throwable th) { > Logger.warn("Error while consuming message", > th); > } > }); > final KafkaStreams streams = new KafkaStreams(builder, > constructProperties(group)); > streams.start(); > return streams::close; > } > {code} > There is just one client running this application stream. > If I run the client in a debugger with a breakpoint on the event processor > (i.e., inside the foreach lambda) with debugger suspending all threads for > perhaps more than 10 seconds, then when I resume the application: > Actual behavior - the stream shuts down > Expected behavior - the stream should recover, perhaps temporarily removed > from partition but then re-added and recovered. > It looks like what happens is this: > 1) The kafka client session times out. 
> 2) The partition is revoked > 3) The streams library has a rebalance listener that tries to commit offsets, > but that commit fails due to a rebalance exception. > 4) Stream shuts down. > Steps 3 and 4 occur in StreamThread's rebalance listener. > It seems that it should be more resilient and recover just like a regular > KafkaConsumer would. Its partition would be revoked, and then it would get it > back again and resume processing at the last offset. > Is current behavior expected and I'm not understanding the intention? Or is > this a bug? > Thanks!
[jira] [Resolved] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6631. -- Resolution: Fixed Just a side note that we are working on KAFKA-7149 to reduce the assignment metadata size with many topic partitions in the assignment. > Kafka Streams - Rebalancing exception in Kafka 1.0.0 > > > Key: KAFKA-6631 > URL: https://issues.apache.org/jira/browse/KAFKA-6631 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: Container Linux by CoreOS 1576.5.0 >Reporter: Alexander Ivanichev >Priority: Critical > > > In Kafka Streams 1.0.0, we saw a strange rebalance error. Our stream app > performs window-based aggregations. Sometimes on start, when all stream > workers join, the app just crashes; however, if we enable only one worker then > it works fine, and sometimes 2 workers work just fine, but when a third joins the app > crashes again. This looks like a critical issue with rebalancing. > {code:java} > 018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: > Unexpected error from SyncGroup: The server experienced an unexpected error > when processing the request > 2018-03-08T18:51:01.226557000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566) > 2018-03-08T18:51:01.22686Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539) > 2018-03-08T18:51:01.227328000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) > 2018-03-08T18:51:01.22763Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) > 2018-03-08T18:51:01.228152000Z at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) >
2018-03-08T18:51:01.228449000Z at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > 2018-03-08T18:51:01.228897000Z at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > 2018-03-08T18:51:01.229196000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506) > 2018-03-08T18:51:01.229673000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > 2018-03-08T18:51:01.229971000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268) > 2018-03-08T18:51:01.230436000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) > 2018-03-08T18:51:01.230749000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) > 2018-03-08T18:51:01.231065000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) > 2018-03-08T18:51:01.231584000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > 2018-03-08T18:51:01.231911000Z at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) > 2018-03-08T18:51:01.23219Z at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138) > 2018-03-08T18:51:01.232643000Z at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) > 2018-03-08T18:51:01.233121000Z at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851) > 2018-03-08T18:51:01.233409000Z at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) > 2018-03-08T18:51:01.23372Z at > 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > 2018-03-08T18:51:01.234196000Z at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) > 2018-03-08T18:51:01.234655000Z org.apache.kafka.common.KafkaException: > Unexpected error from SyncGroup: The server experienced an unexpected error > when processing the request > 2018-03-08T18:51:01.234972000Z exception in thread, closing process > 2018-03-08T18:51:01.23550Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566) > 2018-03-08T18:51:01.235839000Z at >
[jira] [Commented] (KAFKA-6745) kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)
[ https://issues.apache.org/jira/browse/KAFKA-6745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805561#comment-16805561 ] Guozhang Wang commented on KAFKA-6745: -- I think the root cause is that when you bounce a consumer instance, the consumer's member.id has not yet been removed from the group when the instance restarts, so it rejoins as a new member. In this case the old member will never send another JoinGroup request, and the coordinator always has to wait until the rebalance timeout (5 min) has elapsed to remove that member. Could you describe how you bounced the consumers? Did you gracefully shut down each instance and then restart it? > kafka consumer rebalancing takes long time (from 3 secs to 5 minutes) > - > > Key: KAFKA-6745 > URL: https://issues.apache.org/jira/browse/KAFKA-6745 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 0.11.0.0 >Reporter: Ramkumar >Priority: Major > > Hi, we had an HTTP service (3 nodes) built around Kafka 0.8. This HTTP service acts > as a REST API for the publishers and consumers, so they use middleware instead of > the Kafka client API. Here, consumer rebalancing was not a major > issue. > We wanted to upgrade to Kafka 0.11, so we updated our HTTP services (3 > node cluster) to use the new Kafka consumer API, but rebalancing the consumers > (multiple consumers in the same group) takes anywhere from seconds to 5 mins > (max.poll.interval.ms). Because of this, our HTTP clients time out > and fail over. This rebalancing time is a major issue. It is not clear from > the documentation whether the rebalance for the group takes place after > max.poll.interval.ms, or starts after 3 secs and completes any time within > 5 minutes. We tried to reduce max.poll.interval.ms to 15 seconds, but this > also triggers rebalances internally.
> Below are the other parameters we have set in our service: > max.poll.interval.ms = 30 seconds > heartbeat.interval.ms = 1 minute > session.timeout.ms = 4 minutes > consumer.cache.timeout = 2 min > Below is the log: > 2018-03-26 12:53:23,009 [qtp1404928347-11556] INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - > (Re-)joining group firstnetportal_001 > 2018-03-26 12:57:52,793 [qtp1404928347-11556] INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - > Successfully joined group firstnetportal_001 with generation 7475 > Please let me know if any other application/client uses an HTTP > interface across 3 nodes without having this issue.
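For reference, the Kafka client timing settings the reporter lists can be written as consumer properties, in milliseconds. This sketch uses only `java.util.Properties` with the standard config key names; the check encodes the documented guidance that `heartbeat.interval.ms` must be lower than `session.timeout.ms` and typically no higher than one third of it. `consumer.cache.timeout` does not appear to be a Kafka consumer setting (presumably it belongs to the reporter's service), so it is left out. `ReportedConsumerTimeouts` is a hypothetical helper name.

```java
import java.util.Properties;

/** Builds the consumer timing settings reported in KAFKA-6745 (all values in ms). */
class ReportedConsumerTimeouts {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", "30000");   // 30 seconds
        props.setProperty("heartbeat.interval.ms", "60000");  // 1 minute
        props.setProperty("session.timeout.ms", "240000");    // 4 minutes
        return props;
    }

    /** True if heartbeat.interval.ms is below session.timeout.ms and at most a third of it. */
    static boolean heartbeatWithinGuidance(Properties props) {
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        return heartbeat < session && heartbeat * 3 <= session;
    }
}
```

With the reported values the heartbeat (60 s) is within a third of the session timeout (240 s), so these two settings by themselves do not break the guidance.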
[jira] [Commented] (KAFKA-5586) Handle client disconnects during JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805554#comment-16805554 ] Guozhang Wang commented on KAFKA-5586: -- Hi [~hachikuji], with KIP-394 and KIP-91, and given that Streams is also going to remove its override of `max.poll.interval.ms` and use the consumer default (5 min) in KIP-442, I think this is not an issue worth resolving now. WDYT? > Handle client disconnects during JoinGroup > -- > > Key: KAFKA-5586 > URL: https://issues.apache.org/jira/browse/KAFKA-5586 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Jason Gustafson >Priority: Major > > If a consumer disconnects with a JoinGroup in-flight, we do not remove it > from the group until after the Join phase completes. If the client > immediately re-sends the JoinGroup request and it already had a memberId, > then the callback will be replaced and there is no harm done. For the other > cases: > 1. If the client disconnected due to a failure and does not re-send the > JoinGroup, the consumer will still be included in the new group generation > after the rebalance completes, but will immediately time out and trigger a new > rebalance. > 2. If the consumer was not a member of the group and re-sends JoinGroup, then > a new memberId will be created for that consumer and the old one will not be > removed. When the rebalance completes, the old memberId will time out and a > rebalance will be triggered. > To address these issues, we should add some additional logic to handle client > disconnections during the join phase. For newly generated memberIds, we > should simply remove them. For existing members, we should probably leave > them in the group and reset the heartbeat expiration task. > Note that we currently have no facility to expose disconnects from the > network layer to the other layers, so we need to find a good approach for > this.
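The handling proposed in KAFKA-5586 can be sketched as a toy coordinator. This is not the broker's `GroupCoordinator`: `ToyJoinCoordinator`, its methods, and the explicit millisecond clock are hypothetical, and only the two proposed rules are modeled. A member id generated for the in-flight JoinGroup is dropped on disconnect, while an existing member stays in the group with its heartbeat expiration reset.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the proposed disconnect handling during the join phase. */
class ToyJoinCoordinator {
    private static final class Member {
        boolean newlyGenerated;   // id was created for the JoinGroup now in flight
        boolean awaitingJoin;     // a JoinGroup response is still pending
        long heartbeatDeadlineMs; // member expires if it does not heartbeat by then
    }

    private final long sessionTimeoutMs;
    private final Map<String, Member> members = new HashMap<>();
    private int nextId = 0;

    ToyJoinCoordinator(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Handles JoinGroup; a null memberId means the coordinator must generate one. */
    String join(String memberId, long nowMs) {
        Member m;
        if (memberId == null) {
            memberId = "member-" + nextId++;
            m = new Member();
            m.newlyGenerated = true;
            members.put(memberId, m);
        } else {
            m = members.get(memberId);
            if (m == null) {
                throw new IllegalArgumentException("unknown member " + memberId);
            }
        }
        m.awaitingJoin = true;
        m.heartbeatDeadlineMs = nowMs + sessionTimeoutMs;
        return memberId;
    }

    /** Completes the join phase: everyone still present becomes an established member. */
    void completeJoin() {
        for (Member m : members.values()) {
            m.newlyGenerated = false;
            m.awaitingJoin = false;
        }
    }

    /** Proposed rule for a connection that drops while its JoinGroup is in flight. */
    void onDisconnect(String memberId, long nowMs) {
        Member m = members.get(memberId);
        if (m == null || !m.awaitingJoin) {
            return; // nothing in flight: normal session expiry applies
        }
        if (m.newlyGenerated) {
            members.remove(memberId); // newly generated id: simply remove it
        } else {
            m.awaitingJoin = false;   // existing member: keep it in the group
            m.heartbeatDeadlineMs = nowMs + sessionTimeoutMs; // and reset its expiration
        }
    }

    boolean contains(String memberId) {
        return members.containsKey(memberId);
    }

    long heartbeatDeadline(String memberId) {
        return members.get(memberId).heartbeatDeadlineMs;
    }
}
```

The open question from the ticket, how the network layer would report the disconnect to the coordinator, is outside this sketch: `onDisconnect()` is simply assumed to be called.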
[jira] [Resolved] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-7142. -- Resolution: Fixed Fix Version/s: 2.1.0 Resolving the ticket since the PR was already merged in 2.1.0. > Rebalancing large consumer group can block the coordinator broker for several > seconds > - > > Key: KAFKA-7142 > URL: https://issues.apache.org/jira/browse/KAFKA-7142 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, > 1.1.0 >Reporter: Ying Zheng >Assignee: Ying Zheng >Priority: Major > Fix For: 2.1.0 > > > In our production cluster, we noticed that when a large consumer group (a few > thousand members) is rebalancing, the produce latency of the coordinator > broker can jump to several seconds. > > Group rebalance is a very frequent operation; it can be triggered by adding / > removing / restarting a single member in the consumer group. > > When this happens, jstack shows that all the request handler threads of the broker > are waiting for the group lock: > {noformat} > "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 > nid=0x1b985 waiting on condition [0x7f98f1adb000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00024aa73b20> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248) > at > kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) > at > kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) > at > kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) > at kafka.server.KafkaApis.handle(KafkaApis.scala:115) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:745){noformat} > > Besides one thread that is either doing GroupMetadata.supportsProtocols(): > {noformat} > "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 > nid=0x1b984 runnable [0x7f98f1bdc000] >java.lang.Thread.State: RUNNABLE > at scala.collection.immutable.List.map(List.scala:284) > at > kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68) > at > kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) > at > kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) > at scala.collection.immutable.List.map(List.scala:288) > at > kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265) > at > kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270) > at > kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) > at > kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) > at > kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) > at kafka.server.KafkaApis.handle(KafkaApis.scala:115) > at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:745){noformat} > or GroupCoordinator.tryCompleteJoin > {noformat} > "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 > nid=0x1ceff runnable [0x7fe8701ca000] >java.lang.Thread.State: RUNNABLE > at > scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248) > at >
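The traces show the one running handler thread inside `GroupMetadata.supportsProtocols()`, which maps over every member's protocols via `candidateProtocols` while the group lock is held. That costs O(members) per JoinGroup, so roughly O(members²) for a full rebalance of a few thousand members. One way to avoid the rescan, sketched here as a toy in Java rather than the coordinator's Scala code, is to keep a per-protocol count of supporting members up to date as members join and leave; a protocol is a candidate when its count equals the group size. `ProtocolIndex` and its methods are hypothetical, and this is not claimed to be the change merged for 2.1.0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Toy group index that tracks, per protocol, how many members support it. */
class ProtocolIndex {
    private final Map<String, Set<String>> protocolsByMember = new HashMap<>();
    private final Map<String, Integer> supportCount = new HashMap<>();

    void addMember(String memberId, Set<String> protocols) {
        removeMember(memberId); // replace any previous registration of this member
        protocolsByMember.put(memberId, protocols);
        for (String p : protocols) {
            supportCount.merge(p, 1, Integer::sum);
        }
    }

    void removeMember(String memberId) {
        Set<String> old = protocolsByMember.remove(memberId);
        if (old == null) {
            return;
        }
        for (String p : old) {
            // decrement, dropping the entry once no member supports the protocol
            supportCount.computeIfPresent(p, (k, n) -> n == 1 ? null : n - 1);
        }
    }

    /**
     * Can a joining member be accepted? It must share at least one protocol
     * with every current member. Cost depends on the joiner's protocol list,
     * not on the group size.
     */
    boolean supportsProtocols(Set<String> memberProtocols) {
        if (memberProtocols.isEmpty()) {
            return false;
        }
        if (protocolsByMember.isEmpty()) {
            return true;
        }
        int groupSize = protocolsByMember.size();
        for (String p : memberProtocols) {
            if (supportCount.getOrDefault(p, 0) == groupSize) {
                return true;
            }
        }
        return false;
    }
}
```

The trade-off is a little extra bookkeeping on every add and remove in exchange for a join check that no longer walks the whole membership under the lock.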
[jira] [Resolved] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group
[ https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6681. -- Resolution: Fixed Fix Version/s: 0.11.0.0 Resolving as part of https://issues.apache.org/jira/browse/KAFKA-5154 now. > Two instances of kafka consumer reading the same partition within a consumer > group > -- > > Key: KAFKA-6681 > URL: https://issues.apache.org/jira/browse/KAFKA-6681 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 0.10.2.1 >Reporter: Narayan Periwal >Priority: Critical > Fix For: 0.11.0.0 > > Attachments: server-1.log, server-2.log > > > We have seen this issue with the new Kafka consumer library that was > introduced in 0.9. > With this new client, group management is done by the Kafka coordinator, > which is one of the Kafka brokers. > We are using Kafka broker 0.10.2.1, and the consumer client version is also > 0.10.2.1. > The issue we have faced is that, after rebalancing, some of the > partitions get consumed by 2 instances within a consumer group, leading to > duplication of the entire partition data. Both instances continue to read > until the next rebalance or until those clients are restarted. > It looks like a particular consumer keeps fetching data from a > partition, but the broker is not able to identify this "stale" consumer > instance. > We have hit this twice in production. Please look at it as soon as possible.
[jira] [Commented] (KAFKA-8029) Add in-memory bytes-only session store implementation
[ https://issues.apache.org/jira/browse/KAFKA-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805546#comment-16805546 ] ASF GitHub Bot commented on KAFKA-8029: --- ableegoldman commented on pull request #6525: KAFKA-8029: In memory session store URL: https://github.com/apache/kafka/pull/6525 First pass at an in-memory session store implementation. WIP (needs testing, KIP) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add in-memory bytes-only session store implementation > - > > Key: KAFKA-8029 > URL: https://issues.apache.org/jira/browse/KAFKA-8029 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > As titled. We've added the window store and session store implementations in > memory, what's left is the session store now.
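To illustrate what an in-memory session store has to answer, here is a toy sketch: sessions are nested in sorted maps by key, then session end, then session start, so a lookup for one key is a bounded scan instead of a full pass. The lookup follows the semantics of Kafka Streams' `SessionStore#findSessions(key, earliestSessionEndTime, latestSessionStartTime)` (sessions ending no earlier than the first bound and starting no later than the second), but the class and method names are hypothetical, values are generic rather than bytes, and it is not the implementation in pull request #6525.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy in-memory session store: key -> session end -> session start -> value. */
class ToyInMemorySessionStore<V> {
    static final class Session<T> {
        final String key;
        final long start;
        final long end;
        final T value;

        Session(String key, long start, long end, T value) {
            this.key = key;
            this.start = start;
            this.end = end;
            this.value = value;
        }
    }

    private final Map<String, NavigableMap<Long, NavigableMap<Long, V>>> sessions = new TreeMap<>();

    void put(String key, long start, long end, V value) {
        sessions.computeIfAbsent(key, k -> new TreeMap<>())
                .computeIfAbsent(end, e -> new TreeMap<>())
                .put(start, value);
    }

    void remove(String key, long start, long end) {
        NavigableMap<Long, NavigableMap<Long, V>> byEnd = sessions.get(key);
        if (byEnd == null) {
            return;
        }
        NavigableMap<Long, V> byStart = byEnd.get(end);
        if (byStart == null) {
            return;
        }
        byStart.remove(start);
        if (byStart.isEmpty()) {
            byEnd.remove(end); // prune empty levels so lookups stay tight
        }
        if (byEnd.isEmpty()) {
            sessions.remove(key);
        }
    }

    /** Sessions for key with end >= earliestEnd and start <= latestStart. */
    List<Session<V>> findSessions(String key, long earliestEnd, long latestStart) {
        List<Session<V>> result = new ArrayList<>();
        NavigableMap<Long, NavigableMap<Long, V>> byEnd = sessions.get(key);
        if (byEnd == null) {
            return result;
        }
        for (Map.Entry<Long, NavigableMap<Long, V>> e : byEnd.tailMap(earliestEnd, true).entrySet()) {
            for (Map.Entry<Long, V> s : e.getValue().headMap(latestStart, true).entrySet()) {
                result.add(new Session<>(key, s.getKey(), e.getKey(), s.getValue()));
            }
        }
        return result;
    }
}
```

Ordering by end time first means the `earliestEnd` bound prunes the outer map directly; the start-time bound is then applied within each end-time bucket.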
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805543#comment-16805543 ] Guozhang Wang commented on KAFKA-7447: -- Is it related to https://issues.apache.org/jira/browse/KAFKA-8069? > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continue as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occurs and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offsets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics.
> * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google Compute Engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > "Muir", who says they have also experienced this, suggested I open a ticket. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages to only those regarding > __consumer_offsets-29 because it's just too much to paste otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. (kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29.
Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling > loading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished > loading offsets and group
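The reporter's hypothesis turns on how a consumer treats a committed offset the partition cannot serve. The toy function below models that decision under stated assumptions: an offset is usable only within [log start offset, log end offset], and a missing or out-of-range offset is resolved by the `auto.offset.reset` policy, where `earliest` jumps back to the log start. With 14-day topic retention the log start could plausibly be about two weeks old, which would fit the replay described, but this only illustrates the hypothesis and is not a diagnosis. `OffsetResetSketch` and its members are hypothetical, not the client's fetcher code.

```java
import java.util.OptionalLong;

/** Toy model of where a consumer resumes fetching a partition. */
class OffsetResetSketch {
    /** Mirrors the three auto.offset.reset values: earliest, latest, none. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * A committed offset inside [logStartOffset, logEndOffset] is used as-is.
     * A missing committed offset, or one outside that range (for example ahead
     * of a lagging leader's log end), is resolved by the reset policy.
     */
    static long resumeOffset(OptionalLong committed, long logStartOffset, long logEndOffset,
                             ResetPolicy policy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStartOffset && offset <= logEndOffset) {
                return offset;
            }
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset; // replay everything retention still holds
            case LATEST:
                return logEndOffset;   // skip ahead to new messages only
            default:
                throw new IllegalStateException("no valid offset and reset policy is none");
        }
    }
}
```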
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805539#comment-16805539 ] Jun Rao commented on KAFKA-8106: [~Flower.min], one of the validations that the broker has to do is to verify that the timestamp of each record is within the allowed max diff. One can only know the timestamp of a record after decompressing the batch. > Remove unnecessary decompression operation when logValidator do validation. > > > Key: KAFKA-8106 > URL: https://issues.apache.org/jira/browse/KAFKA-8106 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 > Environment: Server : > cpu:2*16 ; > MemTotal : 256G; > Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network > Connection ; > SSD. >Reporter: Flower.min >Assignee: Flower.min >Priority: Major > Labels: performance > > We did performance testing of Kafka in the specific scenarios described > below. We built a Kafka cluster with one broker and created topics > with different numbers of partitions. Then we started many producer processes > to send large amounts of messages to one of the topics in each test.
> *_Specific Scenario_* > > *_1.Main config of Kafka_* > # Main config of Kafka > server: num.network.threads=6; num.io.threads=128; queued.max.requests=500 > # Number of TopicPartition : 50~2000 > # Size of Single Message : 1024B > > *_2.Config of KafkaProducer_* > ||compression.type||linger.ms||batch.size||buffer.memory|| > |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB| > *_3.The best result of performance testing_* > ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of production|| > |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s| > *_4.Phenomenon and my doubt_* > _CPU usage has reached its upper limit, but network usage has not > reached the server's bandwidth limit. *We are unsure what > costs so much CPU time, and we want to improve performance and > reduce the CPU usage of the Kafka server.*_ > > _*5.Analysis*_ > We analyzed the JFR recording of the Kafka server during performance testing. > We found the hot-spot methods are > *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and > *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. When > we checked thread stack information, we also found most CPU being > occupied by many threads busy decompressing messages. Then we > read the Kafka source code. > There is a double-layer nested iterator when LogValidator validates > every record, and each message is decompressed when traversing > every RecordBatch iterator.
Decompressing messages consumes CPU and hurts overall performance. > _*The purpose of decompressing every message is just to obtain the total size in bytes of one record and the size in bytes of the record body, > when the magic value to use is above 1 and no format conversion or value > overwriting is required for compressed messages. This is bad for > performance in common usage scenarios.*_{color:#33}Therefore, we suggest > *_removing the unnecessary decompression operation_* when validating > compressed messages, when the magic value to use is above 1 and no format > conversion or value overwriting is required for compressed messages.{color}
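Jun Rao's reply, that a record's timestamp is only visible after decompression, can be illustrated with a toy batch: a gzip-compressed sequence of 8-byte timestamps checked against a maximum allowed difference from the broker's clock, the role played by the broker's max timestamp difference setting (`message.timestamp.difference.max.ms` at the topic level) for CreateTime timestamps. The format and names are hypothetical: Kafka's record batches and LZ4 framing differ, and `java.util.zip` gzip stands in for the codec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Toy compressed batch: gzip(timestamp_1, timestamp_2, ...) as big-endian longs. */
class CompressedTimestampCheck {
    static byte[] compress(long... timestamps) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bytes))) {
            for (long ts : timestamps) {
                out.writeLong(ts);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // complete only after the gzip stream is closed
    }

    /**
     * Returns true if every record timestamp is within maxDiffMs of nowMs.
     * The timestamps live inside the compressed payload, so it has to be
     * decompressed before any of them can be checked.
     */
    static boolean timestampsValid(byte[] compressedBatch, long nowMs, long maxDiffMs) {
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressedBatch)))) {
            while (true) {
                long ts;
                try {
                    ts = in.readLong();
                } catch (EOFException endOfBatch) {
                    return true; // every timestamp passed
                }
                if (Math.abs(ts - nowMs) > maxDiffMs) {
                    return false;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whether the remaining decompression cost can be avoided therefore depends on which validations are enabled, which is the point of contention in the ticket.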
[jira] [Commented] (KAFKA-8034) Replace DeleteTopics request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805463#comment-16805463 ] ASF GitHub Bot commented on KAFKA-8034: --- cmccabe commented on pull request #6366: KAFKA-8034: Use automatic RPC generation in DeleteTopics URL: https://github.com/apache/kafka/pull/6366 > Replace DeleteTopics request/response with automated protocol > - > > Key: KAFKA-8034 > URL: https://issues.apache.org/jira/browse/KAFKA-8034 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major
[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place
[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805444#comment-16805444 ] ASF GitHub Bot commented on KAFKA-7502: --- bbejeck commented on pull request #6520: KAFKA-7502: Cleanup KTable materialization logic in a single place (doMapValues) URL: https://github.com/apache/kafka/pull/6520 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cleanup KTable materialization logic in a single place > -- > > Key: KAFKA-7502 > URL: https://issues.apache.org/jira/browse/KAFKA-7502 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Lee Dongjin >Priority: Major > > Today, since we pre-create all the `KTableXXX` operators along with the logical > node, we are effectively duplicating the logic that determines whether the > resulting KTable should be materialized. More specifically, the > materialization principle today is: > 1) If users specified Materialized in the DSL and it contains a queryable > name, we always materialize. > 2) If users specified Materialized in the DSL but it does not contain a queryable > name, or if users do not specify a Materialized object at all, Streams may > choose to materialize or not. In any case, even if the KTable is > materialized it will not be queryable since there's no queryable name (i.e. > only storeName is not null, but queryableName is null): > 2.a) If the resulting KTable is from an aggregation, we always materialize > since it is needed for storing the aggregation (i.e. we use the > MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the > materialization until the downstream operator requires this KTable to be > materialized or send-old-values (see `KTableSourceNode` and `KTableSource`). > 2.c) If the resulting KTable is from a join, we always materialize if users > create a Materialized object, even without a queryable name. This could > be optimized similarly to 2.b), but that is orthogonal to this ticket (see > `KTableImpl#buildJoin`, where we always use the constructor with nameProvider != > null). > 2.d) If the resulting KTable is from a stateless operation like filter / > mapValues, we never materialize. > > In all of these cases, we have logical nodes like "KTableKTableJoinNode", > as well as physical nodes like `ProcessorNode`. Ideally we should always > create the logical plan (i.e. the StreamsGraph), then optimize it if > necessary, and then generate the physical plan (i.e. the Topology). However, > today we create some physical nodes beforehand, so the above logic is > duplicated in the creation of both physical nodes and logical nodes. For > example, in `KTableKTableJoinNode` we check if Materialized is null for > adding a state store, and in `KTableImpl#doJoin` we check if materialized is > specified (case 2.c) above). > Another example is TableProcessorNode, which is used for 2.d) above: it > includes the logic, while its caller (`KTableImpl#doFilter`, for > example) also contains the logic when deciding whether to pass the `queryableName` > parameter to `KTableProcessorSupplier`. > This is bug-prone, since we may update the logic in one class but forget > to update the other class. > -- > What we want is a cleaner code path similar to what we have for 2.b), > such that when creating the logical nodes we keep track of whether 1) > materialized is specified, and 2) a queryable name is provided.
And during > the optimization phase, we may change the inner physical ProcessorBuilder's > parameters, such as the queryable name. Then, when it is time to generate the > physical node, we can just blindly take the parameters and go for it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
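The ticket's goal is to keep the materialization decision in one place, so that the logical and physical nodes cannot drift apart. The following is a minimal sketch of that idea. It uses hypothetical `Origin`, `Decision` and `decide` names that do not exist in Kafka Streams, and it encodes cases 1 and 2.a to 2.d from the description in a single method that both node types could consult.

```java
// Hypothetical sketch: the materialization rules from KAFKA-7502 expressed in a single method.
// None of these types exist in Kafka Streams; they only mirror cases 1 and 2.a-2.d above.
final class MaterializationDecisionSketch {

    /** Where the resulting KTable comes from. */
    enum Origin { AGGREGATION, SOURCE, JOIN, STATELESS }

    /** Outcome of the decision: whether to add a store, and whether it can be queried. */
    static final class Decision {
        final boolean materialize;
        final boolean queryable;

        Decision(boolean materialize, boolean queryable) {
            this.materialize = materialize;
            this.queryable = queryable;
        }
    }

    /**
     * @param materializedSpecified whether the user passed a Materialized object
     * @param queryableName         the queryable name from Materialized, or null if there is none
     * @param downstreamNeedsStore  whether a downstream operator requires materialization or
     *                              send-old-values (only relevant for source KTables)
     */
    static Decision decide(Origin origin,
                           boolean materializedSpecified,
                           String queryableName,
                           boolean downstreamNeedsStore) {
        // Case 1: a queryable name was given, so always materialize and allow queries.
        if (materializedSpecified && queryableName != null) {
            return new Decision(true, true);
        }
        // Case 2: no queryable name; Streams decides, and the store is never queryable.
        boolean materialize;
        switch (origin) {
            case AGGREGATION: // 2.a: the store is needed to hold the aggregate
                materialize = true;
                break;
            case SOURCE:      // 2.b: delayed until a downstream operator needs it
                materialize = downstreamNeedsStore;
                break;
            case JOIN:        // 2.c: materialize whenever a Materialized object was created
                materialize = materializedSpecified;
                break;
            case STATELESS:   // 2.d: filter / mapValues without a queryable name
            default:
                materialize = false;
                break;
        }
        return new Decision(materialize, false);
    }

    public static void main(String[] args) {
        Decision d = decide(Origin.JOIN, true, null, false);
        System.out.println(d.materialize + " " + d.queryable); // true false
    }
}
```

With the rules in one method, a change to case 2.c, for example, would be made once rather than in both `KTableKTableJoinNode` and `KTableImpl#doJoin`.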
[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place
[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805442#comment-16805442 ] ASF GitHub Bot commented on KAFKA-7502: --- bbejeck commented on pull request #6519: KAFKA-7502: Cleanup KTable materialization logic in a single place (doTransformValues) URL: https://github.com/apache/kafka/pull/6519 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805328#comment-16805328 ] Rajesh Nataraja commented on KAFKA-8154: [~rsivaram] The infinite loop is exactly what we noticed when trying to implement this as per the Oracle documentation. I tried several ways to avoid the infinite loop, but all of them ended up causing an IO exception because the other side closes the connection, probably because the client is unable to complete the read operation. In most cases of buffer overflow, increasing the application buffer results in buffer underflow. Here are the underflow outputs, where you can see the netRead buffer size: WARN org.apache.kafka.common.network.Selector - [Consumer clientId=xxx-a60c2c61-282d-4866-a3d1-f88579b44de7-StreamThread-2-consumer, groupId=1 ... ] Unexpected error from /10.10.10.20; closing connection java.lang.IllegalStateException: Buffer underflow when available data size (18437) > packet buffer size (18437) at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:565) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) at org.apache.kafka.common.network.Selector.poll(Selector.java:467) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) > Buffer Overflow exceptions between brokers and with clients > --- > > Key: KAFKA-8154 > URL: https://issues.apache.org/jira/browse/KAFKA-8154 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Rajesh Nataraja >Priority: Major > Attachments: server.properties.txt > > > https://github.com/apache/kafka/pull/6495 > https://github.com/apache/kafka/pull/5785 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805258#comment-16805258 ] Dhruvil Shah edited comment on KAFKA-7362 at 3/29/19 6:38 PM: -- Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to make progress on topic deletion in spite of having offline brokers. I think the main challenge there is to figure out what happens on topic recreation, i.e. when you delete a topic and a topic with the same name is recreated. I think it would make sense to decouple both of these issues though. Cleaning up orphaned partitions is useful to reclaim the disk space, regardless of whether we solve the topic deletion problem or not. I am also not sure if we need a KIP to implement cleanup of orphaned partitions, as this does not change user experience in any way. What do you think? > enable kafka broker to remove orphan partitions automatically > -- > > Key: KAFKA-7362 > URL: https://issues.apache.org/jira/browse/KAFKA-7362 > Project: Kafka > Issue Type: Improvement > Components: core, log >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Major > > When partition reassignment removes topic partitions from an offline broker, > those removed partitions become orphan partitions on that broker. When the > offline broker comes back online, it is not able to clean up either the data or the > folders that belong to orphan partitions. Log manager will scan all dirs > during startup, but the time-based retention policy on a topic partition will > not be triggered until the broker is either a follower or a leader of the > partition. In addition, we do not have logic to delete folders that belong > to orphan partitions today. > This ticket is opened to provide a mechanism (when enabled) to safely remove > orphan partitions automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
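A first step toward the mechanism requested above is identifying which partition directories in a log dir the broker no longer hosts. The following is a minimal sketch. It assumes directory names of the form `<topic>-<partition>` and uses a hypothetical `findOrphans` helper that receives the set of partitions currently assigned to this broker. It only reports candidates and deletes nothing. Deciding when that assignment is authoritative (the "safely" part of the ticket) is outside its scope.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: report partition directories that the broker no longer hosts.
// Assumes names of the form "<topic>-<partition>"; nothing is deleted here.
final class OrphanPartitionSketch {

    static List<String> findOrphans(List<String> dirNames, Set<String> assignedPartitions) {
        List<String> orphans = new ArrayList<>();
        for (String dir : dirNames) {
            int dash = dir.lastIndexOf('-');
            if (dash <= 0) {
                continue; // no "<topic>-" prefix, so not a partition directory
            }
            String partition = dir.substring(dash + 1);
            if (partition.isEmpty() || !partition.chars().allMatch(Character::isDigit)) {
                continue; // suffix is not a partition number (e.g. checkpoint files)
            }
            if (!assignedPartitions.contains(dir)) {
                orphans.add(dir);
            }
        }
        return orphans;
    }

    public static void main(String[] args) {
        List<String> onDisk = Arrays.asList("orders-0", "orders-1", "payments-3", "cleaner-offset-checkpoint");
        Set<String> assigned = new HashSet<>(Arrays.asList("orders-0", "orders-1"));
        System.out.println(findOrphans(onDisk, assigned)); // [payments-3]
    }
}
```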
[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805258#comment-16805258 ] Dhruvil Shah commented on KAFKA-7362: - Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to make progress on topic deletion in spite of having offline brokers. I think the main challenge there is to figure out what happens on topic recreation, i.e. when you delete a topic and a topic with the same name is recreated. I think it would make sense to decouple both of these issues though. Cleaning up orphaned partitions is useful to reclaim the disk space, regardless of whether we solve the topic deletion problem or not. I am also not sure if we need a KIP to implement cleanup of orphaned partitions, as this does not change user experience in any way. What do you think? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8162) IBM JDK Class not found error when handling SASL authentication exception
[ https://issues.apache.org/jira/browse/KAFKA-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805151#comment-16805151 ] ASF GitHub Bot commented on KAFKA-8162: --- edoardocomar commented on pull request #6524: KAFKA-8162: IBM JDK Class not found error when handling SASL URL: https://github.com/apache/kafka/pull/6524 Attempt to load the IBM internal class, but fall back to loading the Sun class if the IBM one is not found. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > IBM JDK Class not found error when handling SASL authentication exception > - > > Key: KAFKA-8162 > URL: https://issues.apache.org/jira/browse/KAFKA-8162 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 2.1.0, 2.2.0, 2.1.1 > Environment: Any with IBM JDK 8 SR5 FP10 >Reporter: Arkadiusz Firus >Assignee: Edoardo Comar >Priority: Major > > When there is a problem with SASL authentication, the enum KerberosError is > used to retrieve the error code. When the IBM JDK is used, it tries to > load the class com.ibm.security.krb5.internal.KrbException, which is not present > in all IBM JDK versions. This leads to a NoClassDefFoundError, which is not > handled. > I tested it on: > java version "1.8.0_161" > Java(TM) SE Runtime Environment (build 8.0.5.10 - > pxa6480sr5fp10-20180214_01(SR5 FP10)) > IBM J9 VM (build 2.9, JRE 1.8.0 Linux amd64-64 Compressed References > 20180208_378436 (JIT enabled, AOT enabled) > In this JDK version, the class KrbException is in the package com.ibm.security.krb5 > (without internal), so the fully qualified class name is: > com.ibm.security.krb5.KrbException > Full stack trace from the logs: > [2019-03-27 06:50:00,113] ERROR Processor got uncaught exception. > (kafka.network.Processor) > java.lang.NoClassDefFoundError: > org.apache.kafka.common.security.kerberos.KerberosError (initialization > failure) > at > java.lang.J9VMInternals.initializationAlreadyFailed(J9VMInternals.java:96) > at > org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:384) > at > org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:256) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:132) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532) > at org.apache.kafka.common.network.Selector.poll(Selector.java:467) > at kafka.network.Processor.poll(SocketServer.scala:689) > at kafka.network.Processor.run(SocketServer.scala:594) > at java.lang.Thread.run(Thread.java:811) > Caused by: org.apache.kafka.common.KafkaException: Kerberos exceptions could > not be initialized > at > org.apache.kafka.common.security.kerberos.KerberosError.<clinit>(KerberosError.java:59) > ... 8 more > Caused by: java.lang.ClassNotFoundException: > com.ibm.security.krb5.internal.KrbException > at java.lang.Class.forNameImpl(Native Method) > at java.lang.Class.forName(Class.java:297) > at > org.apache.kafka.common.security.kerberos.KerberosError.<clinit>(KerberosError.java:53) > ... 8 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
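The fallback described in the PR above follows a common pattern: try class names in order and use the first one that loads. The following is a minimal sketch that uses a hypothetical `loadFirstAvailable` helper. It is not the code from PR #6524, and it does not show how KerberosError uses the loaded class.

```java
import java.util.Optional;

// Hypothetical sketch of "try the IBM class, fall back to the Sun class".
// This is not the code from PR #6524; it only illustrates the fallback pattern.
final class FallbackClassLoadingSketch {

    /** Returns the first class in {@code candidates} that can be loaded, or empty if none can. */
    static Optional<Class<?>> loadFirstAvailable(String... candidates) {
        for (String name : candidates) {
            try {
                return Optional.of(Class.forName(name));
            } catch (ClassNotFoundException e) {
                // Not present in this JDK: try the next candidate instead of failing.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<Class<?>> krbException = loadFirstAvailable(
                "com.ibm.security.krb5.internal.KrbException", // older IBM JDKs
                "sun.security.krb5.KrbException");             // newer IBM JDKs and other JDKs
        System.out.println(krbException.map(Class::getName).orElse("no KrbException class found"));
    }
}
```

Catching `ClassNotFoundException` for each candidate prevents a missing class from escaping a static initializer. In the stack trace above, that escape is what later surfaces as `NoClassDefFoundError ... (initialization failure)`.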
[jira] [Commented] (KAFKA-8162) IBM JDK Class not found error when handling SASL authentication exception
[ https://issues.apache.org/jira/browse/KAFKA-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805115#comment-16805115 ] Edoardo Comar commented on KAFKA-8162: -- Newer IBM JDKs bundle the regular Sun class "sun.security.krb5.KrbException", so the KerberosError class could attempt to load the IBM one but fall back to the Sun one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8162) IBM JDK Class not found error when handling SASL authentication exception
[ https://issues.apache.org/jira/browse/KAFKA-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-8162: Assignee: Edoardo Comar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804777#comment-16804777 ] Rajini Sivaram commented on KAFKA-8154: --- The assumption we make is that an incoming packet with the SSL session's packet buffer size can be unwrapped into an application buffer with the SSL session's application buffer size, and that the application buffer size is bigger than the unwrapped data (hence the >= check). We guarantee that we don't allocate a per-connection buffer larger than the total of the SSL session's buffer sizes. That total is typically used to calculate the memory brokers need based on the maximum connection count, so arbitrarily increasing the value could lead to OOM. So I don't think we can apply the fix in PR [https://github.com/apache/kafka/pull/5785]. With the standard SSL session packet size in normal JDKs, the application buffer size needs to be 16384 (lower than the value typically used) to hit the exception in PR [https://github.com/apache/kafka/pull/6495]. But suppose we set the application buffer size to 16384 and use the normal network read buffer size, for example with an Oracle 1.8 JDK. Instead of throwing IllegalStateException, we would end up in an infinite loop with large packets that cannot be unwrapped into an application buffer of 16384. I think we need to better understand the issue here. It would be good to know whether the issues encountered in the two failing scenarios (in the two PRs) were the same; a stack trace from the first one would help with this. It would also be useful to get the buffer sizes (netReadBufferSize(), netWriteBufferSize() and applicationBufferSize()) for the two environments. At the moment, we have only one number, 16384, as the application buffer size, and we don't expect that to work. Obviously, if we can reproduce this with a unit test, that would be good, but it may be easier to write a unit test once we have identified the exact issue that we need to fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
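Rajini asks for the buffer sizes in each environment. The JDK exposes the underlying SSL session values through `SSLSession`. The following is a minimal sketch that prints them for the default `SSLContext`. It assumes, without confirming it here, that `netReadBufferSize()`/`netWriteBufferSize()` follow `getPacketBufferSize()` and that `applicationBufferSize()` follows `getApplicationBufferSize()`. The class name is hypothetical. Sizes read before a handshake may differ from those of a negotiated session, so treat the output as a first data point for comparing JDKs.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;

// Prints the SSL buffer sizes reported by this JDK's default SSLContext.
// Assumption: Kafka's network read/write buffers follow the packet buffer size and its
// application buffer follows the application buffer size.
final class SslBufferSizes {

    /** Returns {packetBufferSize, applicationBufferSize} for a fresh client-mode engine. */
    static int[] bufferSizes() {
        try {
            SSLEngine engine = SSLContext.getDefault().createSSLEngine();
            engine.setUseClientMode(true);
            // Initial session, before any handshake; a negotiated session may report other values.
            SSLSession session = engine.getSession();
            return new int[] {session.getPacketBufferSize(), session.getApplicationBufferSize()};
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = bufferSizes();
        System.out.println("JDK: " + System.getProperty("java.vendor") + " " + System.getProperty("java.version"));
        System.out.println("packet buffer size:      " + sizes[0]);
        System.out.println("application buffer size: " + sizes[1]);
    }
}
```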
[jira] [Commented] (KAFKA-8175) The broker block some minutes may occur expired error message for a period of time
[ https://issues.apache.org/jira/browse/KAFKA-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804684#comment-16804684 ] ASF GitHub Bot commented on KAFKA-8175: --- huangyiminghappy commented on pull request #6522: KAFKA-8175: Remove streams overrides on repartition topics URL: https://github.com/apache/kafka/pull/6522 As described in (https://issues.apache.org/jira/browse/KAFKA-8175): if one of the nodes in the cluster is blocked and the client cannot send a metadata update request to another node, the client logs many messages like `org.apache.kafka.common.errors.TimeoutException: Expiring 1062 record(s) for kafka_test_111-8: 23967 ms has passed since batch creation plus linger time Fri Mar 29 11:34:14 CST 2019`. Sometimes the controller does not detect soon enough that the broker is offline. The client's batches then cannot be sent to the offline node, and no metadata update is triggered either. So we need to check the connection's read state: if it is not ready within the configured time, close the channel and trigger a metadata update. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The broker block some minutes may occur expired error message for a period of > time > > > Key: KAFKA-8175 > URL: https://issues.apache.org/jira/browse/KAFKA-8175 > Project: Kafka > Issue Type: Improvement >Reporter: huangyiming >Priority: Minor > > When a broker is blocked for several minutes, the producer may report > record-expiry errors for a period of time. Suppose the > brokers in the cluster are 100, 101 and 102 (by IP), and the controller is 100. If broker 101 > is blocked for 2 minutes (you can simulate this by attaching gdb to its pid for 2 > minutes and then quitting), the controller may not detect in time that 101 is > offline (for example, it only finds 101 offline 60 seconds later), so it sends LeaderAndIsr only 60 seconds later. Meanwhile, many batches in the > RecordAccumulator may hit the delivery timeout, and partitions whose leader is 101 may report expiry errors. The client cannot send > a metadata update request to 100 or 102, because the records for 101 cannot be > sent and therefore cannot trigger the timeout that would update the metadata. > so i use -- This message was sent by Atlassian JIRA (v7.6.3#76005)
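The check proposed in PR #6522 can be sketched as a small watchdog. It records when each connection last made read progress and reports connections that have been silent longer than a configured time, so that the caller can close the channel and request a metadata update. This is a minimal, hypothetical sketch. `StaleConnectionWatchdog`, its methods and the timeout parameter are not Kafka APIs. The node ids mirror the 100/101/102 example in the ticket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the check proposed in PR #6522: if a connection has shown no read
// progress within a configured time, report it so the caller can close the channel and
// request a metadata update. Names and the timeout parameter are illustrative only.
final class StaleConnectionWatchdog {

    private final long readTimeoutMs;
    private final Map<String, Long> lastReadProgressMs = new HashMap<>();

    StaleConnectionWatchdog(long readTimeoutMs) {
        this.readTimeoutMs = readTimeoutMs;
    }

    /** Records that data was read from the given node at time nowMs. */
    void onReadProgress(String nodeId, long nowMs) {
        lastReadProgressMs.put(nodeId, nowMs);
    }

    /** Returns the nodes whose connection has made no read progress for longer than the timeout. */
    List<String> staleNodes(long nowMs) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastReadProgressMs.entrySet()) {
            if (nowMs - e.getValue() > readTimeoutMs) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }

    /** Forgets a node after its channel has been closed. */
    void onDisconnect(String nodeId) {
        lastReadProgressMs.remove(nodeId);
    }

    public static void main(String[] args) {
        StaleConnectionWatchdog watchdog = new StaleConnectionWatchdog(30_000);
        watchdog.onReadProgress("100", 0);
        watchdog.onReadProgress("101", 0);
        watchdog.onReadProgress("100", 50_000); // broker 100 keeps responding; 101 is blocked
        for (String node : watchdog.staleNodes(60_000)) {
            // A real client would close the channel here and request a metadata update.
            System.out.println("close connection to " + node + " and request metadata update");
            watchdog.onDisconnect(node);
        }
    }
}
```

A real implementation would also need to distinguish an idle connection with no requests in flight from one that is actually blocked. This sketch treats any connection without recent read progress as stale.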
[jira] [Created] (KAFKA-8175) The broker block some minutes may occur expired error message for a period of time
huangyiming created KAFKA-8175: -- Summary: The broker block some minutes may occur expired error message for a period of time Key: KAFKA-8175 URL: https://issues.apache.org/jira/browse/KAFKA-8175 Project: Kafka Issue Type: Improvement Reporter: huangyiming When a broker is blocked for several minutes, the producer may report record-expiry errors for a period of time. Suppose the brokers in the cluster are 100, 101 and 102 (by IP), and the controller is 100. If broker 101 is blocked for 2 minutes (you can simulate this by attaching gdb to its pid for 2 minutes and then quitting), the controller may not detect in time that 101 is offline (for example, it only finds 101 offline 60 seconds later), so it sends LeaderAndIsr only 60 seconds later. Meanwhile, many batches in the RecordAccumulator may hit the delivery timeout, and partitions whose leader is 101 may report expiry errors. The client cannot send a metadata update request to 100 or 102, because the records for 101 cannot be sent and therefore cannot trigger the timeout that would update the metadata. so i use -- This message was sent by Atlassian JIRA (v7.6.3#76005)