[jira] [Commented] (KAFKA-7962) StickyAssignor: throws NullPointerException during assignments if topic is deleted

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774898#comment-16774898
 ] 

ASF GitHub Bot commented on KAFKA-7962:
---

huxihx commented on pull request #6308: KAFKA-7962: Avoid NPE for StickyAssignor
URL: https://github.com/apache/kafka/pull/6308
 
 
   https://issues.apache.org/jira/browse/KAFKA-7962
   
   Consumer using StickyAssignor throws NullPointerException if a subscribed 
topic was removed.
   
 



> StickyAssignor: throws NullPointerException during assignments if topic is 
> deleted
> --
>
> Key: KAFKA-7962
> URL: https://issues.apache.org/jira/browse/KAFKA-7962
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.1.0
> Environment: 1. MacOS, com.salesforce.kafka.test.KafkaTestUtils (kind 
> of embedded kafka integration tests)
> 2. Linux, dockerised kafka and our service
>Reporter: Oleg Smirnov
>Assignee: huxihx
>Priority: Major
> Attachments: NPE-StickyAssignor-issues.apache.log
>
>
> Integration tests with com.salesforce.kafka.test.KafkaTestUtils, local 
> setup, StickyAssignor used; local topics are created / removed, and one topic is 
> created at the beginning of the test and then deleted without unsubscribing from it.
> The same happens in a real environment.
>  
>  # have a single "topic" with 1 partition
>  # a single consumer subscribed to this "topic" (StickyAssignor)
>  # delete "topic"
> =>
>  * rebalance starts, the topic's partition(s) are revoked
>  * on assignment, StickyAssignor throws an exception (line 223), because 
> partitionsPerTopic.get("topic") returns null in the for loop (the topic was 
> deleted, so no partitions are present)
>  
> In the provided log part, tearDown() causes the topic deletion while the consumer 
> is still running and tries to poll data from the topic.
> RangeAssignor works fine (revokes the partition, assigns an empty set).
> The problem has no workaround (such as handling it in onPartitionsAssigned and 
> removing the unsubscribed topic), because everything happens before the listener is called.
>  
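A minimal sketch of the kind of guard such a fix needs (illustrative only, not the code in the linked PR; the helper name and map layout are assumptions): partitionsPerTopic.get(topic) can return null for a deleted topic and has to be skipped rather than dereferenced.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class DeletedTopicGuardSketch {

    // Stand-in for the assignor's partitionsPerTopic map; a deleted topic is simply
    // absent from it, so get() returns null rather than a partition count.
    static List<TopicPartition> partitionsForSubscription(Map<String, Integer> partitionsPerTopic,
                                                          List<String> subscribedTopics) {
        List<TopicPartition> result = new ArrayList<>();
        for (String topic : subscribedTopics) {
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null) {
                // Topic was deleted between subscribing and this assignment: skip it
                // instead of hitting the NullPointerException described above.
                continue;
            }
            for (int partition = 0; partition < numPartitions; partition++) {
                result.add(new TopicPartition(topic, partition));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>(); // "topic" has been deleted
        System.out.println(partitionsForSubscription(partitionsPerTopic,
                Collections.singletonList("topic"))); // prints [] instead of throwing
    }
}
{code}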





[jira] [Assigned] (KAFKA-7962) StickyAssignor: throws NullPointerException during assignments if topic is deleted

2019-02-22 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-7962:
-

Assignee: huxihx

> StickyAssignor: throws NullPointerException during assignments if topic is 
> deleted
> --
>
> Key: KAFKA-7962
> URL: https://issues.apache.org/jira/browse/KAFKA-7962
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.1.0
> Environment: 1. MacOS, com.salesforce.kafka.test.KafkaTestUtils (kind 
> of embedded kafka integration tests)
> 2. Linux, dockerised kafka and our service
>Reporter: Oleg Smirnov
>Assignee: huxihx
>Priority: Major
> Attachments: NPE-StickyAssignor-issues.apache.log
>
>
> Integration tests with com.salesforce.kafka.test.KafkaTestUtils, local 
> setup, StickyAssignor used; local topics are created / removed, and one topic is 
> created at the beginning of the test and then deleted without unsubscribing from it.
> The same happens in a real environment.
>  
>  # have a single "topic" with 1 partition
>  # a single consumer subscribed to this "topic" (StickyAssignor)
>  # delete "topic"
> =>
>  * rebalance starts, the topic's partition(s) are revoked
>  * on assignment, StickyAssignor throws an exception (line 223), because 
> partitionsPerTopic.get("topic") returns null in the for loop (the topic was 
> deleted, so no partitions are present)
>  
> In the provided log part, tearDown() causes the topic deletion while the consumer 
> is still running and tries to poll data from the topic.
> RangeAssignor works fine (revokes the partition, assigns an empty set).
> The problem has no workaround (such as handling it in onPartitionsAssigned and 
> removing the unsubscribed topic), because everything happens before the listener is called.
>  





[jira] [Commented] (KAFKA-7970) Missing topic causes service shutdown without exception

2019-02-22 Thread Jonny Heavey (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774941#comment-16774941
 ] 

Jonny Heavey commented on KAFKA-7970:
-

Thanks for your swift reply Matthias,

I have set both an exception handler and a state listener as you suggest, but 
neither is triggered helpfully during these 'missing topic' circumstances:
 * No exception hits the uncaughtExceptionHandler
 * The stateListener receives transitions for RUNNING and REBALANCING only (it 
appears that the user 'Or' is seeing the same behaviour)

After these state transitions, the StreamThread logs that shutdown is complete, 
and nothing else happens (no further state transitions to listen for etc).

 

I'm fairly new to Kafka, so apologies if this is incorrect, but I wonder if the 
StreamThread should throw an exception when it's shutting down for this reason, 
so that the uncaughtExceptionHandler receives a notification, or that the 
stateListener isn't unset, so it (presumably) continues to receive these 
further state transitions?

 

> Missing topic causes service shutdown without exception
> ---
>
> Key: KAFKA-7970
> URL: https://issues.apache.org/jira/browse/KAFKA-7970
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonny Heavey
>Priority: Minor
>
> When launching a KafkaStreams application that depends on a topic that 
> doesn't exist, the streams application correctly logs an error such as:
> " is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application."
> The stream is then shut down; however, no exception is thrown indicating that 
> an error has occurred.
> In our circumstances, we run our streams app inside a container. The streams 
> service is shutdown, but the process is not exited, meaning that the 
> container does not crash (reducing visibility of the issue).
> As no exception is thrown in the missing topic scenario described above, our 
> application code has no way to determine that something is wrong that would 
> then allow it to terminate the process.
>  
> Could the onPartitionsAssigned method in StreamThread.java throw an exception 
> when it decides to shutdown the stream (somewhere around line 264)?





[jira] [Comment Edited] (KAFKA-7970) Missing topic causes service shutdown without exception

2019-02-22 Thread Jonny Heavey (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774941#comment-16774941
 ] 

Jonny Heavey edited comment on KAFKA-7970 at 2/22/19 9:30 AM:
--

Thanks for your swift reply Matthias,

I have set both an exception handler and a state listener as you suggest, but 
neither is triggered helpfully during these 'missing topic' circumstances:
 * No exception hits the uncaughtExceptionHandler
 * The stateListener receives transitions for RUNNING and REBALANCING only (it 
appears that the user 'Or' is seeing the same behaviour)

After these state transitions, the StreamThread logs that shutdown is complete, 
and nothing else happens (no further state transitions to listen for etc).

 

I'm fairly new to Kafka, so apologies if this is incorrect, but I wonder if the 
StreamThread could throw an exception when it's shutting down for this reason, 
so that the uncaughtExceptionHandler receives a notification, and/or that the 
stateListener isn't unset (as described by 'Or'), so it could continue 
receiving further state transitions during shutdown?

 


was (Author: jonnyheavey):
Thanks for your swift reply Matthias,

I have set both an exception handler and a state listener as you suggest, but 
neither is triggered helpfully during these 'missing topic' circumstances:
 * No exception hits the uncaughtExceptionHandler
 * The stateListener receives transitions for RUNNING and REBALANCING only (it 
appears that the user 'Or' is seeing the same behaviour)

After these state transitions, the StreamThread logs that shutdown is complete, 
and nothing else happens (no further state transitions to listen for etc).

 

I'm fairly new to Kafka, so apologies if this is incorrect, but I wonder if the 
StreamThread should throw and exception when its shutting down for this reason, 
so that the uncaughtExceptionHandler receives notification, or that the 
stateListener isn't unset, so it (presumably) continues to receives these 
further state transitions?

 

> Missing topic causes service shutdown without exception
> ---
>
> Key: KAFKA-7970
> URL: https://issues.apache.org/jira/browse/KAFKA-7970
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonny Heavey
>Priority: Minor
>
> When launching a KafkaStreams application that depends on a topic that 
> doesn't exist, the streams application correctly logs an error such as:
> " is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application."
> The stream is then shut down; however, no exception is thrown indicating that 
> an error has occurred.
> In our circumstances, we run our streams app inside a container. The streams 
> service is shutdown, but the process is not exited, meaning that the 
> container does not crash (reducing visibility of the issue).
> As no exception is thrown in the missing topic scenario described above, our 
> application code has no way to determine that something is wrong that would 
> then allow it to terminate the process.
>  
> Could the onPartitionsAssigned method in StreamThread.java throw an exception 
> when it decides to shutdown the stream (somewhere around line 264)?





[jira] [Commented] (KAFKA-7970) Missing topic causes service shutdown without exception

2019-02-22 Thread Or (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774956#comment-16774956
 ] 

Or commented on KAFKA-7970:
---

 

If anyone wants to easily reproduce the issue, I created a small test for it: 
[https://pastebin.com/43Zb4uhb]

(By the way, it also affects the latest version: 2.1.1)

> Missing topic causes service shutdown without exception
> ---
>
> Key: KAFKA-7970
> URL: https://issues.apache.org/jira/browse/KAFKA-7970
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonny Heavey
>Priority: Minor
>
> When launching a KafkaStreams application that depends on a topic that 
> doesn't exist, the streams application correctly logs an error such as:
> " is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application."
> The stream is then shut down; however, no exception is thrown indicating that 
> an error has occurred.
> In our circumstances, we run our streams app inside a container. The streams 
> service is shutdown, but the process is not exited, meaning that the 
> container does not crash (reducing visibility of the issue).
> As no exception is thrown in the missing topic scenario described above, our 
> application code has no way to determine that something is wrong that would 
> then allow it to terminate the process.
>  
> Could the onPartitionsAssigned method in StreamThread.java throw an exception 
> when it decides to shutdown the stream (somewhere around line 264)?





[jira] [Comment Edited] (KAFKA-7970) Missing topic causes service shutdown without exception

2019-02-22 Thread Or (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774956#comment-16774956
 ] 

Or edited comment on KAFKA-7970 at 2/22/19 9:59 AM:


If anyone wants to easily reproduce the issue, I created a small test for it: 
[https://pastebin.com/43Zb4uhb]

(By the way, it also affects the latest version: 2.1.1)


was (Author: or):
 

If anyone wants to easily produce the issue, I created a small test for it: 
[https://pastebin.com/43Zb4uhb]

(By the way, it also affect the latest version: 2.1.1)

> Missing topic causes service shutdown without exception
> ---
>
> Key: KAFKA-7970
> URL: https://issues.apache.org/jira/browse/KAFKA-7970
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonny Heavey
>Priority: Minor
>
> When launching a KafkaStreams application that depends on a topic that 
> doesn't exist, the streams application correctly logs an error such as:
> " is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application."
> The stream is then shut down; however, no exception is thrown indicating that 
> an error has occurred.
> In our circumstances, we run our streams app inside a container. The streams 
> service is shutdown, but the process is not exited, meaning that the 
> container does not crash (reducing visibility of the issue).
> As no exception is thrown in the missing topic scenario described above, our 
> application code has no way to determine that something is wrong that would 
> then allow it to terminate the process.
>  
> Could the onPartitionsAssigned method in StreamThread.java throw an exception 
> when it decides to shutdown the stream (somewhere around line 264)?





[jira] [Commented] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol

2019-02-22 Thread Matteo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774981#comment-16774981
 ] 

Matteo commented on KAFKA-4090:
---

This is also happening on Apache Kafka 2.1.0.

> JVM runs into OOM if (Java) client uses a SSL port without setting the 
> security protocol
> 
>
> Key: KAFKA-4090
> URL: https://issues.apache.org/jira/browse/KAFKA-4090
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: jaikiran pai
>Priority: Major
>
> Quoting from the mail thread that was sent to Kafka mailing list:
> {quote}
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So far 
> we had been using it with plaintext transport but recently have been 
> considering upgrading to SSL. It mostly works, except that a 
> mis-configured producer (and even consumer) causes a hard-to-relate 
> OutOfMemory exception, thus causing the JVM in which the client is 
> running to go into a bad state. We can consistently reproduce that OOM very 
> easily. We decided to check if this is something that is fixed in 0.10.0.1, so we 
> upgraded one of our test systems to that version (both server and client 
> libraries) but still see the same issue. Here's how it can be easily 
> reproduced:
> 1. Enable SSL listener on the broker via server.properties, as per the Kafka 
> documentation
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=
> ssl.truststore.password=pass
> {code}
> 2. Start zookeeper and kafka server
> 3. Create a "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test  
> --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer which sends a single message to the topic via 
> Java (new producer) APIs:
> {code}
> public class OOMTest {
> public static void main(final String[] args) throws Exception {
> final Properties kafkaProducerConfigs = new Properties();
> // NOTE: Intentionally use a SSL port without specifying 
> security.protocol as SSL
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9093");
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>  StringSerializer.class.getName());
> try (KafkaProducer<String, String> producer = new 
> KafkaProducer<>(kafkaProducerConfigs)) {
> System.out.println("Created Kafka producer");
> final String topicName = "oom-test";
> final String message = "Hello OOM!";
> // send a message to the topic
> final Future<RecordMetadata> recordMetadataFuture = 
> producer.send(new ProducerRecord<>(topicName, message));
> final RecordMetadata sentRecordMetadata = 
> recordMetadataFuture.get();
> System.out.println("Sent message '" + message + "' to topic '" + 
> topicName + "'");
> }
> System.out.println("Tests complete");
> }
> }
> {code}
> Notice that the server URL is using an SSL endpoint localhost:9093 but isn't 
> specifying any of the other necessary SSL configs like security.protocol.
> 5. For the sake of easily reproducing this issue run this class with a max 
> heap size of 256MB (-Xmx256M). Running this code throws up the following 
> OutOfMemoryError in one of the Sender threads:
> {code}
> 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in 
> kafka-producer-network-thread | producer-1:
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Note that I set it to 256MB as heap size to easily reproduce it but this 
> isn't specific to that size. We have been able to reproduce it 
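For contrast, a hedged sketch of the intended configuration (truststore path and passwords are placeholders): explicitly setting security.protocol=SSL so the client negotiates TLS on port 9093, rather than reading the broker's TLS bytes as a plaintext response size and allocating an enormous buffer, which is the usual explanation for the allocation in the stack trace above.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SslProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties configs = new Properties();
        configs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        // The missing piece in the report above: tell the client this port speaks SSL.
        configs.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Placeholder truststore settings; use real paths and passwords.
        configs.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
        configs.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "pass");
        configs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
            // Blocks until the broker acknowledges the record.
            producer.send(new ProducerRecord<>("oom-test", "Hello SSL!")).get();
        }
    }
}
{code}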

[jira] [Created] (KAFKA-7981) Add Replica Fetcher and Log Cleaner Count Metrics

2019-02-22 Thread Viktor Somogyi-Vass (JIRA)
Viktor Somogyi-Vass created KAFKA-7981:
--

 Summary: Add Replica Fetcher and Log Cleaner Count Metrics
 Key: KAFKA-7981
 URL: https://issues.apache.org/jira/browse/KAFKA-7981
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Affects Versions: 2.3.0
Reporter: Viktor Somogyi-Vass
Assignee: Viktor Somogyi-Vass








[jira] [Commented] (KAFKA-7971) Producer in Streams environment

2019-02-22 Thread Maciej Lizewski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775007#comment-16775007
 ] 

Maciej Lizewski commented on KAFKA-7971:


I know I can use a transformer and in fact I do now (kindly please check the 
stackoverflow link where I put a snippet). The "problem" is that this processing 
only depends on wall clock time and the state store. Attaching this transformer to 
some input stream is only needed to introduce the component into the streams 
environment, and in fact all input messages are just dropped. The state store is 
updated in a different stream that produces a different output. I am thinking about 
creating a "dummy" topic without any producer just to attach my transformer there. 
In my opinion this is not a very nice solution. It would be nice to have something 
like: 

KStream StreamsBuilder::producer(ProducerInterface, local_stores);

The component would be initialized the same way transformers and processors are, 
with a ProcessorContext, so it has access to stores; it could even register a 
punctuator connected to the wall clock and forward newly created events to the 
processing network. It is just a thought, an improvement...
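For context, a rough sketch of the workaround being discussed (topic, store name, abandonment criterion, and output value are hypothetical): a Transformer attached to a dummy input stream that drops its input and emits only from a wall-clock punctuator scanning a state store.

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class AbandonedOrderSweeper implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, String> openedOrders;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "opened-orders" must be a store attached to this transformer in the topology.
        this.openedOrders = (KeyValueStore<String, String>) context.getStateStore("opened-orders");
        // Wall-clock punctuator: scan the store and forward "abandon" events downstream.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, String> it = openedOrders.all()) {
                while (it.hasNext()) {
                    KeyValue<String, String> order = it.next();
                    // Hypothetical abandonment criterion would be checked here.
                    context.forward(order.key, "ABANDONED");
                }
            }
        });
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        return null; // Input records are dropped; only the punctuator produces output.
    }

    @Override
    public void close() { }
}
{code}

It would typically be wired up with something like builder.stream("dummy-topic", Consumed.with(Serdes.String(), Serdes.String())).transform(AbandonedOrderSweeper::new, "opened-orders"), which is exactly the dummy-topic indirection the comment argues a first-class producer-style hook could avoid.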

> Producer in Streams environment
> ---
>
> Key: KAFKA-7971
> URL: https://issues.apache.org/jira/browse/KAFKA-7971
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Maciej Lizewski
>Priority: Minor
>  Labels: newbie
>
> It would be nice to have Producers that can emit messages to a topic just like any 
> producer but also have access to local stores from the streams environment in 
> Spring.
> Consider this case: I have an event-sourced ordering process like this:
> [EVENTS QUEUE] -> [MERGING PROCESS] -> [ORDERS CHANGELOG/KTABLE]
> The merging process uses the local store "opened orders" to easily apply new 
> changes.
> Now I want to implement a process for closing abandoned orders (orders that were 
> started, but for too long there was no change and they hang in the beginning 
> status). The easiest way is to periodically scan the "opened orders" store and 
> produce an "abandon event" for every order that meets the criteria. The only way 
> now is to create a Transformer with a punctuator and connect its output to [EVENTS 
> QUEUE]. That is obvious, but the Transformer must also be connected to some input 
> stream, and those events must be dropped as we want only the punctuator 
> results. This causes unnecessary overhead in processing input messages 
> (although they are just dropped) and it is not very elegant.





[jira] [Updated] (KAFKA-7981) Add Replica Fetcher and Log Cleaner Count Metrics

2019-02-22 Thread Viktor Somogyi-Vass (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-7981:
---
Labels: kip  (was: )

> Add Replica Fetcher and Log Cleaner Count Metrics
> -
>
> Key: KAFKA-7981
> URL: https://issues.apache.org/jira/browse/KAFKA-7981
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.3.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>  Labels: kip
>






[jira] [Commented] (KAFKA-7925) Constant 100% cpu usage by all kafka brokers

2019-02-22 Thread Abhi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775061#comment-16775061
 ] 

Abhi commented on KAFKA-7925:
-

Any updates on this?

From the thread dump, I see that 
'kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-0' has locked 
'0x0006ca1c9a80' and doesn't seem to be making progress. The other network 
threads 'kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1' and 
'kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2' are 
waiting to lock '0x0006ca1c9a80'.

This is causing no new connection requests to be accepted. Can you please check 
this?



> Constant 100% cpu usage by all kafka brokers
> 
>
> Key: KAFKA-7925
> URL: https://issues.apache.org/jira/browse/KAFKA-7925
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
> Environment: Java 11, Kafka v2.1.0
>Reporter: Abhi
>Priority: Critical
> Attachments: threadump20190212.txt
>
>
> Hi,
> I am seeing constant 100% cpu usage on all brokers in our kafka cluster even 
> without any clients connected to any broker.
> This is a bug that we have seen multiple times in our kafka setup that is not 
> yet open to clients. It is becoming a blocker for our deployment now.
> I am seeing a lot of connections to other brokers in CLOSE_WAIT state (see 
> below). In thread usage, I am seeing these threads 
> 'kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-0,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2'
>  taking up more than 90% of the cpu time in a 60s interval.
> I have attached a thread dump of one of the brokers in the cluster.
> *Java version:*
> openjdk 11.0.2 2019-01-15
> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
> *Kafka verison:* v2.1.0
>  
> *connections:*
> java 144319 kafkagod 88u IPv4 3063266 0t0 TCP *:35395 (LISTEN)
> java 144319 kafkagod 89u IPv4 3063267 0t0 TCP *:9144 (LISTEN)
> java 144319 kafkagod 104u IPv4 3064219 0t0 TCP 
> mwkafka-prod-02.tbd:47292->mwkafka-zk-prod-05.tbd:2181 (ESTABLISHED)
> java 144319 kafkagod 2003u IPv4 3055115 0t0 TCP *:9092 (LISTEN)
> java 144319 kafkagod 2013u IPv4 7220110 0t0 TCP 
> mwkafka-prod-02.tbd:60724->mwkafka-zk-prod-04.dr:2181 (ESTABLISHED)
> java 144319 kafkagod 2020u IPv4 30012904 0t0 TCP 
> mwkafka-prod-02.tbd:38988->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2021u IPv4 30012961 0t0 TCP 
> mwkafka-prod-02.tbd:58420->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2027u IPv4 30015723 0t0 TCP 
> mwkafka-prod-02.tbd:58398->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2028u IPv4 30015630 0t0 TCP 
> mwkafka-prod-02.tbd:36248->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2030u IPv4 30015726 0t0 TCP 
> mwkafka-prod-02.tbd:39012->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2031u IPv4 30013619 0t0 TCP 
> mwkafka-prod-02.tbd:38986->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2032u IPv4 30015604 0t0 TCP 
> mwkafka-prod-02.tbd:36246->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2033u IPv4 30012981 0t0 TCP 
> mwkafka-prod-02.tbd:36924->mwkafka-prod-01.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2034u IPv4 30012967 0t0 TCP 
> mwkafka-prod-02.tbd:39036->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2035u IPv4 30012898 0t0 TCP 
> mwkafka-prod-02.tbd:36866->mwkafka-prod-01.dr:9092 (FIN_WAIT2)
> java 144319 kafkagod 2036u IPv4 30004729 0t0 TCP 
> mwkafka-prod-02.tbd:36882->mwkafka-prod-01.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2037u IPv4 30004914 0t0 TCP 
> mwkafka-prod-02.tbd:58426->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2038u IPv4 30015651 0t0 TCP 
> mwkafka-prod-02.tbd:36884->mwkafka-prod-01.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2039u IPv4 30012966 0t0 TCP 
> mwkafka-prod-02.tbd:58422->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2040u IPv4 30005643 0t0 TCP 
> mwkafka-prod-02.tbd:36252->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2041u IPv4 30012944 0t0 TCP 
> mwkafka-prod-02.tbd:36286->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2042u IPv4 30012973 0t0 TCP 
> mwkafka-prod-02.tbd:9092->mwkafka-prod-01.nyc:51924 (ESTABLISHED)
> java 144319 kafkagod 2043u sock 0,7 0t0 30012463 protocol: TCP
> java 144319 kafkagod 2044u IPv4 30012979 0t0 TCP 
> mwkafka-prod-02.tbd:9092->mwkafka-prod-01.dr:39994 (ESTABLISHED)
> java 144319 kafkagod 2045u IPv4 30012899 0t0 TCP 
> mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34548 (ESTABLISHED)
> java 144319 kafkagod 2046u sock 0,7 0t0 

[jira] [Comment Edited] (KAFKA-7971) Producer in Streams environment

2019-02-22 Thread Maciej Lizewski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775007#comment-16775007
 ] 

Maciej Lizewski edited comment on KAFKA-7971 at 2/22/19 12:51 PM:
--

I know I can use a transformer and in fact I do now (kindly please check the 
stackoverflow link where I put a snippet). The "problem" is that this processing 
only depends on wall clock time and the state store. Attaching this transformer to 
some input stream is only needed to introduce the component into the streams 
environment, and in fact all input messages are just dropped. The state store is 
updated in a different stream that produces a different output. I am thinking about 
creating a "dummy" topic without any producer just to attach my transformer there. 
In my opinion this is not a very nice solution. It would be nice to have something 
like: 

KStream StreamsBuilder::producer(ProducerInterface, local_stores);

The component would be initialized the same way transformers and processors are, 
with a ProcessorContext, so it has access to stores; it could even register a 
punctuator connected to the wall clock and forward newly created events to the 
processing network. It is just a thought, an improvement...


was (Author: redguy666):
I know I can use transformer and in fact I do now (kindly please check 
stackoverflow link where I put snippet). The "problem" is that this processing 
only depend on wall clock time and state store. Attaching this transformer to 
some input stream is only needed to introduce component to streams environment 
and in fact all input messages are just dropped. State store is updated in 
different stream that produces different output. I am thinking about creating 
"dummy" topic without any producer just to attach there my transformer. In my 
opinion this is not very nice solution. It would be nice to have something 
like: 

KStream StreamsBuilder::producer(ProducerInterface, local_stores);

the component would be initialized same way transformers and processors are 
with ProcessorContext so it has access to stores, it could even register 
punctuator connected with wall_clock and forward newly created events to 
processing network. It is just a thought, improvement...

> Producer in Streams environment
> ---
>
> Key: KAFKA-7971
> URL: https://issues.apache.org/jira/browse/KAFKA-7971
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Maciej Lizewski
>Priority: Minor
>  Labels: newbie
>
> It would be nice to have Producers that can emit messages to a topic just like any 
> producer but also have access to local stores from the streams environment in 
> Spring.
> Consider this case: I have an event-sourced ordering process like this:
> [EVENTS QUEUE] -> [MERGING PROCESS] -> [ORDERS CHANGELOG/KTABLE]
> The merging process uses the local store "opened orders" to easily apply new 
> changes.
> Now I want to implement a process for closing abandoned orders (orders that were 
> started, but for too long there was no change and they hang in the beginning 
> status). The easiest way is to periodically scan the "opened orders" store and 
> produce an "abandon event" for every order that meets the criteria. The only way 
> now is to create a Transformer with a punctuator and connect its output to [EVENTS 
> QUEUE]. That is obvious, but the Transformer must also be connected to some input 
> stream, and those events must be dropped as we want only the punctuator 
> results. This causes unnecessary overhead in processing input messages 
> (although they are just dropped) and it is not very elegant.





[jira] [Created] (KAFKA-7982) ConcurrentModificationException and Continuous warnings "Attempting to send response via channel for which there is no open connection"

2019-02-22 Thread Abhi (JIRA)
Abhi created KAFKA-7982:
---

 Summary: ConcurrentModificationException and Continuous warnings 
"Attempting to send response via channel for which there is no open connection"
 Key: KAFKA-7982
 URL: https://issues.apache.org/jira/browse/KAFKA-7982
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1
Reporter: Abhi


Hi,

I am getting the following warnings in server.log continuously, and due to this the 
client consumer is not able to consume messages.

[2019-02-20 10:26:30,312] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.218.27.45:9092-10.219.25.239:35248-6259 (kafka.network.Processor)
[2019-02-20 10:26:56,760] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.218.27.45:9092-10.219.25.239:35604-6261 (kafka.network.Processor)

I also noticed that before these warnings started to appear, the following 
concurrent modification exception occurred for the same IP address:

[2019-02-20 09:01:11,175] INFO Initiating logout for 
kafka/u-kafkatst-kafkadev-1.sd@unix.com 
(org.apache.kafka.common.security.kerberos.KerberosLogin)
[2019-02-20 09:01:11,176] WARN [SocketServer brokerId=1] Unexpected error from 
/10.219.25.239; closing connection (org.apache.kafka.common.network.Selector)
java.util.ConcurrentModificationException
at 
java.base/java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:970)
at java.base/java.util.LinkedList$ListItr.next(LinkedList.java:892)
at 
java.base/javax.security.auth.Subject$SecureSet$1.next(Subject.java:1096)
at 
java.base/javax.security.auth.Subject$ClassSet$1.run(Subject.java:1501)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at 
java.base/javax.security.auth.Subject$ClassSet.populateSet(Subject.java:1499)
at 
java.base/javax.security.auth.Subject$ClassSet.<init>(Subject.java:1472)
at 
java.base/javax.security.auth.Subject.getPrivateCredentials(Subject.java:764)
at java.security.jgss/sun.security.jgss.GSSUtil$1.run(GSSUtil.java:336)
at java.security.jgss/sun.security.jgss.GSSUtil$1.run(GSSUtil.java:328)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at 
java.security.jgss/sun.security.jgss.GSSUtil.searchSubject(GSSUtil.java:328)
at 
java.security.jgss/sun.security.jgss.wrapper.NativeGSSFactory.getCredFromSubject(NativeGSSFactory.java:53)
at 
java.security.jgss/sun.security.jgss.wrapper.NativeGSSFactory.getCredentialElement(NativeGSSFactory.java:116)
at 
java.security.jgss/sun.security.jgss.GSSManagerImpl.getCredentialElement(GSSManagerImpl.java:187)
at 
java.security.jgss/sun.security.jgss.GSSCredentialImpl.add(GSSCredentialImpl.java:439)
at 
java.security.jgss/sun.security.jgss.GSSCredentialImpl.<init>(GSSCredentialImpl.java:74)
at 
java.security.jgss/sun.security.jgss.GSSManagerImpl.createCredential(GSSManagerImpl.java:148)
at 
jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Server.<init>(GssKrb5Server.java:108)
at 
jdk.security.jgss/com.sun.security.sasl.gsskerb.FactoryImpl.createSaslServer(FactoryImpl.java:85)
at 
java.security.sasl/javax.security.sasl.Sasl.createSaslServer(Sasl.java:537)
at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.lambda$createSaslKerberosServer$12(SaslServerAuthenticator.java:212)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.createSaslKerberosServer(SaslServerAuthenticator.java:211)
at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.createSaslServer(SaslServerAuthenticator.java:164)
at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleKafkaRequest(SaslServerAuthenticator.java:450)
at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:248)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:132)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at kafka.network.Processor.poll(SocketServer.scala:689)
at kafka.network.Processor.run(SocketServer.scala:594)
at java.base/java.lang.Thread.run(Thread.java:834)
[2019-02-22 00:18:29,439] INFO Initiating re-login for 
kafka/u-kafkatst-kafkadev-1.sd.deshaw@unix.deshaw.com 
(org.apache.kafka.common.security.kerberos.KerberosLogin)
[2019-02-22 00:18:29,440] WARN [SocketServer brokerId=1] Unexpected error from 
/10.219.25.239; closing connection (org.

[jira] [Updated] (KAFKA-7982) ConcurrentModificationException and Continuous warnings "Attempting to send response via channel for which there is no open connection"

2019-02-22 Thread Abhi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhi updated KAFKA-7982:

Description: 
Hi,

I am getting the following warnings in server.log continuously, and due to this 
the client consumer is not able to consume messages.

[2019-02-20 10:26:30,312] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.218.27.45:9092-10.219.25.239:35248-6259 (kafka.network.Processor)
 [2019-02-20 10:26:56,760] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.218.27.45:9092-10.219.25.239:35604-6261 (kafka.network.Processor)

I also noticed that before these warnings started to appear, the following 
concurrent modification exception occurred for the same IP address:

[2019-02-20 09:01:11,175] INFO Initiating logout for 
kafka/u-kafkatst-kafkadev-1.sd@unix.com 
(org.apache.kafka.common.security.kerberos.KerberosLogin)
 [2019-02-20 09:01:11,176] WARN [SocketServer brokerId=1] Unexpected error from 
/10.219.25.239; closing connection (org.apache.kafka.common.network.Selector)
 java.util.ConcurrentModificationException
 at 
java.base/java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:970)
 at java.base/java.util.LinkedList$ListItr.next(LinkedList.java:892)
 at java.base/javax.security.auth.Subject$SecureSet$1.next(Subject.java:1096)
 at java.base/javax.security.auth.Subject$ClassSet$1.run(Subject.java:1501)
 at java.base/java.security.AccessController.doPrivileged(Native Method)
 at 
java.base/javax.security.auth.Subject$ClassSet.populateSet(Subject.java:1499)
 at java.base/javax.security.auth.Subject$ClassSet.<init>(Subject.java:1472)
 at 
java.base/javax.security.auth.Subject.getPrivateCredentials(Subject.java:764)
 at java.security.jgss/sun.security.jgss.GSSUtil$1.run(GSSUtil.java:336)
 at java.security.jgss/sun.security.jgss.GSSUtil$1.run(GSSUtil.java:328)
 at java.base/java.security.AccessController.doPrivileged(Native Method)
 at java.security.jgss/sun.security.jgss.GSSUtil.searchSubject(GSSUtil.java:328)
 at 
java.security.jgss/sun.security.jgss.wrapper.NativeGSSFactory.getCredFromSubject(NativeGSSFactory.java:53)
 at 
java.security.jgss/sun.security.jgss.wrapper.NativeGSSFactory.getCredentialElement(NativeGSSFactory.java:116)
 at 
java.security.jgss/sun.security.jgss.GSSManagerImpl.getCredentialElement(GSSManagerImpl.java:187)
 at 
java.security.jgss/sun.security.jgss.GSSCredentialImpl.add(GSSCredentialImpl.java:439)
 at 
java.security.jgss/sun.security.jgss.GSSCredentialImpl.<init>(GSSCredentialImpl.java:74)
 at 
java.security.jgss/sun.security.jgss.GSSManagerImpl.createCredential(GSSManagerImpl.java:148)
 at 
jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Server.<init>(GssKrb5Server.java:108)
 at 
jdk.security.jgss/com.sun.security.sasl.gsskerb.FactoryImpl.createSaslServer(FactoryImpl.java:85)
 at java.security.sasl/javax.security.sasl.Sasl.createSaslServer(Sasl.java:537)
 at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.lambda$createSaslKerberosServer$12(SaslServerAuthenticator.java:212)
 at java.base/java.security.AccessController.doPrivileged(Native Method)
 at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
 at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.createSaslKerberosServer(SaslServerAuthenticator.java:211)
 at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.createSaslServer(SaslServerAuthenticator.java:164)
 at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleKafkaRequest(SaslServerAuthenticator.java:450)
 at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:248)
 at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:132)
 at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
 at kafka.network.Processor.poll(SocketServer.scala:689)
 at kafka.network.Processor.run(SocketServer.scala:594)
 at java.base/java.lang.Thread.run(Thread.java:834)
 [2019-02-22 00:18:29,439] INFO Initiating re-login for 
kafka/u-kafkatst-kafkadev-1.sd.deshaw@unix.deshaw.com 
(org.apache.kafka.common.security.kerberos.KerberosLogin)
 [2019-02-22 00:18:29,440] WARN [SocketServer brokerId=1] Unexpected error from 
/10.219.25.239; closing connection (org.apache.kafka.common.network.Selector)
 org.apache.kafka.common.KafkaException: Principal could not be determined from 
Subject, this may be a transient failure due to Kerberos re-login
 at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:435)
 at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.createSaslKerberosServer(SaslServerAuthenticator.java:177)
 at 
org.apache.kafka.common.security.authenticat

[jira] [Updated] (KAFKA-7981) Add Replica Fetcher and Log Cleaner Count Metrics

2019-02-22 Thread Viktor Somogyi-Vass (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-7981:
---
Description: 
On some occasions we detected errors where replica fetcher threads or log 
cleaners died because of an unrecoverable error and caused more serious issues 
in the brokers (from lagging to offline replicas, filling up disks, etc.). It 
would often help if the monitoring systems attached to Kafka could detect these 
problems early on, as that would allow a prompt response from the user and a 
greater possibility of capturing the root cause.


> Add Replica Fetcher and Log Cleaner Count Metrics
> -
>
> Key: KAFKA-7981
> URL: https://issues.apache.org/jira/browse/KAFKA-7981
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.3.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>  Labels: kip
>
> On some occasions we detected errors where replica fetcher threads or log 
> cleaners died because of an unrecoverable error and caused more serious 
> issues in the brokers (from lagging to offline replicas, filling up disks, 
> etc.). It would often help if the monitoring systems attached to Kafka could 
> detect these problems early on, as that would allow a prompt response from the 
> user and a greater possibility of capturing the root cause.





[jira] [Commented] (KAFKA-7982) ConcurrentModificationException and Continuous warnings "Attempting to send response via channel for which there is no open connection"

2019-02-22 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775251#comment-16775251
 ] 

Ismael Juma commented on KAFKA-7982:


cc [~rsivaram] [~omkreddy]

> ConcurrentModificationException and Continuous warnings "Attempting to send 
> response via channel for which there is no open connection"
> ---
>
> Key: KAFKA-7982
> URL: https://issues.apache.org/jira/browse/KAFKA-7982
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: Abhi
>Priority: Major
>
> Hi,
> I am getting the following warnings in server.log continuously, and due to this 
> the client consumer is not able to consume messages.
> [2019-02-20 10:26:30,312] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.218.27.45:9092-10.219.25.239:35248-6259 (kafka.network.Processor)
>  [2019-02-20 10:26:56,760] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.218.27.45:9092-10.219.25.239:35604-6261 (kafka.network.Processor)
> I also noticed that before these warnings started to appear, the following 
> concurrent modification exception occurred for the same IP address:
> [2019-02-20 09:01:11,175] INFO Initiating logout for 
> kafka/u-kafkatst-kafkadev-1.sd@unix.com 
> (org.apache.kafka.common.security.kerberos.KerberosLogin)
>  [2019-02-20 09:01:11,176] WARN [SocketServer brokerId=1] Unexpected error 
> from /10.219.25.239; closing connection 
> (org.apache.kafka.common.network.Selector)
>  java.util.ConcurrentModificationException
>  at 
> java.base/java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:970)
>  at java.base/java.util.LinkedList$ListItr.next(LinkedList.java:892)
>  at java.base/javax.security.auth.Subject$SecureSet$1.next(Subject.java:1096)
>  at java.base/javax.security.auth.Subject$ClassSet$1.run(Subject.java:1501)
>  at java.base/java.security.AccessController.doPrivileged(Native Method)
>  at 
> java.base/javax.security.auth.Subject$ClassSet.populateSet(Subject.java:1499)
>  at java.base/javax.security.auth.Subject$ClassSet.<init>(Subject.java:1472)
>  at 
> java.base/javax.security.auth.Subject.getPrivateCredentials(Subject.java:764)
>  at java.security.jgss/sun.security.jgss.GSSUtil$1.run(GSSUtil.java:336)
>  at java.security.jgss/sun.security.jgss.GSSUtil$1.run(GSSUtil.java:328)
>  at java.base/java.security.AccessController.doPrivileged(Native Method)
>  at 
> java.security.jgss/sun.security.jgss.GSSUtil.searchSubject(GSSUtil.java:328)
>  at 
> java.security.jgss/sun.security.jgss.wrapper.NativeGSSFactory.getCredFromSubject(NativeGSSFactory.java:53)
>  at 
> java.security.jgss/sun.security.jgss.wrapper.NativeGSSFactory.getCredentialElement(NativeGSSFactory.java:116)
>  at 
> java.security.jgss/sun.security.jgss.GSSManagerImpl.getCredentialElement(GSSManagerImpl.java:187)
>  at 
> java.security.jgss/sun.security.jgss.GSSCredentialImpl.add(GSSCredentialImpl.java:439)
>  at 
> java.security.jgss/sun.security.jgss.GSSCredentialImpl.<init>(GSSCredentialImpl.java:74)
>  at 
> java.security.jgss/sun.security.jgss.GSSManagerImpl.createCredential(GSSManagerImpl.java:148)
>  at 
> jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Server.<init>(GssKrb5Server.java:108)
>  at 
> jdk.security.jgss/com.sun.security.sasl.gsskerb.FactoryImpl.createSaslServer(FactoryImpl.java:85)
>  at 
> java.security.sasl/javax.security.sasl.Sasl.createSaslServer(Sasl.java:537)
>  at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.lambda$createSaslKerberosServer$12(SaslServerAuthenticator.java:212)
>  at java.base/java.security.AccessController.doPrivileged(Native Method)
>  at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
>  at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.createSaslKerberosServer(SaslServerAuthenticator.java:211)
>  at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.createSaslServer(SaslServerAuthenticator.java:164)
>  at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleKafkaRequest(SaslServerAuthenticator.java:450)
>  at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:248)
>  at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:132)
>  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
>  at kafka.network.Processor.poll(SocketServer.scala:689)
>  at kafka.network.Processor.run(SocketServer.scala:594)
>  at java.base/java.lang.Thread.run(Thread.java:834)
>  [

[jira] [Commented] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775386#comment-16775386
 ] 

ASF GitHub Bot commented on KAFKA-7492:
---

bbejeck commented on pull request #6285: KAFKA-7492 : Updated javadocs for 
aggregate and reduce methods returning null behavior.
URL: https://github.com/apache/kafka/pull/6285
 
 
   
 



> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, docs, newbie
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null
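For illustration, a small sketch of the situation the ticket wants documented (topic, store name, and the "reset" condition are made up): an Aggregator whose apply() can return null; what Streams does with that null result is exactly what the updated JavaDocs should spell out.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NullAggregateSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, Long> totals = builder
                .stream("amounts", Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .aggregate(
                        () -> 0L,
                        // Returning null here (e.g. to "reset" a key) is the case whose
                        // semantics KAFKA-7492 asks the JavaDocs to document explicitly.
                        (key, value, aggregate) -> value < 0 ? null : aggregate + value,
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));

        builder.build(); // topology only; the application is not started in this sketch
    }
}
{code}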





[jira] [Resolved] (KAFKA-7864) AdminZkClient.validateTopicCreate() should validate that partitions are 0-based

2019-02-22 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7864.

   Resolution: Fixed
Fix Version/s: 2.3.0

Merged the PR to trunk.

> AdminZkClient.validateTopicCreate() should validate that partitions are 
> 0-based
> ---
>
> Key: KAFKA-7864
> URL: https://issues.apache.org/jira/browse/KAFKA-7864
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Ryan
>Priority: Major
>  Labels: newbie
> Fix For: 2.3.0
>
>
> AdminZkClient.validateTopicCreate() currently doesn't validate that partition 
> ids in a topic are consecutive, starting from 0. The client code depends on 
> that. So, it would be useful to tighten up the check.
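A minimal sketch of the check being asked for (illustrative only, not the code merged for this ticket): given a proposed partition-to-replicas assignment, verify that the partition ids are exactly 0..n-1.

{code:java}
import java.util.Map;
import java.util.Set;

public class PartitionIdValidationSketch {

    // Partition ids must be consecutive and start at 0, i.e. exactly {0, 1, ..., n-1}.
    static void validatePartitionIds(Map<Integer, ? extends Iterable<Integer>> replicaAssignment) {
        Set<Integer> partitionIds = replicaAssignment.keySet();
        for (int expected = 0; expected < partitionIds.size(); expected++) {
            if (!partitionIds.contains(expected)) {
                throw new IllegalArgumentException(
                        "partition ids must be consecutive starting from 0; missing id " + expected);
            }
        }
    }

    public static void main(String[] args) {
        validatePartitionIds(Map.of(0, Set.of(1), 1, Set.of(2)));       // passes
        try {
            validatePartitionIds(Map.of(0, Set.of(1), 2, Set.of(2)));   // id 1 is missing
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
{code}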





[jira] [Commented] (KAFKA-7864) AdminZkClient.validateTopicCreate() should validate that partitions are 0-based

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775388#comment-16775388
 ] 

ASF GitHub Bot commented on KAFKA-7864:
---

junrao commented on pull request #6246: KAFKA-7864; validate partitions are 
0-based
URL: https://github.com/apache/kafka/pull/6246
 
 
   
 



> AdminZkClient.validateTopicCreate() should validate that partitions are 
> 0-based
> ---
>
> Key: KAFKA-7864
> URL: https://issues.apache.org/jira/browse/KAFKA-7864
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Ryan
>Priority: Major
>  Labels: newbie
>
> AdminZkClient.validateTopicCreate() currently doesn't validate that partition 
> ids in a topic are consecutive, starting from 0. The client code depends on 
> that. So, it would be useful to tighten up the check.





[jira] [Assigned] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-02-22 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-7492:
--

Assignee: (was: Bill Bejeck)

> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, docs, newbie
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null





[jira] [Resolved] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-02-22 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-7492.

Resolution: Fixed

Thanks asutosh for the contribution!

> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, docs, newbie
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null





[jira] [Assigned] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-02-22 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-7492:
--

Assignee: Bill Bejeck

> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Minor
>  Labels: beginner, docs, newbie
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null





[jira] [Updated] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-02-22 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-7492:
---
Affects Version/s: 2.3.0

> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, docs, newbie
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null





[jira] [Created] (KAFKA-7983) supporting replication.throttled.replicas in dynamic broker configuration

2019-02-22 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7983:
--

 Summary: supporting replication.throttled.replicas in dynamic 
broker configuration
 Key: KAFKA-7983
 URL: https://issues.apache.org/jira/browse/KAFKA-7983
 Project: Kafka
  Issue Type: New Feature
  Components: core
Reporter: Jun Rao


In 
[KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration#KIP-226-DynamicBrokerConfiguration-DefaultTopicconfigs],
 we added support for changing broker defaults dynamically. However, it didn't 
support changing leader.replication.throttled.replicas and 
follower.replication.throttled.replicas. These 2 configs were introduced in 
[KIP-73|https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas]
 and control the set of topic partitions on which replication throttling will 
be engaged. One useful case is to be able to set a default value of * for both 
configs to allow throttling to be engaged for all topic partitions. 
Currently, the static default values for both configs are ignored for 
replication throttling; it would be useful to fix that as well.
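For illustration only, a sketch of what the requested behaviour would look like from the Java AdminClient: setting both configs to * as the dynamic cluster-wide broker default. The bootstrap address is a placeholder, and with current brokers these two entries are exactly the ones that are not yet applied dynamically, so this shows the end state the ticket asks for rather than something that works today.

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicThrottleConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // An empty resource name addresses the cluster-wide dynamic broker default
            // (the same target as kafka-configs.sh --entity-type brokers --entity-default).
            ConfigResource brokerDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            Config config = new Config(Arrays.asList(
                new ConfigEntry("leader.replication.throttled.replicas", "*"),
                new ConfigEntry("follower.replication.throttled.replicas", "*")));
            // Note: alterConfigs is non-incremental; it replaces the dynamic config set
            // for the resource with exactly the entries supplied here.
            admin.alterConfigs(Collections.singletonMap(brokerDefault, config)).all().get();
        }
    }
}
{code}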



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7984) Do not rebuild leader epochs on segments that do not support it

2019-02-22 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7984:
--

 Summary: Do not rebuild leader epochs on segments that do not 
support it
 Key: KAFKA-7984
 URL: https://issues.apache.org/jira/browse/KAFKA-7984
 Project: Kafka
  Issue Type: Bug
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


h3. Preface

https://issues.apache.org/jira/browse/KAFKA-7897 (logs would store some leader 
epochs even if they did not support them - this is essentially a regression 
from https://issues.apache.org/jira/browse/KAFKA-7415)
https://issues.apache.org/jira/browse/KAFKA-7959

If users are running Kafka with 
https://issues.apache.org/jira/browse/KAFKA-7415 merged in, chances are they 
have sparsely-populated leader epoch cache files.
KAFKA-7897's implementation unintentionally handled this case by deleting those 
leader epoch cache files for versions 2.1+. For earlier versions, KAFKA-7959 
fixes that.

In any case, as it currently stands, a broker started up with a message format 
of `0.10.0` will have those leader epoch cache files deleted.


h3. Problem

We have logic [that rebuilds these leader epoch cache 
files|https://github.com/apache/kafka/blob/217f45ed554b34d5221e1dd3db76e4be892661cf/core/src/main/scala/kafka/log/Log.scala#L614]
 when recovering segments that do not have a clean shutdown file. It goes over 
the record batches and rebuilds the leader epoch.
KAFKA-7959's implementation guards against this by checking that the 
log.message.format supports it, *but* that issue is only merged for versions 
*below 2.1*.

Moreover, the case where `log.message.format >= 0.11` *is not handled*. If a 
broker has the following log segment file:
{code:java}
offset 0, format v2, epoch 1
offset 1, format v2, epoch 1
offset 2, format v1, no epoch
offset 3, format v1, no epoch
{code}
and gets upgraded to a new log message format that supports it, the rebuild of 
any logs that had an unclean shutdown will populate the leader epoch cache 
again, potentially resulting in the issue described in KAFKA-7897

One potentially simple way to solve this is to clear the accumulated leader epoch 
cache whenever a batch with no epoch is encountered during the segment rebuild, 
for example:
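(Rough sketch only; `EpochCache` here is a made-up stand-in for the broker's leader epoch cache, not the real Scala code in Log.scala.)

{code:java}
import org.apache.kafka.common.record.RecordBatch;

public class EpochRebuildSketch {

    interface EpochCache {                     // hypothetical stand-in
        void assign(int epoch, long startOffset);
        void clear();
    }

    // Called per batch while recovering a segment after an unclean shutdown.
    static void maybeAssignEpoch(RecordBatch batch, EpochCache cache) {
        if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2
                && batch.partitionLeaderEpoch() != RecordBatch.NO_PARTITION_LEADER_EPOCH) {
            cache.assign(batch.partitionLeaderEpoch(), batch.baseOffset());
        } else {
            // The batch was written with an older message format and carries no epoch:
            // drop whatever was accumulated so far so that the rebuilt cache cannot end
            // up sparse, which is the situation described in KAFKA-7897.
            cache.clear();
        }
    }
}
{code}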



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-02-22 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7502:
-
Description: 
Today, since we pre-create all the `KTableXXX` operators along with the logical 
nodes, we are effectively duplicating the logic that determines whether the 
resulting KTable should be materialized. More specifically, the materialization 
principle today is:

1) If users specified Materialized in the DSL and it contains a queryable name, 
we always materialize.
2) If users specified Materialized in the DSL but it does not contain a queryable 
name, or if users do not specify a Materialized object at all, Streams may 
choose to materialize or not. But in any case, even if the KTable is 
materialized it will not be queryable since there's no queryable name (i.e. 
only storeName is not null, but queryableName is null):
2.a) If the resulting KTable is from an aggregation, we always materialize since 
it is needed for storing the aggregation (i.e. we use the MaterializedInternal 
constructor with nameProvider != null).
2.b) If the resulting KTable is from a source topic, we delay the 
materialization until a downstream operator requires this KTable to be 
materialized or to send-old-values (see `KTableSourceNode` and `KTableSource`).
2.c) If the resulting KTable is from a join, we always materialize. However, this 
can be optimized similarly to 2.b) but is orthogonal to this ticket (see 
`KTableImpl#buildJoin` where we always use the constructor with nameProvider != 
null).
2.d) If the resulting KTable is from a stateless operation like filter / 
mapValues, we never materialize.



Now, in all of these cases, we have logical nodes like "KTableKTableJoinNode" 
as well as physical nodes like `ProcessorNode`. Ideally we should always create 
the logical plan (i.e. the StreamsGraph), then optimize it if necessary, 
and then generate the physical plan (i.e. the Topology). However, today we 
create some physical nodes beforehand, and the above logic is hence duplicated 
in the creation of both physical nodes and logical nodes. For example, in 
`KTableKTableJoinNode` we check whether Materialized is null when adding a state 
store, and in `KTableImpl#doJoin` we check whether materialized is specified (case 
2.c) above). 

Another example is TableProcessorNode, which is used for 2.d) above and contains 
this logic, while its caller, `KTableImpl#doFilter` for example, also contains 
the logic for deciding whether to pass the `queryableName` parameter to 
`KTableProcessorSupplier`.

This is bug-prone since we may update the logic in one class but forget to 
update the other class.

--

What we want is a cleaner code path similar to what we have for 2.b), such 
that when creating the logical nodes we keep track of whether 1) 
materialized is specified, and 2) a queryable name is provided. During the 
optimization phase, we may change the inner physical ProcessorBuilder's 
parameters such as the queryable name, and then when it is time to generate the 
physical node, we can just blindly take the parameters and go with them.


  was:Today since we pre-create all the `KTableXXX` operator along with the 
logical node, we are effectively duplicating the logic to determine whether the 
resulted KTable should be materialized. For example, in `KTableKTableJoinNode` 
and in `KTableImpl#doJoin`. This is bug-vulnerable since we may update the 
logic in one class but forgot to update the other class.


> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Minor
>
> Today since we pre-create all the `KTableXXX` operator along with the logical 
> node, we are effectively duplicating the logic to determine whether the 
> resulted KTable should be materialized. More specifically, the 
> materialization principle today is that:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name. We always materialize.
> 2) If users specified Materialized in the DSL but not contains a queryable 
> name, or if users do not specify a Materialized object at all, Streams may 
> choose to materialize or not. But in any cases, even if the KTable is 
> materialized it will not be queryable since there's no queryable name (i.e. 
> only storeName is not null, but queryableName is null):
> 2.a) If the resulted KTable is from an aggregation, we always materialize 
> since it is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulted KTable is from a source topic, we delay the 
> materialization until the downstream operator r

[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-02-22 Thread Gwen Shapira (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775486#comment-16775486
 ] 

Gwen Shapira commented on KAFKA-7965:
-

Assigning to Stanislav because https://github.com/apache/kafka/pull/6238 may 
fix this.

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-02-22 Thread Gwen Shapira (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira reassigned KAFKA-7965:
---

Assignee: Stanislav Kozlovski

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775500#comment-16775500
 ] 

ASF GitHub Bot commented on KAFKA-7672:
---

guozhangwang commented on pull request #6113: KAFKA-7672: Restoring tasks need 
to be closed upon task suspension
URL: https://github.com/apache/kafka/pull/6113
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Critical
> Fix For: 2.2.0
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file was 
> found under its task folder, Kafka Streams needs to restore the local state 
> from the remote changelog topic completely and then resume running. However, in 
> some scenarios, we found that Kafka Streams does *NOT* restore this state even 
> though no checkpoint was found, but just cleans the state folder and transitions 
> to the running state directly, resulting in the loss of historic data. 
> To be specific, I will give the detailed logs for Kafka Stream in our project 
> to show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can get the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # the previous running task assigned to thread 1 is task 0_5 (the 
> corresponding partition is AuditTrailBatch-0-5)
>  # group begins to rebalance, the new task 1_1 is assigned to thread 1.
>  # no checkpoint was found under 1_1 state folder, so reset the offset to 0 
> and clean the local state folder.
>  # thread 1 transitions to the RUNNING state directly without restoring 
> task 1_1, so the historic data for state 1_1 is lost for thread 1. 
> *

[jira] [Created] (KAFKA-7985) Cleanup AssignedTasks / AbstractTask logic

2019-02-22 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7985:


 Summary: Cleanup AssignedTasks / AbstractTask logic
 Key: KAFKA-7985
 URL: https://issues.apache.org/jira/browse/KAFKA-7985
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


Today the lifetime of a task is:

created -> [initializeStateStores] -> 
restoring (writes to the initialized state stores) -> [initializeTopology] -> 
running -> [closeTopology] -> 
suspended -> [closeStateManager] -> 
dead

Hence the assigned tasks contain the following non-overlapping sets: 
created, restoring, running, and suspended (dead tasks do not need to be 
maintained). Normally `created` should be empty, since once a task is created it 
should transition to either restoring or running immediately. So whenever 
we are suspending tasks, we should go through these sets and act accordingly 
(a rough sketch follows below):

1. `created` and `suspended`: just check that these two sets are empty.
2. `running`: transition to `suspended`.
3. `restoring`: transition to `suspended`. The difference here is that we do 
not need to close the topology since it was never created; we just need to 
remember the restored position and keep the restorers on hold instead of 
clearing all of them.
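A rough sketch of that suspension pass; the names (Task, suspendAll, restoredPositions) are made up and do not correspond to the real AssignedTasks / AbstractTask classes.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SuspendSketch {

    interface Task {                 // hypothetical task handle
        void closeTopology();
        long restoredOffset();
    }

    private final Set<Task> created = new HashSet<>();
    private final Set<Task> restoring = new HashSet<>();
    private final Set<Task> running = new HashSet<>();
    private final Set<Task> suspended = new HashSet<>();
    private final Map<Task, Long> restoredPositions = new HashMap<>();

    void suspendAll() {
        // 1. `created` and `suspended` are expected to be empty at this point.
        if (!created.isEmpty() || !suspended.isEmpty()) {
            throw new IllegalStateException("created/suspended should be empty before suspension");
        }
        // 2. Running tasks: close the topology, then move them to `suspended`.
        for (Task task : running) {
            task.closeTopology();
            suspended.add(task);
        }
        running.clear();
        // 3. Restoring tasks: no topology was ever created, so there is nothing to close;
        //    remember the restored position and keep the restorer on hold.
        for (Task task : restoring) {
            restoredPositions.put(task, task.restoredOffset());
            suspended.add(task);
        }
        restoring.clear();
    }
}
{code}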



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7985) Cleanup AssignedTasks / AbstractTask logic

2019-02-22 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7985:
-
Component/s: streams

> Cleanup AssignedTasks / AbstractTask logic
> --
>
> Key: KAFKA-7985
> URL: https://issues.apache.org/jira/browse/KAFKA-7985
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today the lifetime of a task is:
> created -> [initializeStateStores] -> 
> restoring (writes to the initialized state stores) -> [initializeTopology] -> 
> running -> [closeTopology] -> 
> suspended -> [closeStateManager] -> 
> dead
> Hence the assigned tasks contain the following non-overlapping sets: 
> created, restoring, running, and suspended (dead tasks do not need to be 
> maintained). Normally `created` should be empty, since once a task is created 
> it should transition to either restoring or running immediately. So 
> whenever we are suspending tasks, we should go through these sets and act 
> accordingly:
> 1. `created` and `suspended`: just check that these two sets are empty.
> 2. `running`: transition to `suspended`.
> 3. `restoring`: transition to `suspended`. The difference here is that we 
> do not need to close the topology since it was never created; we just 
> need to remember the restored position and keep the restorers on hold 
> instead of clearing all of them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775526#comment-16775526
 ] 

ASF GitHub Bot commented on KAFKA-7937:
---

gwenshap commented on pull request #6311: KAFKA-7937: Fix Flaky Test 
ResetConsumerGroupOffsetTest.testResetOffs…
URL: https://github.com/apache/kafka/pull/6311
 
 
   Address the comments on PR-6307. Sorry for new PR, but one of the comments 
was to move the PR to another branch.
   
   ***
   Since the test sometimes fails because the coordinator is not available, I'm 
giving it a few more attempts to find it.
   
   I admit that I haven't been able to actually reproduce this failure, so I'm 
only hoping this fixes it. But it doesn't fail more often than it used to (on 
my machine)
   
   Fixing on 2.2 because the intent is to fix enough flakes to allow for a 
clean release.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
> 
>
> Key: KAFKA-7937
> URL: https://issues.apache.org/jira/browse/KAFKA-7937
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline
> {quote}kafka.admin.ResetConsumerGroupOffsetTest > 
> testResetOffsetsNotExistingGroup FAILED 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306)
>  at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)
>  Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775527#comment-16775527
 ] 

ASF GitHub Bot commented on KAFKA-7937:
---

gwenshap commented on pull request #6307: KAFKA-7937: Fix Flaky Test 
ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
URL: https://github.com/apache/kafka/pull/6307
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
> 
>
> Key: KAFKA-7937
> URL: https://issues.apache.org/jira/browse/KAFKA-7937
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline
> {quote}kafka.admin.ResetConsumerGroupOffsetTest > 
> testResetOffsetsNotExistingGroup FAILED 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306)
>  at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)
>  Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7986) distinguish the logging from different ZooKeeperClient instances

2019-02-22 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7986:
--

 Summary: distinguish the logging from different ZooKeeperClient 
instances
 Key: KAFKA-7986
 URL: https://issues.apache.org/jira/browse/KAFKA-7986
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


It's possible for each broker to have more than 1 ZooKeeperClient instance. For 
example, SimpleAclAuthorizer creates a separate ZooKeeperClient instance when 
configured. It would be useful to distinguish the logging from different 
ZooKeeperClient instances.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7986) distinguish the logging from different ZooKeeperClient instances

2019-02-22 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775560#comment-16775560
 ] 

Jun Rao commented on KAFKA-7986:


This can potentially be done by passing in a clientID string via the constructor 
of ZooKeeperClient, for example:
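A rough illustration of that idea; the real kafka.zookeeper.ZooKeeperClient is Scala, so this Java sketch only shows how a per-instance name could be turned into a log prefix.

{code:java}
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class NamedZkClientLoggingSketch {

    private final Logger log;

    public NamedZkClientLoggingSketch(String clientName) {
        // Every message from this instance is prefixed, so logs from e.g. the broker's
        // main client and SimpleAclAuthorizer's client can be told apart.
        this.log = new LogContext("[ZooKeeperClient " + clientName + "] ")
            .logger(NamedZkClientLoggingSketch.class);
    }

    public void connect(String connectString) {
        log.info("Connecting to {}", connectString);
    }
}
{code}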

> distinguish the logging from different ZooKeeperClient instances
> 
>
> Key: KAFKA-7986
> URL: https://issues.apache.org/jira/browse/KAFKA-7986
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>  Labels: newbie
>
> It's possible for each broker to have more than 1 ZooKeeperClient instance. 
> For example, SimpleAclAuthorizer creates a separate ZooKeeperClient instance 
> when configured. It would be useful to distinguish the logging from different 
> ZooKeeperClient instances.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2019-02-22 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7987:
--

 Summary: a broker's ZK session may die on transient auth failure
 Key: KAFKA-7987
 URL: https://issues.apache.org/jira/browse/KAFKA-7987
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


After a transient network issue, we saw the following log in a broker.
{code:java}
[23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
javax.security.sasl.SaslException: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
GSS initiate failed [Caused by GSSException: No valid credentials provided 
(Mechanism level: Server not found in Kerberos database (7))]) occurred when 
evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client will 
go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
[23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
{code}
The network issue prevented the broker from communicating with ZK. The broker's 
ZK session then expired, but the broker didn't know that yet since it couldn't 
establish a connection to ZK. When the network came back, the broker tried to 
establish a connection to ZK, but failed due to an auth failure (likely caused by 
a transient KDC issue). The current logic just ignores the auth failure without 
trying to create a new ZK session. The broker then remains permanently in a 
state where it's alive but not registered in ZK.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2019-02-22 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775565#comment-16775565
 ] 

Jun Rao commented on KAFKA-7987:


One potential way to fix this is to handle auth failures in ZooKeeperClient the 
same way as session expiration: keep retrying to establish the connection until 
it succeeds, as sketched below.
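A hedged sketch of that retry loop; ZkSession and SessionFactory are made-up stand-ins rather than the real kafka.zookeeper.ZooKeeperClient API.

{code:java}
import java.util.concurrent.TimeUnit;

public class ZkAuthRetrySketch {

    interface ZkSession { /* hypothetical session handle */ }

    interface SessionFactory {
        // Throws on auth failure or connection failure.
        ZkSession connect() throws Exception;
    }

    static ZkSession reestablishSession(SessionFactory factory) throws InterruptedException {
        long backoffMs = 1000;
        while (true) {
            try {
                return factory.connect();
            } catch (Exception authOrConnectFailure) {
                // Same treatment as session expiration: wait and retry rather than giving
                // up and leaving the broker alive but unregistered in ZK.
                TimeUnit.MILLISECONDS.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 30_000);
            }
        }
    }
}
{code}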

> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's 
> ZK session then expired, but the broker didn't know that yet since it 
> couldn't establish a connection to ZK. When the network was back, the broker 
> tried to establish a connection to ZK, but failed due to auth failure (likely 
> due to a transient KDC issue). The current logic just ignores the auth 
> failure without trying to create a new ZK session. Then the broker will be 
> permanently in a state that it's alive, but not registered in ZK.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7938) Flaky Test DeleteConsumerGroupsTest#testDeleteCmdWithShortInitialization

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775583#comment-16775583
 ] 

ASF GitHub Bot commented on KAFKA-7938:
---

gwenshap commented on pull request #6312: KAFKA-7938: Fix test flakiness in 
DeleteConsumerGroupsTest
URL: https://github.com/apache/kafka/pull/6312
 
 
   Address the comments on PR-6307. Sorry for new PR, but one of the comments 
was to move the PR to another branch.
   
   
   This attempts to fix KAFKA-7938 and KAFKA-7946.
   
   I removed two tests:
   
   * testDeleteWithShortInitialization basically didn't check the result and 
therefore always passed
   * testDeleteCmdWithShortInitialization has no way to enforce that 
initialization is indeed short, and therefore sometimes FAILED because the 
group would be created before the CMD tried to delete it.
   
   I thought the tests had limited value relative to the effort of figuring out 
a way to make the timing work.
   
   I also fixed testDeleteCmdNonEmptyGroup and testDeleteNonEmptyGroup so they 
will validate that the group both exists and is non-empty before starting the 
test itself. I also added some extra information for future debugging sessions 
:)
   
   I ran the tests LOTS of times to validate, but with flaky tests, it is hard 
to tell :)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test DeleteConsumerGroupsTest#testDeleteCmdWithShortInitialization
> 
>
> Key: KAFKA-7938
> URL: https://issues.apache.org/jira/browse/KAFKA-7938
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/18/pipeline
> {quote}kafka.admin.DeleteConsumerGroupsTest > 
> testDeleteCmdWithShortInitialization FAILED java.lang.AssertionError: The 
> consumer group deletion did not work as expected at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.admin.DeleteConsumerGroupsTest.testDeleteCmdWithShortInitialization(DeleteConsumerGroupsTest.scala:210){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7938) Flaky Test DeleteConsumerGroupsTest#testDeleteCmdWithShortInitialization

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775584#comment-16775584
 ] 

ASF GitHub Bot commented on KAFKA-7938:
---

gwenshap commented on pull request #6306: KAFKA-7938: Fix test flakiness in 
DeleteConsumerGroupsTest
URL: https://github.com/apache/kafka/pull/6306
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test DeleteConsumerGroupsTest#testDeleteCmdWithShortInitialization
> 
>
> Key: KAFKA-7938
> URL: https://issues.apache.org/jira/browse/KAFKA-7938
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/18/pipeline
> {quote}kafka.admin.DeleteConsumerGroupsTest > 
> testDeleteCmdWithShortInitialization FAILED java.lang.AssertionError: The 
> consumer group deletion did not work as expected at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.admin.DeleteConsumerGroupsTest.testDeleteCmdWithShortInitialization(DeleteConsumerGroupsTest.scala:210){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2019-02-22 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775598#comment-16775598
 ] 

GEORGE LI commented on KAFKA-6794:
--

I have also seen this issue. When more than one broker is in the new replicas 
of a reassignment and the topic is big, then even with throttling the leader 
works hard to sync to all the extra followers, which can cause a latency jump. 

 

{{One solution is to execute the reassignment plans in an "optimal" way: 
submit the reassignment plans in batches, making sure that in each batch the 
topic/partition has only one extra new broker in the new replicas, wait 
until that reassignment completes, then submit the next one. E.g., if the 
reassignment is (1,2,3,4) => (5,6,7,8), split it into 4 batches (buckets), 
every batch adding only 1 new replica.}}

 

{{Batch 1:  (1,2,3,5)}}

{{Batch 2:  (1,2,5,6)}}

{{Batch 3:  (1,5,6,7)}}

{{Batch 4:  (5,6,7,8)}}

 

Between batches, check whether the ZK node /admin/reassign_partitions exists; 
if it does, sleep and check again; if not, submit the next batch (see the 
sketch below). 
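A small standalone sketch of the batching idea above; it is not part of any Kafka tool, and the /admin/reassign_partitions check between submissions is left out.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IncrementalReassignmentSketch {

    // Step k keeps the first (n - k) current replicas and the first k target replicas,
    // so consecutive plans differ by exactly one broker.
    static List<List<Integer>> batches(List<Integer> current, List<Integer> target) {
        int n = target.size();
        List<List<Integer>> plans = new ArrayList<>();
        for (int k = 1; k <= n; k++) {
            List<Integer> step = new ArrayList<>(current.subList(0, n - k));
            step.addAll(target.subList(0, k));
            plans.add(step);
        }
        return plans;
    }

    public static void main(String[] args) {
        // Prints [[1, 2, 3, 5], [1, 2, 5, 6], [1, 5, 6, 7], [5, 6, 7, 8]],
        // matching the four batches listed in the comment above.
        System.out.println(batches(Arrays.asList(1, 2, 3, 4), Arrays.asList(5, 6, 7, 8)));
    }
}
{code}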

 

 

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6794) Support for incremental replica reassignment

2019-02-22 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775598#comment-16775598
 ] 

GEORGE LI edited comment on KAFKA-6794 at 2/22/19 9:21 PM:
---

I also have seen this issue.  When more than one broker is in the New Replicas 
of the reassignments,  the topic is big,  even with throttle,  the leader is 
working hard to sync to all the extra followers and could cause latency jump. 

 

{{One of the solutions is to execute the reassignment plans in an "Optimal" 
way.  Submit the reassignment plans in batches.   making sure each batch, the 
topic/partition will have only one extra New broker in the New Replicas,  wait 
till that reassignment completes, then resubmit another one.  e.g.  if the 
reassignment is (1,2,3,4) =>  (5,6,7,8),    Split it in 4 batches (buckets), 
every batch only 1 new replica.  }}

 

{{Batch 1:  (1,2,3,5)}}

{{Batch 2:  (1,2,5,6)}}

{{Batch 3:  (1,5,6,7)}}

{{Batch 4:  (5,6,7,8)}}

 

Between each batch,  check ZK node /admin/reassign_partitions exists,  if yes, 
sleep and check again,   if not, submit next batch. 

 

 


was (Author: sql_consulting):
I also have seen this issue.  When more than one broker is in the New Replicas 
of the reassignments,  the topic is big,  even with throttle,  the leader is 
working hard to sync to all the extra followers and could cause latency jump. 

 

{{One of solutions is execute the reassignment plans in an "Optimal" way.  
Submit the reassignment plans in batches.   making sure each batch, the 
topic/partition will have only one extra New broker in the New Replicas,  wait 
till that reassignment completes, then resubmit another one.  e.g.  for if the 
reassignment is (1,2,3,4) =>  (5,6,7,8).   Split it in 4 batches (buckets), 
every batch only 1 new replica.  }}

 

{{Batch 1:  (1,2,3,5)}}

{{Batch 2:  (1,2,5,6)}}

{{Batch 3:  (1,5,6,7)}}

{{Batch 4:  (5,6,7,8)}}

 

Between each batch,  check ZK node /admin/reassign_partitions exists,  if yes, 
sleep and check again,   if not, submit next batch. 

 

 

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7959) Clear/delete epoch cache if old message format is in use

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775669#comment-16775669
 ] 

ASF GitHub Bot commented on KAFKA-7959:
---

hachikuji commented on pull request #6298: KAFKA-7959: Delete leader epoch 
cache files with old message format versions
URL: https://github.com/apache/kafka/pull/6298
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Clear/delete epoch cache if old message format is in use
> 
>
> Key: KAFKA-7959
> URL: https://issues.apache.org/jira/browse/KAFKA-7959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> Because of KAFKA-7897, it is possible to have a sparse epoch cache when using 
> the old message format. The fix for that issue addresses the problem of 
> improper use of that cache while the message format remains on an older 
> version. However, it leaves the possibility of misuse during a message format 
> upgrade, which can cause unexpected truncation and re-replication. To fix the 
> problem, we should delete or at least clear the cache whenever the old 
> message format is used.
> Note that this problem was fixed unintentionally in 2.1 with the patch for 
> KAFKA-7897. This issue applies specifically to the 2.0 branch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7778) Add KTable.suppress to Scala API

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775681#comment-16775681
 ] 

ASF GitHub Bot commented on KAFKA-7778:
---

casey-green commented on pull request #6314: KAFKA-7778: Add KTable.suppress to 
Scala API
URL: https://github.com/apache/kafka/pull/6314
 
 
   ### Detailed description
   
   Adds `KTable.suppress` to the Scala API.
   
   NOTE: `KTable.count.suppress` **does not work**! If I replace 
`.aggregate(0L)((_, _, agg) => agg + 1)` with `count()` in my unit tests, I get 
this exception:
   
   ```
   Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to [B
at 
org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
at 
org.apache.kafka.streams.kstream.internals.FullChangeSerde$1.serialize(FullChangeSerde.java:73)
at 
org.apache.kafka.streams.kstream.internals.FullChangeSerde$1.serialize(FullChangeSerde.java:60)
at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:96)
at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
   ```
   
   This appears to be because the value serde is not getting passed down to the 
`KTable` in count: 
https://github.com/casey-green/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala#L50.
 Subsequent `KTable`s then fall back to the default serde (byte array), which is 
incorrect. I tried fiddling with this code a bit to get it to work, to no avail.
   
   ### Testing strategy
   
   I added unit tests requested in this PR: 
https://github.com/apache/kafka/pull/6092
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add KTable.suppress to Scala API
> 
>
> Key: KAFKA-7778
> URL: https://issues.apache.org/jira/browse/KAFKA-7778
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jacek Laskowski
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
>
> {{KTable.suppress}} is not available in Scala API.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7959) Clear/delete epoch cache if old message format is in use

2019-02-22 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7959.

   Resolution: Fixed
Fix Version/s: 2.0.2

> Clear/delete epoch cache if old message format is in use
> 
>
> Key: KAFKA-7959
> URL: https://issues.apache.org/jira/browse/KAFKA-7959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.0.2
>
>
> Because of KAFKA-7897, it is possible to have a sparse epoch cache when using 
> the old message format. The fix for that issue addresses the problem of 
> improper use of that cache while the message format remains on an older 
> version. However, it leaves the possibility of misuse during a message format 
> upgrade, which can cause unexpected truncation and re-replication. To fix the 
> problem, we should delete or at least clear the cache whenever the old 
> message format is used.
> Note that this problem was fixed unintentionally in 2.1 with the patch for 
> KAFKA-7897. This issue applies specifically to the 2.0 branch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-02-22 Thread Asutosh Pandya (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775728#comment-16775728
 ] 

Asutosh Pandya commented on KAFKA-7492:
---

[~mjsax] 

> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, docs, newbie
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-02-22 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-7492:
--

Assignee: Asutosh Pandya

> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Asutosh Pandya
>Priority: Minor
>  Labels: beginner, docs, newbie
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7961) Handle subscription changes with a rebalance in progress

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775799#comment-16775799
 ] 

ASF GitHub Bot commented on KAFKA-7961:
---

hachikuji commented on pull request #6304: KAFKA-7961: Ignore assignment for 
un-subscribed partitions
URL: https://github.com/apache/kafka/pull/6304
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle subscription changes with a rebalance in progress
> 
>
> Key: KAFKA-7961
> URL: https://issues.apache.org/jira/browse/KAFKA-7961
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Due to wakeups or poll timeouts, it is possible to have a subscription 
> changed while a rebalance is in progress. This can lead to an illegal state 
> error such as the following if some of the assigned partitions no longer 
> match the subscription:
> {code}
> java.lang.IllegalArgumentException: Assigned partition foo-0 for 
> non-subscribed topic; subscription is [bar]
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:249)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:344)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
> {code}
> Rather than requiring the assignment received from a rebalance to match the 
> subscription, we should just request a rebalance.
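A hypothetical sketch of that handling in the consumer's join-completion path; SubscriptionStateView and its methods are stand-ins for the real ConsumerCoordinator / SubscriptionState internals.

{code:java}
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class AssignmentCheckSketch {

    interface SubscriptionStateView {          // hypothetical
        Set<String> subscribedTopics();
        void assignFromSubscribed(Set<TopicPartition> assignment);
        void requestRejoin();
    }

    static void onAssignmentReceived(Set<TopicPartition> assignment, SubscriptionStateView state) {
        boolean allSubscribed = assignment.stream()
            .allMatch(tp -> state.subscribedTopics().contains(tp.topic()));
        if (allSubscribed) {
            state.assignFromSubscribed(assignment);
        } else {
            // The subscription changed mid-rebalance (e.g. due to a wakeup or poll
            // timeout): request another rebalance instead of throwing.
            state.requestRejoin();
        }
    }
}
{code}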



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7961) Handle subscription changes with a rebalance in progress

2019-02-22 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7961:
---
Component/s: consumer

> Handle subscription changes with a rebalance in progress
> 
>
> Key: KAFKA-7961
> URL: https://issues.apache.org/jira/browse/KAFKA-7961
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
> Fix For: 2.3.0
>
>
> Due to wakeups or poll timeouts, it is possible to have a subscription 
> changed while a rebalance is in progress. This can lead to an illegal state 
> error such as the following if some of the assigned partitions no longer 
> match the subscription:
> {code}
> java.lang.IllegalArgumentException: Assigned partition foo-0 for 
> non-subscribed topic; subscription is [bar]
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:249)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:344)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
> {code}
> Rather than requiring the assignment received from a rebalance to match the 
> subscription, we should just request a rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-02-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775800#comment-16775800
 ] 

Matthias J. Sax commented on KAFKA-7965:


This just failed locally for me, so I guess there is a fair chance of reproducing 
it locally.
{quote}kafka.api.ConsumerBounceTest > 
testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup FAILED
 java.lang.AssertionError: Received 0, expected at least 68
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.assertTrue(Assert.java:42)
 at 
kafka.api.ConsumerBounceTest.kafka$api$ConsumerBounceTest$$receiveAndCommit(ConsumerBounceTest.scala:562)
 at 
kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$2.apply(ConsumerBounceTest.scala:325)
 at 
kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$2.apply(ConsumerBounceTest.scala:324)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:324)
{quote}

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68
>  at org.junit.Assert.fail(Assert.java:88)
>  at org.junit.Assert.assertTrue(Assert.java:41)
>  at kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss

2019-02-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775806#comment-16775806
 ] 

ASF GitHub Bot commented on KAFKA-7672:
---

guozhangwang commented on pull request #6115:  KAFKA-7672 : force write 
checkpoint during StreamTask #suspend
URL: https://github.com/apache/kafka/pull/6115
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
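
The PR title above points at forcing a checkpoint write while a StreamTask is
being suspended, so that the thread which picks the task up next finds a
checkpoint file instead of wiping the state directory. A minimal sketch of that
idea follows; the class, fields, and file layout are hypothetical and this is
not the actual Kafka Streams code.

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;

final class SuspendCheckpointSketch {

    private final Path checkpointFile;
    private final Map<String, Long> changelogOffsets; // changelog partition -> last applied offset

    SuspendCheckpointSketch(Path taskStateDir, Map<String, Long> changelogOffsets) {
        this.checkpointFile = taskStateDir.resolve(".checkpoint");
        this.changelogOffsets = changelogOffsets;
    }

    /** On suspend, persist the changelog offsets so the next owner does not wipe and skip restoration. */
    void suspend() throws IOException {
        String contents = changelogOffsets.entrySet().stream()
                .map(e -> e.getKey() + " " + e.getValue())
                .collect(Collectors.joining(System.lineSeparator()));
        Files.write(checkpointFile, contents.getBytes(StandardCharsets.UTF_8));
    }
}
{code}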


> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Critical
> Fix For: 2.2.0
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file is 
> found under its task folder, Kafka Streams needs to fully restore the local 
> state from the remote changelog topic and only then resume running. However, 
> in some scenarios we found that Kafka Streams does *NOT* restore this state 
> even though no checkpoint was found: it just cleans the state folder and 
> transitions to the RUNNING state directly, resulting in loss of the historic 
> data. 
> To be specific, here are the detailed logs from Kafka Streams in our project 
> that show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can reconstruct the sequence of events for thread 
> AuditTrailBatch-StreamThread-1:
>  # the task previously running on thread 1 is task 0_5 (the corresponding 
> partition is AuditTrailBatch-0-5)
>  # the group begins to rebalance and the new task 1_1 is assigned to thread 1.
>  # no checkpoint is found under the 1_1 state folder, so the offset is reset 
> to 0 and the local state folder is cleaned.
>  # thread 1 transitions to the RUNNING state directly, without restoring task 
> 1_1, so the historic data for state 1_1 is lost on thread 1. 
> *Tho