[jira] [Commented] (KAFKA-2617) Move protocol field default values to Protocol

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805568#comment-16805568
 ] 

Guozhang Wang commented on KAFKA-2617:
--

I'm closing this ticket as it is resolved by KAFKA-7609 now.

> Move protocol field default values to Protocol
> --
>
> Key: KAFKA-2617
> URL: https://issues.apache.org/jira/browse/KAFKA-2617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Jakub Nowak
>Priority: Minor
>  Labels: newbie
>
> Right now the default values are scattered in the Request / Response classes, 
> and some duplicates already exist, like JoinGroupRequest.UNKNOWN_CONSUMER_ID 
> and OffsetCommitRequest.DEFAULT_CONSUMER_ID. We would like to move all 
> default values into org.apache.kafka.common.protocol.Protocol since 
> org.apache.kafka.common.requests depends on org.apache.kafka.common.protocol 
> anyways.
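
For illustration, a hedged sketch of the consolidation the ticket asks for (class and constant names below are assumptions for the sketch only, not the actual Kafka code; the work was ultimately superseded by KAFKA-7609):

{code:java}
// Illustrative sketch only: one shared default in the protocol layer, so
// request classes stop re-declaring their own copies of the same value.
public final class ProtocolDefaults {
    public static final String UNKNOWN_MEMBER_ID = "";
    public static final long DEFAULT_TIMESTAMP = -1L;

    private ProtocolDefaults() { }
}

// A request class would then reference the shared constant instead of
// duplicating it (hypothetical example class):
class JoinGroupDefaultsExample {
    static final String UNKNOWN_CONSUMER_ID = ProtocolDefaults.UNKNOWN_MEMBER_ID;
}
{code}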



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4600.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.0

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
> Fix For: 0.11.0.0
>
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.
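
For illustration, a hedged sketch (class, helper, and topic names are hypothetical, not from the ticket) of how an application on the affected versions can surface such a failure itself, by recording it inside the listener and checking it from the poll loop before processing records:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: capture a state-loading failure so the polling loop can stop
// instead of silently consuming from partitions whose state never loaded.
public class StateLoadingListener implements ConsumerRebalanceListener {
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        try {
            for (TopicPartition tp : partitions) {
                loadStateFor(tp); // hypothetical application-specific loader
            }
        } catch (Exception e) {
            failure.set(e); // surfaced later via rethrowIfFailed()
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    // Call this from the poll loop before processing newly fetched records.
    public void rethrowIfFailed() {
        Exception e = failure.get();
        if (e != null) {
            throw new IllegalStateException("State loading failed during rebalance", e);
        }
    }

    private void loadStateFor(TopicPartition tp) { /* application-specific */ }
}
{code}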



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805565#comment-16805565
 ] 

Guozhang Wang commented on KAFKA-4600:
--

This seems to be the same issue as KAFKA-5154 and has been fixed in 0.11.0.0.

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4799) session timeout during event processing shuts down stream

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4799.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.1

> session timeout during event processing shuts down stream
> -
>
> Key: KAFKA-4799
> URL: https://issues.apache.org/jira/browse/KAFKA-4799
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: kafka streams client running on os x, with docker 
> machine running broker
>Reporter: Jacob Gur
>Priority: Critical
> Fix For: 0.11.0.1
>
>
> I have a simple stream application like this:
> {code:title=Part of my class|borderStyle=solid}
>   private <T> IConsumerSubscription buildSubscriptionStream(
>           Class<T> clazz, Consumer<T> consumer, String group,
>           Function<KStreamBuilder, KStream<String, String>> topicStreamFunc) {
>       KStreamBuilder builder = new KStreamBuilder();
>       KStream<String, String> stream = topicStreamFunc.apply(builder);
>       stream.foreach((k, v) -> {
>           try {
>               T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
>               consumer.accept(value);
>               Logger.trace("Consumed message {}", value);
>           } catch (Throwable th) {
>               Logger.warn("Error while consuming message", th);
>           }
>       });
>       final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
>       streams.start();
>       return streams::close;
>   }
> {code}
> There is just one client running this application stream.
> If I run the client in a debugger with a breakpoint on the event processor 
> (i.e., inside the foreach lambda) with debugger suspending all threads for 
> perhaps more than 10 seconds, then when I resume the application:
> Actual behavior - the stream shuts down
> Expected behavior - the stream should recover, perhaps temporarily removed 
> from partition but then re-added and recovered.
> It looks like what happens is this:
> 1) The kafka client session times out.
> 2) The partition is revoked
> 3) The streams library has a rebalance listener that tries to commit offsets, 
> but that commit fails due to a rebalance exception.
> 4) Stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
> It seems that it should be more resilient and recover just like a regular 
> KafkaConsumer would. Its partition would be revoked, and then it would get it 
> back again and resume processing at the last offset.
> Is current behavior expected and I'm not understanding the intention? Or is 
> this a bug?
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6631.
--
Resolution: Fixed

Just a side note: we are working on KAFKA-7149 to reduce the assignment 
metadata size when there are many topic partitions in the assignment.

> Kafka Streams - Rebalancing exception in Kafka 1.0.0
> 
>
> Key: KAFKA-6631
> URL: https://issues.apache.org/jira/browse/KAFKA-6631
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Container Linux by CoreOS 1576.5.0
>Reporter: Alexander Ivanichev
>Priority: Critical
>
>  
> In Kafka Streams 1.0.0 we saw a strange rebalance error. Our stream app 
> performs window-based aggregations. Sometimes on start, when all stream 
> workers join, the app just crashes; if we enable only one worker it works 
> fine, and sometimes two workers work just fine, but when the third joins the 
> app crashes again. It looks like a critical issue with rebalancing.
> {code:java}
> 018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: 
> Unexpected error from SyncGroup: The server experienced an unexpected error 
> when processing the request
> 2018-03-08T18:51:01.226557000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
> 2018-03-08T18:51:01.22686Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
> 2018-03-08T18:51:01.227328000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
> 2018-03-08T18:51:01.22763Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
> 2018-03-08T18:51:01.228152000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> 2018-03-08T18:51:01.228449000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> 2018-03-08T18:51:01.228897000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> 2018-03-08T18:51:01.229196000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
> 2018-03-08T18:51:01.229673000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
> 2018-03-08T18:51:01.229971000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
> 2018-03-08T18:51:01.230436000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
> 2018-03-08T18:51:01.230749000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
> 2018-03-08T18:51:01.231065000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
> 2018-03-08T18:51:01.231584000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> 2018-03-08T18:51:01.231911000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> 2018-03-08T18:51:01.23219Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
> 2018-03-08T18:51:01.232643000Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
> 2018-03-08T18:51:01.233121000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> 2018-03-08T18:51:01.233409000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> 2018-03-08T18:51:01.23372Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 2018-03-08T18:51:01.234196000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> 2018-03-08T18:51:01.234655000Z org.apache.kafka.common.KafkaException: 
> Unexpected error from SyncGroup: The server experienced an unexpected error 
> when processing the request
> 2018-03-08T18:51:01.234972000Z exception in thread, closing process
> 2018-03-08T18:51:01.23550Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
> 2018-03-08T18:51:01.235839000Z at 
> 

[jira] [Commented] (KAFKA-6745) kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805561#comment-16805561
 ] 

Guozhang Wang commented on KAFKA-6745:
--

I think the root cause is that when you bounce a consumer instance, the 
consumer's member.id has not yet been kicked out of the group by the time the 
instance is re-started, so it re-joins as a new member. In that case the old 
member will never send a re-join group request, and the coordinator has to 
wait until the rebalance.timeout (5 min) has elapsed before kicking out the old 
member.

Could you describe how you rebalanced the consumers? Did you gracefully shut 
down each instance and then restart it?
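
For reference, a minimal sketch of a graceful shutdown, assuming an already-configured KafkaConsumer named `consumer` and the standard client imports: wakeup() interrupts the blocking poll(), and close() lets the member leave the group so the coordinator does not have to wait for the rebalance timeout.

{code:java}
// Sketch only: graceful shutdown of a poll loop. process() is a hypothetical
// application method; consumer is an existing KafkaConsumer<String, String>.
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(record -> process(record));
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to do.
} finally {
    consumer.close(); // leaves the group so a rebalance can start immediately
}
{code}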

> kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)
> -
>
> Key: KAFKA-6745
> URL: https://issues.apache.org/jira/browse/KAFKA-6745
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.11.0.0
>Reporter: Ramkumar
>Priority: Major
>
> Hi, we had a 3-node HTTP service in front of Kafka 0.8. This HTTP service acts 
> as a REST API so that publishers and consumers use the middleware instead of 
> the Kafka client API directly. There, consumer rebalancing is not a major 
> issue.
> We wanted to upgrade to Kafka 0.11, and we have updated our HTTP services 
> (3-node cluster) to use the new Kafka consumer API, but rebalancing of 
> consumers (multiple consumers under the same group) takes anywhere from a few 
> seconds to 5 minutes (max.poll.interval.ms). Because of this our HTTP clients 
> are timing out and failing over. This rebalancing time is the major issue. It 
> is not clear from the documentation whether rebalance activity for the group 
> takes place only after max.poll.interval.ms, or whether it starts after 3 secs 
> and can complete any time within 5 minutes. We tried reducing 
> max.poll.interval.ms to 15 seconds, but this also triggers rebalances 
> internally.
> Below are the other parameters we have set in our service:
> max.poll.interval.ms = 30 seconds
> heartbeat.interval.ms = 1 minute
> session.timeout.ms = 4 minutes
> consumer.cache.timeout = 2 minutes
>  
>  
> below is the log
> ""2018-03-26 12:53:23,009 [qtp1404928347-11556] INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> (Re-)joining group firstnetportal_001
> ""2018-03-26 12:57:52,793 [qtp1404928347-11556] INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> Successfully joined group firstnetportal_001 with generation 7475
> Please let me know if any other application/client uses an HTTP interface on 3 
> nodes without having this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5586) Handle client disconnects during JoinGroup

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805554#comment-16805554
 ] 

Guozhang Wang commented on KAFKA-5586:
--

Hi [~hachikuji], with KIP-394 and KIP-91 in place, and given that Streams is 
also going to remove its override on `max.poll.interval.ms` and use the 
consumer default (5 min) as part of KIP-442, I think this is no longer an issue 
worth resolving now. WDYT?

> Handle client disconnects during JoinGroup
> --
>
> Key: KAFKA-5586
> URL: https://issues.apache.org/jira/browse/KAFKA-5586
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Priority: Major
>
> If a consumer disconnects with a JoinGroup in-flight, we do not remove it 
> from the group until after the Join phase completes. If the client 
> immediately re-sends the JoinGroup request and it already had a memberId, 
> then the callback will be replaced and there is no harm done. For the other 
> cases:
> 1. If the client disconnected due to a failure and does not re-send the 
> JoinGroup, the consumer will still be included in the new group generation 
> after the rebalance completes, but will immediately timeout and trigger a new 
> rebalance.
> 2. If the consumer was not a member of the group and re-sends JoinGroup, then 
> a new memberId will be created for that consumer and the old one will not be 
> removed. When the rebalance completes, the old memberId will timeout and a 
> rebalance will be triggered.
> To address these issues, we should add some additional logic to handle client 
> disconnections during the join phase. For newly generated memberIds, we 
> should simply remove them. For existing members, we should probably leave 
> them in the group and reset the heartbeat expiration task.
> Note that we currently have no facility to expose disconnects from the 
> network layer to the other layers, so we need to find a good approach for 
> this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7142.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

Resolving the ticket since the PR has already been merged into 2.1.0.

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, 
> 1.1.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Major
> Fix For: 2.1.0
>
>
> In our production cluster, we noticed that when a large consumer group (a few 
> thousand members) is rebalancing, the produce latency of the coordinator 
> broker can jump to several seconds.
>  
> Group rebalance is a very frequent operation, it can be triggered by adding / 
> removing / restarting a single member in the consumer group.
>  
> When this happens, jstack shows all the request handler threads of the broker 
> are waiting for group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 
> nid=0x1b985 waiting on condition [0x7f98f1adb000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00024aa73b20> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
>   
> Besides, one thread is running either GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 
> nid=0x1b984 runnable [0x7f98f1bdc000]
>java.lang.Thread.State: RUNNABLE
> at scala.collection.immutable.List.map(List.scala:284)
> at 
> kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at scala.collection.immutable.List.map(List.scala:288)
> at 
> kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 
> nid=0x1ceff runnable [0x7fe8701ca000]
>java.lang.Thread.State: RUNNABLE
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> 

[jira] [Resolved] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6681.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.0

Resolving as part of https://issues.apache.org/jira/browse/KAFKA-5154 now.

> Two instances of kafka consumer reading the same partition within a consumer 
> group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.2.1
>Reporter: Narayan Periwal
>Priority: Critical
> Fix For: 0.11.0.0
>
> Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the Kafka consumer, the new library that was 
> introduced in 0.9.
> With this new client, group management is done by the Kafka coordinator, 
> which is one of the Kafka brokers.
> We are using Kafka broker 0.10.2.1, and the consumer client version is also 
> 0.10.2.1.
> The issue we have faced is that, after rebalancing, some of the partitions get 
> consumed by 2 instances within a consumer group, leading to duplication of the 
> entire partition data. Both instances continue to read until the next 
> rebalancing, or the restart of those clients.
> It looks like a particular consumer goes on fetching the data from a 
> partition, but the broker is not able to identify this "stale" consumer 
> instance.
> We have hit this twice in production. Please look into it at the earliest.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8029) Add in-memory bytes-only session store implementation

2019-03-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805546#comment-16805546
 ] 

ASF GitHub Bot commented on KAFKA-8029:
---

ableegoldman commented on pull request #6525: KAFKA-8029: In memory session 
store
URL: https://github.com/apache/kafka/pull/6525
 
 
   First pass at an in-memory session store implementation. WIP (needs testing, 
KIP)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add in-memory bytes-only session store implementation
> -
>
> Key: KAFKA-8029
> URL: https://issues.apache.org/jira/browse/KAFKA-8029
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> As titled. We've added the in-memory window store implementation; what's left 
> is the session store now.
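
For context, a hedged sketch of how the new store could be wired into a session-windowed aggregation, assuming the factory ends up mirroring the existing in-memory window store (the `Stores.inMemorySessionStore` supplier name is an assumption here, since the PR above is still WIP; `builder` is an existing StreamsBuilder and the usual Kafka Streams imports/serdes apply):

{code:java}
// Sketch only: count clicks per key in 5-minute sessions, backed by an
// in-memory (non-persistent) session store with a 1-hour retention.
KTable<Windowed<String>, Long> sessionCounts = builder
    .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
    .count(Materialized.<String, Long>as(
        Stores.inMemorySessionStore("clicks-sessions", Duration.ofHours(1))));
{code}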



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805543#comment-16805543
 ] 

Guozhang Wang commented on KAFKA-7447:
--

Is it related to https://issues.apache.org/jira/browse/KAFKA-8069?

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
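
The workaround just described maps to the following broker-side settings (a hedged sketch with illustrative values, not a recommendation from this ticket):

{noformat}
# Disable automatic preferred-leader election entirely (manual rebalancing only):
auto.leader.rebalance.enable=false
# Alternatively, keep it enabled but check less aggressively (value illustrative):
# leader.imbalance.check.interval.seconds=3600
{noformat}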
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages only to those regarding 
> __consumer_offsets-29 because it's just too much to paste, otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished 
> loading offsets and group 

[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-29 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805539#comment-16805539
 ] 

Jun Rao commented on KAFKA-8106:


[~Flower.min], one of the validations that the broker has to do is to verify 
that the timestamp of each record is within the allowed max diff. One can only 
know the timestamp of a record after decompressing the batch.
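
As a hedged sketch of the check being referred to (illustrative only, not the actual LogValidator code): when a topic uses CreateTime, each record's timestamp must be within `message.timestamp.difference.max.ms` of the broker's current time, and that timestamp is only available once the batch has been decompressed.

{code:java}
// Sketch only: per-record timestamp validation, which forces the broker to
// read (and therefore decompress) every record in the batch.
static void validateTimestamp(long recordTimestampMs, long nowMs, long maxDiffMs) {
    if (Math.abs(nowMs - recordTimestampMs) > maxDiffMs) {
        throw new IllegalArgumentException("Record timestamp " + recordTimestampMs
            + " is outside the allowed difference of " + maxDiffMs + " ms");
    }
}
{code}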

> Remove unnecessary decompression operation when logValidator  do validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
>       We did performance testing of Kafka in the specific scenario described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. Then we started lots of producer processes to 
> send large amounts of messages to one of the topics in each test.
> *_Specific Scenario_*
>   
>  *_1. Main config of Kafka_*  
>  # Main config of the Kafka server: num.network.threads=6; 
> num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartitions: 50~2000
>  # Size of a single message: 1024B
>  
>  *_2. Config of KafkaProducer_* 
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3. The best result of performance testing_*  
> ||Network inflow rate||CPU used (%)||Disk write speed||Performance of 
> production||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4. Phenomenon and my doubt_*
>     The upper limit of CPU usage has been reached, but the upper limit of the 
> server's network bandwidth has not. *We are doubtful about what costs so much 
> CPU time, and we want to improve performance and reduce the CPU usage of the 
> Kafka server.*
>   
>  _*5. Analysis*_
>         We analyzed the JFR recordings of the Kafka server while doing the 
> performance testing. We found the hot-spot methods are 
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and 
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. 
> When checking thread stack information we also found most of the CPU being 
> occupied by lots of threads busy decompressing messages. Then we read the 
> source code of Kafka.
>        There is a double-layer nested iterator when LogValidator validates 
> every record, and there is a decompression of each message when traversing 
> every RecordBatch iterator. Decompressing messages consumes CPU and affects 
> total performance. _*The only purpose of decompressing every message is to 
> obtain the total size in bytes of one record and the size in bytes of the 
> record body, when the magic value in use is above 1 and no format conversion 
> or value overwriting is required for compressed messages. This is negative for 
> performance in common usage scenarios.*_ Therefore, we suggest *_removing the 
> unnecessary decompression operation_* when validating compressed messages, 
> when the magic value in use is above 1 and no format conversion or value 
> overwriting is required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8034) Replace DeleteTopics request/response with automated protocol

2019-03-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805463#comment-16805463
 ] 

ASF GitHub Bot commented on KAFKA-8034:
---

cmccabe commented on pull request #6366: KAFKA-8034: Use automatic RPC 
generation in DeleteTopics
URL: https://github.com/apache/kafka/pull/6366
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace DeleteTopics request/response with automated protocol
> -
>
> Key: KAFKA-8034
> URL: https://issues.apache.org/jira/browse/KAFKA-8034
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805444#comment-16805444
 ] 

ASF GitHub Bot commented on KAFKA-7502:
---

bbejeck commented on pull request #6520: KAFKA-7502: Cleanup KTable 
materialization logic in a single place (doMapValues)
URL: https://github.com/apache/kafka/pull/6520
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> Today since we pre-create all the `KTableXXX` operator along with the logical 
> node, we are effectively duplicating the logic to determine whether the 
> resulted KTable should be materialized. More specifically, the 
> materialization principle today is that:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name. We always materialize.
> 2) If users specified Materialized in the DSL but not contains a queryable 
> name, or if users do not specify a Materialized object at all, Streams may 
> choose to materialize or not. But in any cases, even if the KTable is 
> materialized it will not be queryable since there's no queryable name (i.e. 
> only storeName is not null, but queryableName is null):
> 2.a) If the resulted KTable is from an aggregation, we always materialize 
> since it is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulted KTable is from a source topic, we delay the 
> materialization until the downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulted KTable is from a join, we always materialize if the user 
> creates a Materialized object even without a queryable name. However this can 
> be optimized similar to 2.b) but is orthogonal to this ticket (see 
> `KTableImpl#buildJoin` where we always use constructor with nameProvider != 
> null).
> 2.d) If the resulted KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have logical node like "KTableKTableJoinNode", 
> as well as physical node like `ProcessorNode`. Ideally we should always 
> create the logical Plan (i.e. the StreamsGraph), and then optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology), however 
> today we create some physical nodes beforehand, and the above logic is hence 
> duplicated in the creation of both physical nodes and logical nodes. For 
> example, in `KTableKTableJoinNode` we check if Materialized is null for 
> adding a state store, and in `KTableImpl#doJoin` we check if materialized is 
> specified (case 2.c) above). 
> Another example is in TableProcessorNode which is used for 2.d) above, in 
> which it includes the logic whereas its caller, `KTableImpl#doFilter` for 
> example, also contains the logic when deciding to pass `queryableName` 
> parameter to `KTableProcessorSupplier`.
> This is bug-vulnerable since we may update the logic in one class but forgot 
> to update the other class.
> --
> What we want to have is a cleaner code path similar to what we have for 2.b), 
> such that when creating the logical nodes we keep track of whether 1) 
> materialized is specified, and 2) queryable name is provided. And during 
> optimization phase, we may change the inner physical ProcessorBuilder's 
> parameters like queryable name etc, and then when it is time to generate the 
> physical node, we can just blindly take the parameters and go for it.
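
To make cases 1) and 2) above concrete, a small hedged illustration at the DSL level (topic and store names are made up; type witnesses written out for clarity; `builder` is an existing StreamsBuilder with the usual imports):

{code:java}
// Case 1: Materialized with a queryable name -- always materialized and
// queryable via interactive queries under "my-store".
KTable<String, String> queryable = builder.table("input-topic",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("my-store"));

// Case 2.b: no Materialized at all -- Streams may defer materialization until
// a downstream operator actually requires the table to be materialized.
KTable<String, String> deferred = builder.table("other-topic");
{code}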



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805442#comment-16805442
 ] 

ASF GitHub Bot commented on KAFKA-7502:
---

bbejeck commented on pull request #6519: KAFKA-7502: Cleanup KTable 
materialization logic in a single place (doTransformValues)
URL: https://github.com/apache/kafka/pull/6519
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> Today since we pre-create all the `KTableXXX` operator along with the logical 
> node, we are effectively duplicating the logic to determine whether the 
> resulted KTable should be materialized. More specifically, the 
> materialization principle today is that:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name. We always materialize.
> 2) If users specified Materialized in the DSL but not contains a queryable 
> name, or if users do not specify a Materialized object at all, Streams may 
> choose to materialize or not. But in any cases, even if the KTable is 
> materialized it will not be queryable since there's no queryable name (i.e. 
> only storeName is not null, but queryableName is null):
> 2.a) If the resulted KTable is from an aggregation, we always materialize 
> since it is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulted KTable is from a source topic, we delay the 
> materialization until the downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulted KTable is from a join, we always materialize if the user 
> creates a Materialized object even without a queryable name. However this can 
> be optimized similar to 2.b) but is orthogonal to this ticket (see 
> `KTableImpl#buildJoin` where we always use constructor with nameProvider != 
> null).
> 2.d) If the resulted KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have logical node like "KTableKTableJoinNode", 
> as well as physical node like `ProcessorNode`. Ideally we should always 
> create the logical Plan (i.e. the StreamsGraph), and then optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology), however 
> today we create some physical nodes beforehand, and the above logic is hence 
> duplicated in the creation of both physical nodes and logical nodes. For 
> example, in `KTableKTableJoinNode` we check if Materialized is null for 
> adding a state store, and in `KTableImpl#doJoin` we check if materialized is 
> specified (case 2.c) above). 
> Another example is in TableProcessorNode which is used for 2.d) above, in 
> which it includes the logic whereas its caller, `KTableImpl#doFilter` for 
> example, also contains the logic when deciding to pass `queryableName` 
> parameter to `KTableProcessorSupplier`.
> This is bug-vulnerable since we may update the logic in one class but forgot 
> to update the other class.
> --
> What we want to have is a cleaner code path similar to what we have for 2.b), 
> such that when creating the logical nodes we keep track of whether 1) 
> materialized is specified, and 2) queryable name is provided. And during 
> optimization phase, we may change the inner physical ProcessorBuilder's 
> parameters like queryable name etc, and then when it is time to generate the 
> physical node, we can just blindly take the parameters and go for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2019-03-29 Thread Rajesh Nataraja (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805328#comment-16805328
 ] 

Rajesh Nataraja commented on KAFKA-8154:


[~rsivaram] The infinite loop is exactly what we noticed when trying to 
implement it as per the Oracle documentation. I tried some ways to avoid the 
infinite loop, but all of them end up causing an IO exception because the other 
side closes the connection, probably because the client is unable to complete 
the read operation. In most of the buffer overflow cases, trying to increase 
the application buffer results in a buffer underflow. Here is the underflow 
output, where you can see the net-read buffer size.

 

 WARN org.apache.kafka.common.network.Selector - [Consumer 
clientId=xxx-a60c2c61-282d-4866-a3d1-f88579b44de7-StreamThread-2-consumer, 
groupId=1 ... ] Unexpected error from /10.10.10.20; closing connection
java.lang.IllegalStateException: Buffer underflow when available 
data size (18437) > packet buffer size (18437)
 at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:565)
 at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
 at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
 at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913)
 at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-29 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805258#comment-16805258
 ] 

Dhruvil Shah edited comment on KAFKA-7362 at 3/29/19 6:38 PM:
--

Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to 
make progress on topic deletion in spite of having offline brokers. I think the 
main challenge there is to figure out what happens on topic recreation, i.e. 
when you delete a topic and a topic with the same name is recreated.

I think it would make sense to decouple both of these issues though. Cleaning 
up orphaned partitions is useful to reclaim the disk space, regardless of 
whether we solve the topic deletion problem or not. I am also not sure if we 
need a KIP to implement cleanup of orphaned partitions, as this does not change 
user experience in any way. What do you think?


was (Author: dhruvilshah):
Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to 
make progress on topic deletion in spite of having offline brokers. I think the 
main challenge there is to figure out what happens on topic recreation, i.e. 
when you delete a topic and a topic with the same name is recreated.

 

I think it would make sense to decouple both of these issues though. Cleaning 
up orphaned partitions is useful to reclaim the disk space, regardless of 
whether we solve the topic deletion problem or not. I am also not sure if we 
need a KIP to implement cleanup of orphaned partitions, as this does not change 
user experience in any way. What do you think?

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up either the data 
> or the folders that belong to orphan partitions. The log manager will scan all 
> dirs during startup, but the time-based retention policy on a topic partition 
> will not kick in until the broker is either a follower or a leader of the 
> partition. In addition, we do not have logic to delete folders that belong to 
> orphan partitions today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-29 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805258#comment-16805258
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to 
make progress on topic deletion in spite of having offline brokers. I think the 
main challenge there is to figure out what happens on topic recreation, i.e. 
when you delete a topic and a topic with the same name is recreated.

 

I think it would make sense to decouple both of these issues though. Cleaning 
up orphaned partitions is useful to reclaim the disk space, regardless of 
whether we solve the topic deletion problem or not. I am also not sure if we 
need a KIP to implement cleanup of orphaned partitions, as this does not change 
user experience in any way. What do you think?

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up either the data 
> or the folders that belong to orphan partitions. The log manager will scan all 
> dirs during startup, but the time-based retention policy on a topic partition 
> will not kick in until the broker is either a follower or a leader of the 
> partition. In addition, we do not have logic to delete folders that belong to 
> orphan partitions today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8162) IBM JDK Class not found error when handling SASL authentication exception

2019-03-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805151#comment-16805151
 ] 

ASF GitHub Bot commented on KAFKA-8162:
---

edoardocomar commented on pull request #6524: KAFKA-8162: IBM JDK Class not 
found error when handling SASL
URL: https://github.com/apache/kafka/pull/6524
 
 
   Attempt to load the IBM internal class but fallback on loading the Sun
   class if the IBM one is not found.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IBM JDK Class not found error when handling SASL authentication exception
> -
>
> Key: KAFKA-8162
> URL: https://issues.apache.org/jira/browse/KAFKA-8162
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Environment: Any with IBM JDK 8 SR5 FP10
>Reporter: Arkadiusz Firus
>Assignee: Edoardo Comar
>Priority: Major
>
> When there is a problem with SASL authentication then enum KerberosError is 
> being used to retrieve the error code. When IBM JDK is being used it tries to 
> load a class com.ibm.security.krb5.internal.KrbException which is not present 
> in all IBM JDK versions. This leads to NoClassDefFoundError which is not 
> handled.
> I tested it on:
>  java version "1.8.0_161"
>  Java(TM) SE Runtime Environment (build 8.0.5.10 - 
> pxa6480sr5fp10-20180214_01(SR5 FP10))
>  IBM J9 VM (build 2.9, JRE 1.8.0 Linux amd64-64 Compressed References 
> 20180208_378436 (JIT enabled, AOT enabled)
> In this version of the JDK, class KrbException is in package 
> com.ibm.security.krb5 (without "internal"). So the fully qualified class name 
> is: com.ibm.security.krb5.KrbException
> Full stack trace from the logs:
> [2019-03-27 06:50:00,113] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.NoClassDefFoundError: 
> org.apache.kafka.common.security.kerberos.KerberosError (initialization 
> failure)
>     at 
> java.lang.J9VMInternals.initializationAlreadyFailed(J9VMInternals.java:96)
>     at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:384)
>     at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:256)
>     at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:132)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
>     at kafka.network.Processor.poll(SocketServer.scala:689)
>     at kafka.network.Processor.run(SocketServer.scala:594)
>     at java.lang.Thread.run(Thread.java:811)
> Caused by: org.apache.kafka.common.KafkaException: Kerberos exceptions could 
> not be initialized
>     at 
> org.apache.kafka.common.security.kerberos.KerberosError.(KerberosError.java:59)
>     ... 8 more
> Caused by: java.lang.ClassNotFoundException: 
> com.ibm.security.krb5.internal.KrbException
>     at java.lang.Class.forNameImpl(Native Method)
>     at java.lang.Class.forName(Class.java:297)
>     at 
> org.apache.kafka.common.security.kerberos.KerberosError.(KerberosError.java:53)
>     ... 8 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8162) IBM JDK Class not found error when handling SASL authentication exception

2019-03-29 Thread Edoardo Comar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805115#comment-16805115
 ] 

Edoardo Comar commented on KAFKA-8162:
--

Newer IBM JDKs bundle the regular Sun class "sun.security.krb5.KrbException", 
so the KerberosError class could attempt to load the IBM one but fall back on 
the Sun one.
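
A minimal sketch of that fallback (illustrative only, not the actual patch in the PR):

{code:java}
// Sketch: prefer the IBM-internal KrbException, then fall back to the Sun
// class that newer IBM JDKs also bundle.
private static Class<?> krbExceptionClass() {
    try {
        return Class.forName("com.ibm.security.krb5.internal.KrbException");
    } catch (ClassNotFoundException ibmInternalMissing) {
        try {
            return Class.forName("sun.security.krb5.KrbException");
        } catch (ClassNotFoundException neitherFound) {
            throw new org.apache.kafka.common.KafkaException(
                "Kerberos exceptions could not be initialized", neitherFound);
        }
    }
}
{code}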

> IBM JDK Class not found error when handling SASL authentication exception
> -
>
> Key: KAFKA-8162
> URL: https://issues.apache.org/jira/browse/KAFKA-8162
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Environment: Any with IBM JDK 8 SR5 FP10
>Reporter: Arkadiusz Firus
>Assignee: Edoardo Comar
>Priority: Major
>
> When there is a problem with SASL authentication then enum KerberosError is 
> being used to retrieve the error code. When IBM JDK is being used it tries to 
> load a class com.ibm.security.krb5.internal.KrbException which is not present 
> in all IBM JDK versions. This leads to NoClassDefFoundError which is not 
> handled.
> I tested it on:
>  java version "1.8.0_161"
>  Java(TM) SE Runtime Environment (build 8.0.5.10 - 
> pxa6480sr5fp10-20180214_01(SR5 FP10))
>  IBM J9 VM (build 2.9, JRE 1.8.0 Linux amd64-64 Compressed References 
> 20180208_378436 (JIT enabled, AOT enabled)
> In this version of the JDK, class KrbException is in package 
> com.ibm.security.krb5 (without "internal"). So the fully qualified class name 
> is: com.ibm.security.krb5.KrbException
> Full stack trace from the logs:
> [2019-03-27 06:50:00,113] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.NoClassDefFoundError: 
> org.apache.kafka.common.security.kerberos.KerberosError (initialization 
> failure)
>     at 
> java.lang.J9VMInternals.initializationAlreadyFailed(J9VMInternals.java:96)
>     at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:384)
>     at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:256)
>     at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:132)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
>     at kafka.network.Processor.poll(SocketServer.scala:689)
>     at kafka.network.Processor.run(SocketServer.scala:594)
>     at java.lang.Thread.run(Thread.java:811)
> Caused by: org.apache.kafka.common.KafkaException: Kerberos exceptions could 
> not be initialized
>     at 
> org.apache.kafka.common.security.kerberos.KerberosError.<clinit>(KerberosError.java:59)
>     ... 8 more
> Caused by: java.lang.ClassNotFoundException: 
> com.ibm.security.krb5.internal.KrbException
>     at java.lang.Class.forNameImpl(Native Method)
>     at java.lang.Class.forName(Class.java:297)
>     at 
> org.apache.kafka.common.security.kerberos.KerberosError.<clinit>(KerberosError.java:53)
>     ... 8 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8162) IBM JDK Class not found error when handling SASL authentication exception

2019-03-29 Thread Edoardo Comar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-8162:


Assignee: Edoardo Comar

> IBM JDK Class not found error when handling SASL authentication exception
> -
>
> Key: KAFKA-8162
> URL: https://issues.apache.org/jira/browse/KAFKA-8162
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Environment: Any with IBM JDK 8 SR5 FP10
>Reporter: Arkadiusz Firus
>Assignee: Edoardo Comar
>Priority: Major
>
> When there is a problem with SASL authentication, the enum KerberosError is 
> used to retrieve the error code. When an IBM JDK is used, it tries to load the 
> class com.ibm.security.krb5.internal.KrbException, which is not present in all 
> IBM JDK versions. This leads to a NoClassDefFoundError which is not handled.
> I tested it on:
>  java version "1.8.0_161"
>  Java(TM) SE Runtime Environment (build 8.0.5.10 - 
> pxa6480sr5fp10-20180214_01(SR5 FP10))
>  IBM J9 VM (build 2.9, JRE 1.8.0 Linux amd64-64 Compressed References 
> 20180208_378436 (JIT enabled, AOT enabled)
> In this version of the JDK, the class KrbException is in the package 
> com.ibm.security.krb5 (without "internal"), so the fully qualified class name 
> is: com.ibm.security.krb5.KrbException
> Full stack trace from the logs:
> [2019-03-27 06:50:00,113] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.NoClassDefFoundError: 
> org.apache.kafka.common.security.kerberos.KerberosError (initialization 
> failure)
>     at 
> java.lang.J9VMInternals.initializationAlreadyFailed(J9VMInternals.java:96)
>     at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:384)
>     at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:256)
>     at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:132)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
>     at kafka.network.Processor.poll(SocketServer.scala:689)
>     at kafka.network.Processor.run(SocketServer.scala:594)
>     at java.lang.Thread.run(Thread.java:811)
> Caused by: org.apache.kafka.common.KafkaException: Kerberos exceptions could 
> not be initialized
>     at 
> org.apache.kafka.common.security.kerberos.KerberosError.<clinit>(KerberosError.java:59)
>     ... 8 more
> Caused by: java.lang.ClassNotFoundException: 
> com.ibm.security.krb5.internal.KrbException
>     at java.lang.Class.forNameImpl(Native Method)
>     at java.lang.Class.forName(Class.java:297)
>     at 
> org.apache.kafka.common.security.kerberos.KerberosError.<clinit>(KerberosError.java:53)
>     ... 8 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2019-03-29 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804777#comment-16804777
 ] 

Rajini Sivaram commented on KAFKA-8154:
---

The assumption we make is that an incoming packet of the SSL session's packet 
buffer size can be unwrapped into an application buffer of the SSL session's 
application buffer size, i.e. the application buffer size is at least as large as 
the unwrapped data (hence the >= check).
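
As a rough illustration of that assumption (a simplified sketch, not the actual 
SslTransportLayer code; the method shape and resizing policy here are assumptions):

    import java.nio.ByteBuffer;
    import javax.net.ssl.SSLEngine;
    import javax.net.ssl.SSLEngineResult;
    import javax.net.ssl.SSLSession;

    final class UnwrapSketch {
        // Unwrap incoming TLS records; the application buffer is grown only up to
        // session.getApplicationBufferSize(), never arbitrarily.
        static ByteBuffer unwrap(SSLEngine engine, ByteBuffer netRead, ByteBuffer appRead) throws Exception {
            SSLSession session = engine.getSession();
            while (netRead.hasRemaining()) {
                SSLEngineResult result = engine.unwrap(netRead, appRead);
                if (result.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW) {
                    if (appRead.capacity() >= session.getApplicationBufferSize()) {
                        // A full-sized application buffer should never overflow,
                        // so growing it further would not help.
                        throw new IllegalStateException("Overflow despite full-sized application buffer");
                    }
                    ByteBuffer larger = ByteBuffer.allocate(session.getApplicationBufferSize());
                    appRead.flip();
                    larger.put(appRead);
                    appRead = larger;
                } else if (result.getStatus() != SSLEngineResult.Status.OK) {
                    break; // BUFFER_UNDERFLOW (need more network data) or CLOSED
                }
            }
            return appRead;
        }
    }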


We guarantee that we don't allocate buffers larger than the total of the SSL 
session's buffer sizes per connection, since this total is typically used to 
calculate the memory required for brokers based on the maximum connection count, 
and arbitrarily increasing it could lead to OOM. So I don't think we can apply 
the fix in the PR [https://github.com/apache/kafka/pull/5785].
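
As a rough illustration of the memory concern (typical TLS sizes, not exact 
figures): the network read, network write and application buffers are each on the 
order of 16 KB, so roughly 48 KB per SSL connection; a broker sized for 10,000 
connections is then already budgeting close to 500 MB just for these buffers, 
which is why they cannot simply be grown per connection.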

 

With the standard SSL session packet size in normal JDKs, in order to hit the 
exception in PR [https://github.com/apache/kafka/pull/6495], the application 
buffer size needs to be 16384 (lower than the value typically used). But if we 
set the application buffer size to 16384 and use the normal network read buffer 
size, for example with an Oracle 1.8 JDK, then instead of throwing 
IllegalStateException we would end up in an infinite loop with large packets that 
cannot be unwrapped into an application buffer of 16384 bytes.

 

I think we need to better understand the issue here. It would be good to know 
whether the issues encountered in the two failing scenarios (in the two PRs) were 
the same; a stack trace from the first one would be helpful for this. It would 
also be useful to get the buffer sizes (netReadBufferSize(), netWriteBufferSize() 
and applicationBufferSize()) for the two environments. At the moment we just have 
the single number 16384 as the application buffer size, and we don't expect that 
to work. Obviously it would be good if we can recreate this with a unit test, but 
it may be easier to write one once we have identified the exact issue that we 
need to fix.

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8175) The broker block some minutes may occur expired error message for a period of time

2019-03-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804684#comment-16804684
 ] 

ASF GitHub Bot commented on KAFKA-8175:
---

huangyiminghappy commented on pull request #6522: KAFKA-8175: Remove streams 
overrides on repartition topics 
URL: https://github.com/apache/kafka/pull/6522
 
 
   As described in (https://issues.apache.org/jira/browse/KAFKA-8175):
   If one of the nodes in the cluster is blocked and the client cannot send an 
updateMetadata request to another node, the client will log many errors like 
`org.apache.kafka.common.errors.TimeoutException: Expiring 1062 record(s) for 
kafka_test_111-8: 23967 ms has passed since batch creation plus linger timeFri 
Mar 29 11:34:14 CST 2019`. Sometimes the controller does not notice quickly that 
the broker is down/offline, so the client's batches cannot be sent to the offline 
node and also cannot trigger a metadata update. We therefore need to check the 
connection's read state: if it is not ready within the configured time, close the 
channel and trigger a metadata update.
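
   A rough sketch of that check (the types and names below are hypothetical, not 
actual Kafka client APIs; it only illustrates the proposed behaviour):

       // Hypothetical types for illustration only.
       interface ConnectionState {
           boolean hasPendingSends();
           long lastReadMs();
           void close();
       }

       interface MetadataUpdater {
           void requestUpdate();
       }

       final class StuckConnectionCheck {
           private final MetadataUpdater metadata;

           StuckConnectionCheck(MetadataUpdater metadata) { this.metadata = metadata; }

           // Called periodically from the client poll loop: if a connection still has
           // pending sends but has not completed a read within maxReadIdleMs, close it
           // and force a metadata refresh so batches can be redirected to another broker.
           void maybeClose(ConnectionState connection, long nowMs, long maxReadIdleMs) {
               if (connection.hasPendingSends() && nowMs - connection.lastReadMs() > maxReadIdleMs) {
                   connection.close();
                   metadata.requestUpdate();
               }
           }
       }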
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The broker block some minutes may occur expired error message for a period of 
> time  
> 
>
> Key: KAFKA-8175
> URL: https://issues.apache.org/jira/browse/KAFKA-8175
> Project: Kafka
>  Issue Type: Improvement
>Reporter: huangyiming
>Priority: Minor
>
> When a broker is blocked for some minutes, the producer may see expired-batch 
> errors for a period of time. Suppose the broker cluster's IPs are 100, 101 and 
> 102 and the controller is 100. If 101 is blocked for 2 minutes (you can 
> simulate this with gdb: attach to the pid for 2 minutes, then quit), and the 
> controller does not detect in time that 101 is offline (for example it only 
> notices 60 seconds later) and only sends LeaderAndIsr 60 seconds later, then 
> the batches in the RecordAccumulator may hit many delivery timeouts. Records 
> for partitions whose leader is on 101 keep expiring, and the client cannot 
> send a metadata update request to 100 or 102, because the records destined 
> for 101 cannot be sent and so cannot trigger a timeout that would refresh the 
> metadata.
> so i use 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8175) The broker block some minutes may occur expired error message for a period of time

2019-03-29 Thread huangyiming (JIRA)
huangyiming created KAFKA-8175:
--

 Summary: The broker block some minutes may occur expired error 
message for a period of time  
 Key: KAFKA-8175
 URL: https://issues.apache.org/jira/browse/KAFKA-8175
 Project: Kafka
  Issue Type: Improvement
Reporter: huangyiming


When a broker is blocked for some minutes, the producer may see expired-batch 
errors for a period of time. Suppose the broker cluster's IPs are 100, 101 and 
102 and the controller is 100. If 101 is blocked for 2 minutes (you can simulate 
this with gdb: attach to the pid for 2 minutes, then quit), and the controller 
does not detect in time that 101 is offline (for example it only notices 60 
seconds later) and only sends LeaderAndIsr 60 seconds later, then the batches in 
the RecordAccumulator may hit many delivery timeouts. Records for partitions 
whose leader is on 101 keep expiring, and the client cannot send a metadata 
update request to 100 or 102, because the records destined for 101 cannot be 
sent and so cannot trigger a timeout that would refresh the metadata.

so i use 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)