[jira] [Commented] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest

2021-02-18 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286901#comment-17286901
 ] 

Haruki Okada commented on KAFKA-12261:
--

Yeah, I've actually reconsidered that and changed the description of this 
issue.

 

I agree with keeping the default setting. May I then submit a patch that 
refines AUTO_OFFSET_RESET_DOC?

> Splitting partition causes message loss for consumers with 
> auto.offset.reset=latest
> ---
>
> Key: KAFKA-12261
> URL: https://issues.apache.org/jira/browse/KAFKA-12261
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: Haruki Okada
>Assignee: Luke Chen
>Priority: Minor
>
> As of now, auto.offset.reset of ConsumerConfig is "latest" by default.
>  
> This can be a pitfall that causes message loss when we split a topic's 
> partitions as described below.
> Say we have a topic-X which has only 1 partition.
>  # Split topic-X into 2 partitions with kafka-topics.sh --alter --topic 
> topic-X --partitions 2 (topic-X-1 is added).
>  # The producer learns about the new partition by refreshing its metadata and 
> starts to produce to topic-X-1.
>  # A bit later, the consumer learns about the new partition, triggers a 
> consumer rebalance, and then starts consuming topic-X-1.
>  ** Upon starting consumption, it resets its offset to the log-end-offset.
> If the producer sent several records before step 3, they might never be 
> delivered to the consumer.
>  
>  
> This behavior isn't preferable in most cases, so it should be documented in 
> AUTO_OFFSET_RESET_DOC at least.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest

2021-02-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286892#comment-17286892
 ] 

Luke Chen commented on KAFKA-12261:
---

I think we should document it, but not change the default setting to 
*earliest*.






[jira] [Created] (KAFKA-12346) punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)

2021-02-18 Thread Arindam Ray (Jira)
Arindam Ray created KAFKA-12346:
---

 Summary: punctuate is called at twice the duration passed as the 
first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)
 Key: KAFKA-12346
 URL: https://issues.apache.org/jira/browse/KAFKA-12346
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: Arindam Ray


A stream transform called with the idiom below causes punctuate to be invoked 
at twice the interval passed as the argument:
{code:java}
.transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
  override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] =
    new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {
      override def init(context: ProcessorContext): Unit = {
        val store = context.getStateStore(stateStoreName)
          .asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
        context.schedule(scanFrequency,
          PunctuationType.WALL_CLOCK_TIME,
          new Punctuator {
            override def punctuate(timestamp: Long): Unit = {
              logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
            }
          }
        )
      }

      override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
        // no need to return anything here, the Punctuator will emit the records when necessary
        null
      }

      override def close(): Unit = {}
    }
},
  /**
   * register that this Transformer needs to be connected to our state store.
   */
  stateStoreName
)
{code}





[jira] [Updated] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest

2021-02-18 Thread Haruki Okada (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haruki Okada updated KAFKA-12261:
-
Description: 
As of now, auto.offset.reset of ConsumerConfig is "latest" by default.

This can be a pitfall that causes message loss when we split a topic's 
partitions as described below.

Say we have a topic-X which has only 1 partition.
 # Split topic-X into 2 partitions with kafka-topics.sh --alter --topic topic-X 
--partitions 2 (topic-X-1 is added).
 # The producer learns about the new partition by refreshing its metadata and 
starts to produce to topic-X-1.
 # A bit later, the consumer learns about the new partition, triggers a 
consumer rebalance, and then starts consuming topic-X-1.
 ** Upon starting consumption, it resets its offset to the log-end-offset.

If the producer sent several records before step 3, they might never be 
delivered to the consumer.

 

 

This behavior isn't preferable in most cases, so it should be documented in 
AUTO_OFFSET_RESET_DOC at least.
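
The race in the scenario above can be illustrated with a small toy model. This is only a sketch under stated assumptions: it does not use the Kafka client, the class and method names (OffsetResetRace, consume) are hypothetical, and the partition is modelled as a plain in-memory list. It shows which records a consumer would receive when it is assigned the new partition only after some records were already appended.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one newly added partition; NOT the Kafka client API. */
class OffsetResetRace {

    /**
     * Returns the records a consumer receives when it is assigned the new
     * partition only after producedBeforeAssignment records were appended.
     * resetPolicy mimics auto.offset.reset: "earliest" or "latest".
     */
    static List<Integer> consume(int producedBeforeAssignment,
                                 int producedAfterAssignment,
                                 String resetPolicy) {
        List<Integer> log = new ArrayList<>();
        for (int i = 0; i < producedBeforeAssignment; i++) {
            log.add(i);
        }
        // A brand-new partition has no committed offset, so the starting
        // position is decided by the reset policy alone.
        int position = resetPolicy.equals("earliest") ? 0 : log.size();

        int total = producedBeforeAssignment + producedAfterAssignment;
        for (int i = producedBeforeAssignment; i < total; i++) {
            log.add(i);
        }
        return new ArrayList<>(log.subList(position, log.size()));
    }

    public static void main(String[] args) {
        System.out.println("latest:   " + consume(3, 2, "latest"));
        System.out.println("earliest: " + consume(3, 2, "earliest"));
    }
}
```

With "latest", the three records produced before the assignment are skipped; with "earliest", all five are returned.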

  was:
As of now, auto.offset.reset of ConsumerConfig is "latest" by default.

 

This could be a pitfall that causes message delivery loss when we split topic's 
partitions like below:

Say we have a topic-X which have only 1 partition.
 # split topic-X to 2 partitions by kafka-topics.sh --alter --topic topic-X 
--partitions 2 (topic-X-1 is added)
 # producer knows that new partitions are added by refreshing metadata. starts 
to produce to topic-X-1
 # bit later, consumer knows that new partitions are added and triggering 
consumer rebalance, then starts consuming topic-X-1

 * 
 ** upon starting consumption, it resets its offset to log-end-offset

If the producer sent several records before 3, they could be not-delivered to 
the consumer.

 

 

This behavior isn't preferable in most cases, so auto.offset.reset should be 
set to "earliest" by default to avoid this pitfall.







[GitHub] [kafka] chia7712 commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


chia7712 commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578965629



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -366,6 +346,42 @@ private void readToLogEnd() {
 }
 }
 
+// Visible for testing
+Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+log.trace("Reading to end of offset log");
+
+Map<TopicPartition, Long> endOffsets;

Review comment:
   unused variable





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest

2021-02-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-12261:
-

Assignee: Luke Chen

> Splitting partition causes message loss for consumers with 
> auto.offset.reset=latest
> ---
>
> Key: KAFKA-12261
> URL: https://issues.apache.org/jira/browse/KAFKA-12261
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: Haruki Okada
>Assignee: Luke Chen
>Priority: Minor
>
> As of now, auto.offset.reset of ConsumerConfig is "latest" by default.
>  
> This could be a pitfall that causes message delivery loss when we split 
> topic's partitions like below:
> Say we have a topic-X which have only 1 partition.
>  # split topic-X to 2 partitions by kafka-topics.sh --alter --topic topic-X 
> --partitions 2 (topic-X-1 is added)
>  # producer knows that new partitions are added by refreshing metadata. 
> starts to produce to topic-X-1
>  # bit later, consumer knows that new partitions are added and triggering 
> consumer rebalance, then starts consuming topic-X-1
>  * 
>  ** upon starting consumption, it resets its offset to log-end-offset
> If the producer sent several records before 3, they could be not-delivered to 
> the consumer.
>  
>  
> This behavior isn't preferable in most cases, so auto.offset.reset should be 
> set to "earliest" by default to avoid this pitfall.





[jira] [Updated] (KAFKA-12273) InterBrokerSendThread#pollOnce throws FatalExitError even though it is shutdown correctly

2021-02-18 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-12273:
---
Fix Version/s: 3.0.0

> InterBrokerSendThread#pollOnce throws FatalExitError even though it is 
> shutdown correctly
> -
>
> Key: KAFKA-12273
> URL: https://issues.apache.org/jira/browse/KAFKA-12273
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 3.0.0, 2.8.0
>
>
> Kafka tests sometimes shut down Gradle with a non-zero exit code. One of the 
> root causes is that InterBrokerSendThread#pollOnce encounters 
> DisconnectException while NetworkClient is closing. DisconnectException should 
> be treated as an "expected" error, since we are the ones closing the client. 
> In other words, 
> InterBrokerSendThread#pollOnce should swallow it.
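
The idea of swallowing a disconnect that happens during an orderly shutdown can be sketched as follows. This is a minimal, self-contained illustration and not the actual patch: SendThreadSketch, DisconnectedException and Poller are hypothetical stand-ins, and IllegalStateException stands in for a fatal error.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a send thread; NOT Kafka's InterBrokerSendThread. */
class SendThreadSketch {

    /** Hypothetical stand-in for a disconnect error raised by the network client. */
    static class DisconnectedException extends RuntimeException {}

    /** Hypothetical stand-in for the network client's poll call. */
    interface Poller {
        void poll();
    }

    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final Poller poller;

    SendThreadSketch(Poller poller) {
        this.poller = poller;
    }

    void initiateShutdown() {
        shuttingDown.set(true);
    }

    /**
     * Returns true if the poll completed, false if a disconnect was swallowed
     * because shutdown had already been initiated.
     */
    boolean pollOnce() {
        try {
            poller.poll();
            return true;
        } catch (DisconnectedException e) {
            if (shuttingDown.get()) {
                // Expected: we closed the client ourselves.
                return false;
            }
            // Unexpected outside of shutdown: escalate.
            throw new IllegalStateException("unexpected disconnect", e);
        }
    }
}
```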





[jira] [Assigned] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter

2021-02-18 Thread GeordieMai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GeordieMai reassigned KAFKA-12336:
--

Assignee: GeordieMai

> custom stream naming does not work while calling stream[K, V](topicPattern: 
> Pattern) API with named Consumed parameter 
> ---
>
> Key: KAFKA-12336
> URL: https://issues.apache.org/jira/browse/KAFKA-12336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Ramil Israfilov
>Assignee: GeordieMai
>Priority: Minor
>  Labels: easy-fix, newbie
>
> In our Scala application I am trying to implement custom naming for Kafka 
> Streams application nodes.
> We are using topicPattern for our stream source.
> Here is an API which I am calling:
>  
> {code:java}
> val topicsPattern="t-[A-Za-z0-9-].suffix"
> val operations: KStream[MyKey, MyValue] =
>   builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
> Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
>   )
> {code}
>  Although I am providing Consumed with a custom name, the topology 
> description still shows "KSTREAM-SOURCE-00" as the name of our stream source.
> This is not a problem if I just use a topic name. But our application needs 
> to get messages from a set of topics based on topic name pattern matching.
> After checking the Kafka code I see that
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder (on line 
> 103) has a bug:
> {code:java}
> public <K, V> KStream<K, V> stream(final Pattern topicPattern,
>                                    final ConsumedInternal<K, V> consumed) {
>     final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
>     final StreamSourceNode<K, V> streamPatternSourceNode =
>         new StreamSourceNode<>(name, topicPattern, consumed);
> {code}
> The node name construction does not take the name of the consumed parameter 
> into account.
> For example, the code for the other stream API call, which takes a topic name, 
> does it correctly:
> {code:java}
> final String name = new NamedInternal(consumed.name())
>     .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
> {code}
>  
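
The difference between the two naming paths quoted above can be sketched in isolation. This is a simplified illustration, not the Kafka Streams implementation: SourceNaming is a hypothetical class, and the ten-digit zero-padded counter is an assumption made for the example.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified, hypothetical model of processor naming; NOT Kafka Streams code. */
class SourceNaming {
    private final AtomicInteger index = new AtomicInteger(0);

    /** Generated fallback name: prefix plus a zero-padded counter (padding width assumed). */
    String newProcessorName(String prefix) {
        return prefix + String.format(Locale.ROOT, "%010d", index.getAndIncrement());
    }

    /** Prefer the caller-supplied name; generate one only when none was given. */
    String orElseGenerateWithPrefix(String customName, String prefix) {
        return customName != null ? customName : newProcessorName(prefix);
    }
}
```

Calling newProcessorName directly, as the pattern-based overload does in the report, always yields a generated name, whereas orElseGenerateWithPrefix keeps a user-supplied name such as "my-fancy-name".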





[GitHub] [kafka] tinawenqiao commented on pull request #9235: KAFKA-10449: Add some important parameter desc in connect-distributed.properties

2021-02-18 Thread GitBox


tinawenqiao commented on pull request #9235:
URL: https://github.com/apache/kafka/pull/9235#issuecomment-781848357


   > Thanks for the PR @tinawenqiao.
   > 
   > It would be nice to also mention `rest.port` and `rest.host.name` are 
deprecated in their descriptions. Could you add something like:
   > `DEPRECATED: only used when listeners is not set. Use listeners instead. ` 
to their descriptions in `WorkerConfigs`?
   > 
   > Thanks
   
   Thanks for pointing that out. A new patch is ready.







[GitHub] [kafka] mjsax commented on pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

2021-02-18 Thread GitBox


mjsax commented on pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#issuecomment-781840391


   Updated.







[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

2021-02-18 Thread GitBox


mjsax commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r578937359



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -32,6 +33,7 @@
 public final int topicGroupId;
 /** The ID of the partition. */
 public final int partition;
+public Task task;

Review comment:
   Passing the task into the `RecordCollector` also introduced a cyclic 
dependency, as we pass the collector into the task. But getting the changelog 
partitions within `handleCorrupted` makes sense -- it's actually even cleaner 
to begin with.









[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-18 Thread GitBox


hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781825379


   > Okay. I guess we need another KIP that explains how to upgrade a cluster 
and to determine when it is safe to enable the RaftClient on all of the brokers 
of a cluster.
   
   Yes, a KIP describing the upgrade process will be necessary. I think api 
version negotiation is probably the least of our concerns 😆 .







[GitHub] [kafka] rhauch commented on pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


rhauch commented on pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#issuecomment-781813200


   @chia7712, for what it's worth, I still think your #10152 PR is an 
important fix.







[GitHub] [kafka] rhauch commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

2021-02-18 Thread GitBox


rhauch commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-781812825


   Thanks, @chia7712. I've reviewed much of this, and it seems straightforward, 
though I plan to review more thoroughly tomorrow (~12 hours). I've asked a few 
others that might be more familiar with the admin client code to also take a 
look.







[GitHub] [kafka] rhauch commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578916881



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -390,7 +410,11 @@ public void run() {
 log.trace("Finished read to end log for topic {}", topic);
 } catch (TimeoutException e) {
 log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-"This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+ "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+continue;
+} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {

Review comment:
   The previous `readToLogEnd()` that used the consumer did not have any 
special exception handling in the `start()` method. Any problem during startup 
seems like it should propagate up and cause the worker to fail, especially when 
coupled with your fix in #10152.
   
   I added this here because retriable exceptions should not stop the 
KafkaBasedLog's thread.
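
The intent of catching retriable exceptions inside the work loop can be sketched as below. This is an illustration only, with hypothetical names (RetryLoopSketch, RetriableError) and a bounded attempt count added so the example terminates; it is not the KafkaBasedLog code.

```java
/** Hypothetical sketch of a work loop that survives retriable errors. */
class RetryLoopSketch {

    /** Hypothetical stand-in for a retriable error type. */
    static class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    /**
     * Calls readToEnd until it succeeds and returns the number of attempts used.
     * Retriable errors trigger another iteration; any other exception propagates
     * to the caller.
     */
    static int runUntilSuccess(Runnable readToEnd, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                readToEnd.run();
                return attempt;
            } catch (RetriableError e) {
                // Retriable: report it and loop again instead of letting the
                // exception terminate the thread.
                System.out.println("Retrying after: " + e.getMessage());
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```

Non-retriable failures are deliberately not caught here, which matches the point that problems during startup should surface rather than be retried silently.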









[GitHub] [kafka] rhauch commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578919850



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -366,6 +348,44 @@ private void readToLogEnd() {
 }
 }
 
+// Visible for testing
+Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+log.trace("Reading to end of offset log");
+
+Map<TopicPartition, Long> endOffsets;
+// Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+// That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+// the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+// one more record becomes available, meaning we can't even check whether we're at the end offset.
+// Since all we're trying to do here is get the end offset, we should use the supplied admin client
+// (if available)
+// (which prevents 'consumer.endOffsets(...)'
+// from
+
+// Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+if (useAdminForListOffsets) {
+// Use the admin client to immediately find the end offsets for the assigned topic partitions.
+// Unlike using the consumer
+try {
+endOffsets = admin.endOffsets(assignment);

Review comment:
   I did see a way of reducing the # of returns to 2, and removing one of 
the duplicated lines. Thanks!
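
One way a method of this shape could end up with two return statements is sketched below. It is not the code from the PR: EndOffsetsSketch is hypothetical, the admin and consumer lookups are modelled as plain functions over string partition names, and UnsupportedOperationException stands in for an unsupported-version error. The fallback to the consumer after such an error is an assumption made for the example.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of "prefer admin, fall back to consumer"; NOT KafkaBasedLog. */
class EndOffsetsSketch {
    private final Function<Set<String>, Map<String, Long>> admin;    // may be null
    private final Function<Set<String>, Map<String, Long>> consumer;
    private boolean useAdmin;

    EndOffsetsSketch(Function<Set<String>, Map<String, Long>> admin,
                     Function<Set<String>, Map<String, Long>> consumer) {
        this.admin = admin;
        this.consumer = consumer;
        this.useAdmin = admin != null;
    }

    Map<String, Long> readEndOffsets(Set<String> assignment) {
        if (useAdmin) {
            try {
                return admin.apply(assignment);
            } catch (UnsupportedOperationException e) {
                // Stand-in for an unsupported-version error: stop trying the
                // admin path and fall through to the consumer.
                useAdmin = false;
            }
        }
        return consumer.apply(assignment);
    }
}
```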









[GitHub] [kafka] rhauch commented on pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


rhauch commented on pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#issuecomment-781810306


   Thanks for the quick review, @chia7712. I've incorporated several of your 
suggestions, and added comments/responses on the others.
   
   BTW, a run of the relevant Connect distributed system tests passed with this 
branch: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4390/







[GitHub] [kafka] rhauch commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578917177



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -366,6 +348,44 @@ private void readToLogEnd() {
 }
 }
 
+// Visible for testing
+Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+log.trace("Reading to end of offset log");
+
+Map<TopicPartition, Long> endOffsets;
+// Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+// That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+// the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+// one more record becomes available, meaning we can't even check whether we're at the end offset.
+// Since all we're trying to do here is get the end offset, we should use the supplied admin client
+// (if available)
+// (which prevents 'consumer.endOffsets(...)'
+// from
+
+// Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+if (useAdminForListOffsets) {
+// Use the admin client to immediately find the end offsets for the assigned topic partitions.
+// Unlike using the consumer
+try {
+endOffsets = admin.endOffsets(assignment);

Review comment:
   I wanted to avoid 3 separate return statements.









[GitHub] [kafka] rhauch commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578916881



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -390,7 +410,11 @@ public void run() {
 log.trace("Finished read to end log for topic {}", topic);
 } catch (TimeoutException e) {
 log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-"This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+ "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+continue;
+} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {

Review comment:
   The previous `readToLogEnd()` that used the consumer did not have any 
special exception handling in the `start()` method. I added this here because 
retriable exceptions should not stop the KafkaBasedLog's thread.









[GitHub] [kafka] chia7712 merged pull request #10154: MINOR: Added missing import to kafka.py

2021-02-18 Thread GitBox


chia7712 merged pull request #10154:
URL: https://github.com/apache/kafka/pull/10154


   







[GitHub] [kafka] chia7712 commented on pull request #10154: MINOR: Added missing import to kafka.py

2021-02-18 Thread GitBox


chia7712 commented on pull request #10154:
URL: https://github.com/apache/kafka/pull/10154#issuecomment-781801914


   I ran `downgrade_test` with this patch locally, and it passes.
   
   I will merge this patch to trunk and 2.8.







[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-18 Thread GitBox


hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781799540


   @jsancio I may be misunderstanding your question, but api versions are 
negotiated internally in `NetworkClient`. If there are no compatible versions, 
then the client will raise an unsupported version error.
   
   It looks like it would be useful to have at least one test case to verify 
the expected behavior. At a glance, I could not find any logic to check the 
returned `ClientResponse` for version errors.
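
The general idea of version negotiation mentioned above, picking the highest version both sides support and failing when the ranges do not overlap, can be sketched as follows. This is a generic illustration with a hypothetical class name (ApiVersionSketch), not the `NetworkClient` implementation.

```java
import java.util.OptionalInt;

/** Hypothetical sketch of picking a mutually supported API version. */
class ApiVersionSketch {

    /**
     * Returns the highest version supported by both client and broker, or an
     * empty result when the two ranges do not overlap (the case in which a
     * client would report an unsupported-version error).
     */
    static OptionalInt negotiate(int clientMin, int clientMax, int brokerMin, int brokerMax) {
        int lowest = Math.max(clientMin, brokerMin);
        int highest = Math.min(clientMax, brokerMax);
        return lowest <= highest ? OptionalInt.of(highest) : OptionalInt.empty();
    }
}
```

A test case of the kind suggested in the comment would exercise the empty branch, that is, a broker whose supported range lies entirely outside the client's.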







[GitHub] [kafka] chia7712 commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


chia7712 commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578907382



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -366,6 +348,44 @@ private void readToLogEnd() {
 }
 }
 
+// Visible for testing
+Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+log.trace("Reading to end of offset log");
+
+Map<TopicPartition, Long> endOffsets;
+// Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+// That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+// the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+// one more record becomes available, meaning we can't even check whether we're at the end offset.
+// Since all we're trying to do here is get the end offset, we should use the supplied admin client
+// (if available)
+// (which prevents 'consumer.endOffsets(...)'
+// from
+
+// Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+if (useAdminForListOffsets) {
+// Use the admin client to immediately find the end offsets for the assigned topic partitions.
+// Unlike using the consumer
+try {
+endOffsets = admin.endOffsets(assignment);

Review comment:
   How about `return admin.endOffsets(assignment);`

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -390,7 +410,11 @@ public void run() {
 log.trace("Finished read to end log for topic {}", topic);
 } catch (TimeoutException e) {
 log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-"This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+ "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+continue;
+} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {

Review comment:
   `readToLogEnd` is called by `start`. Should we add similar exception 
handling for that? 

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -321,29 +325,7 @@ private void readToLogEnd() {
 log.trace("Reading to end of offset log");

Review comment:
   Redundant log message. `readEndOffsets(Set<TopicPartition>)` already logs a 
similar message.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##
@@ -651,6 +651,10 @@ public Config describeTopicConfig(String topic) {
  * @param partitions the topic partitions
  * @return the map of offset for each topic partition, or an empty map if the supplied partitions
  * are null or empty
+ * @throws UnsupportedVersionException if the admin client cannot read end offsets
+ * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated
+ * by {@code request.timeout.ms} expires, and this call can be retried
+ * @throws LeaderNotAvailableException if the leader was not available and this call can be retried
  * @throws RetriableException if a retriable error occurs, the operation takes too long, or the

Review comment:
   `the operation takes too long` 
   ^^^ this wording is no longer valid, since `TimeoutException` 
is no longer wrapped in a `RetriableException`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10147: MINOR: Add raft resigned state metric name

2021-02-18 Thread GitBox


dengziming commented on pull request #10147:
URL: https://github.com/apache/kafka/pull/10147#issuecomment-781799200


   @guozhangwang @hachikuji , Hello, PTAL.







[jira] [Resolved] (KAFKA-12258) Change the BatchAccumulator to split records into batches

2021-02-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12258.
-
Resolution: Fixed

> Change the BatchAccumulator to split records into batches
> -
>
> Key: KAFKA-12258
> URL: https://issues.apache.org/jira/browse/KAFKA-12258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Affects Versions: 2.8.0
>Reporter: Alok Nikhil
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>
> Modify the `BatchAccumulator.append` contract to support splitting a batch of 
> records whose size is greater than the maximum allowed size (currently 
> 1048576 bytes) into smaller batches, to avoid RaftClient failures such as
> {code:java}
> leader=0, leaderEpoch=0, partitionEpoch=0) at version 0), 
> ApiMessageAndVersion(TopicRecord(name='topic-BEHRW', 
> topicId=6cRudOGO3yqlsu48RwyPSw) at version 0), 
> ApiMessageAndVersion(PartitionRecord(partitionId=0, 
> topicId=6cRudOGO3yqlsu48RwyPSw, replicas=[1, 2, 0], isr=[1, 2, 0], 
> removingReplicas=null, addingReplicas=null, leader=1, leaderEpoch=0, 
> partitionEpoch=0) at version 0)] is 1088890, which exceeds the maximum 
> allowed batch size of 1048576 Jan 30 00:13:40 ip-10-0-0-254 
> kafka-server-start.sh[633637]: at 
> org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:110)
>  Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at 
> org.apache.kafka.raft.KafkaRaftClient.scheduleAppend(KafkaRaftClient.java:1885)
>  Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at 
> org.apache.kafka.raft.metadata.MetaLogRaftShim.scheduleWrite(MetaLogRaftShim.java:60)
>  Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:406)
>  Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at 
> org.apache.kafka.common.utils.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:117)
>  Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at 
> org.apache.kafka.common.utils.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:192)
>  Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at 
> org.apache.kafka.common.utils.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:165)
>  Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at 
> java.base/java.lang.Thread.run(Thread.java:834) Jan 30 00:13:40 ip-10-0-0-254 
> kafka-server-start.sh[633637]: [2021-01-30 00:13:40,277] INFO [Controller 
> 3000] Reverting to snapshot 2232 (org.apache.kafka.timeline.SnapshotRegistry)
> {code}
> *Example use-case*: Creating 10,000 topics in a single API call
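
The splitting idea in the description above can be sketched as a simple greedy packer. This is a minimal illustration, not the `BatchAccumulator` change itself: the class `BatchSplitSketch` and the method `split` are hypothetical names, records are represented only by their sizes in bytes, and batch header overhead is ignored for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not Kafka's BatchAccumulator.
final class BatchSplitSketch {

    // Greedily packs record sizes (in bytes) into consecutive batches so that
    // the sum of sizes in each batch never exceeds maxBatchSizeBytes.
    static List<List<Integer>> split(List<Integer> recordSizes, int maxBatchSizeBytes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0L; // long, so the running sum cannot overflow

        for (int size : recordSizes) {
            if (size > maxBatchSizeBytes) {
                // A single oversized record cannot be split any further.
                throw new IllegalArgumentException(
                    "Record of " + size + " bytes exceeds the maximum batch size of "
                        + maxBatchSizeBytes);
            }
            if (currentBytes + size > maxBatchSizeBytes) {
                // The current batch is full: close it and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0L;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

With a limit of 1000 bytes, records of 400, 400, 400 and 900 bytes end up in three batches instead of failing as one oversized append. Record order is preserved, which matters for a log.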



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-18 Thread GitBox


hachikuji merged pull request #10063:
URL: https://github.com/apache/kafka/pull/10063


   







[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-18 Thread GitBox


jsancio commented on pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#issuecomment-781790693


   The following tests failed:
   ```
   core:
   
   ConfigCommandTest. shouldFailIfUnresolvableHost()
   ConfigCommandTest. shouldFailIfUnresolvableHost()
   ConfigCommandTest. shouldFailIfUnresolvableHostUsingZookeeper()
   ConfigCommandTest. shouldFailIfUnresolvableHostUsingZookeeper()
   DynamicConfigChangeTest. testIpHandlerUnresolvableAddress()
   DynamicConfigChangeTest. testIpHandlerUnresolvableAddress()
   DynamicConfigTest. shouldFailIpConfigsWithBadHost()
   DynamicConfigTest. shouldFailIpConfigsWithBadHost()
   ```







[GitHub] [kafka] jsancio commented on pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-18 Thread GitBox


jsancio commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781784110


   > We do not plan to rely on the IBP in order to determine API versions for 
raft requests. Instead, we want to discover them through the ApiVersions API. 
This patch enables the flag to do so.
   
   How or where do we check that the connected nodes support the versions we 
require? Is that coming in a future PR?
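
One common way to answer that kind of question is to intersect the version range a node advertises with the range the local client supports, and to treat an empty intersection as "unsupported". The sketch below only illustrates that check. It is not the code in this PR, and the class `VersionNegotiationSketch` and its method are hypothetical names.

```java
import java.util.OptionalInt;

// Hypothetical illustration of a version-range check; not Kafka's implementation.
final class VersionNegotiationSketch {

    // Returns the highest version supported by both sides, or empty if the
    // two inclusive ranges [localMin, localMax] and [remoteMin, remoteMax]
    // do not overlap.
    static OptionalInt highestCommonVersion(int localMin, int localMax,
                                            int remoteMin, int remoteMax) {
        int low = Math.max(localMin, remoteMin);
        int high = Math.min(localMax, remoteMax);
        return low <= high ? OptionalInt.of(high) : OptionalInt.empty();
    }
}
```

A caller would fail fast, or avoid sending the request, when the result is empty.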







[GitHub] [kafka] dengziming opened a new pull request #10159: KAFKA-12338: Consolidate MetadataRecordSerde and MetadataParser serial/deserial code

2021-02-18 Thread GitBox


dengziming opened a new pull request #10159:
URL: https://github.com/apache/kafka/pull/10159


   *More detailed description of your change*
   The logic is duplicated, except that `MetadataRecordSerde` has an extra 
`DEFAULT_FRAME_VERSION`. If we want to change the serialization/deserialization 
format of metadata, we have to modify two classes, which is unreasonable.
   
   *Summary of testing strategy (including rationale)*
   unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-18 Thread GitBox


jsancio commented on pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#issuecomment-781778478


   Running the following commands
   ```
   ./gradlew -version
   
   
   Gradle 6.8.1
   
   
   Build time:   2021-01-22 13:20:08 UTC
   Revision: 31f14a87d93945024ab7a78de84102a3400fa5b2
   
   Kotlin:   1.4.20
   Groovy:   2.5.12
   Ant:  Apache Ant(TM) version 1.10.9 compiled on September 27 2020
   JVM:  1.8.0_282 (Private Build 25.282-b08)
   OS:   Linux 5.8.0-7642-generic amd64
   
   ./gradlew -PscalaVersion=2.12 clean compileJava compileScala compileTestJava 
compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain 
rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
   
   ./gradlew -PscalaVersion=2.13 clean compileJava compileScala compileTestJava 
compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain 
rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
   
   ./gradlew -PscalaVersion=2.12 unitTest integrationTest --profile --no-daemon 
--continue -PtestLoggingEvents=started,passed,skipped,failed 
-PignoreFailures=true -PmaxTestRetries=1 -PmaxTestRetryFailures=5
   ```







[GitHub] [kafka] mjsax commented on pull request #10091: KAFKA-9524: increase retention time for window and grace periods longer than one day

2021-02-18 Thread GitBox


mjsax commented on pull request #10091:
URL: https://github.com/apache/kafka/pull/10091#issuecomment-781777265


   Thanks for the PR @MarcoLotz! And congrats on your first code contribution!
   
   Thanks for reviewing @vcrfxia!
   
   Merged to `trunk` and cherry-picked to `2.8` branch.







[jira] [Resolved] (KAFKA-9524) Default window retention does not consider grace period

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9524.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Assignee: Marco Lotz
>Priority: Minor
> Fix For: 2.8.0
>
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention to 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
> than its window size plus the grace period. Got size=[172800], 
> grace=[30], retention=[172800]{code}
> Ideally, Streams should include grace period when implicitly setting window 
> retention.
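
The behaviour requested above can be sketched with plain `java.time.Duration` arithmetic. This is a minimal illustration under stated assumptions, not the Streams implementation or the eventual patch: the class `WindowRetentionSketch` and the method `implicitRetention` are hypothetical, and the 1-day default is taken from the issue description.

```java
import java.time.Duration;

// Hypothetical illustration only; not Kafka Streams code.
final class WindowRetentionSketch {

    // Default window retention of 1 day, as stated in the issue description.
    static final Duration DEFAULT_RETENTION = Duration.ofDays(1);

    // Retention must cover the window size plus the grace period, and is
    // never smaller than the default.
    static Duration implicitRetention(Duration windowSize, Duration grace) {
        Duration needed = windowSize.plus(grace);
        return needed.compareTo(DEFAULT_RETENTION) > 0 ? needed : DEFAULT_RETENTION;
    }
}
```

For a 20-day window (1,728,000,000 ms) with a 5-minute grace period (300,000 ms), this gives 1,728,300,000 ms, which satisfies the "window size plus the grace period" check quoted in the exception message.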





[jira] [Updated] (KAFKA-9524) Default window retention does not consider grace period

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9524:
---
Issue Type: Improvement  (was: Bug)

> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Assignee: Marco Lotz
>Priority: Minor





[GitHub] [kafka] rhauch opened a new pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-18 Thread GitBox


rhauch opened a new pull request #10158:
URL: https://github.com/apache/kafka/pull/10158


   Refactored the KafkaBasedLog logic to read end offsets into a separate 
method to make it easier to test. Also changed the TopicAdmin.endOffsets method 
to throw the original UnsupportedVersionException, LeaderNotAvailableException, 
and TimeoutException rather than wrapping, to better conform with the consumer 
method and how the KafkaBasedLog retries those exceptions.
   
   Added new tests to verify various scenarios and errors.
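
The retry behaviour this change relies on can be sketched roughly as follows. This is an illustration only, not the code in `KafkaBasedLog` or `TopicAdmin`: the class `EndOffsetRetrySketch`, the method `callWithRetries` and the `maxAttempts` parameter are hypothetical, and the nested exception types are local stand-ins for the Kafka classes of the same names so that the example compiles without Kafka on the classpath. It assumes that a timeout is a kind of retriable error and that an unsupported-version error is not.

```java
import java.util.function.Supplier;

// Hypothetical illustration only; not the KafkaBasedLog implementation.
final class EndOffsetRetrySketch {

    // Local stand-ins for the Kafka exception types named in this PR.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    static class TimeoutException extends RetriableException {
        TimeoutException(String message) { super(message); }
    }

    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) { super(message); }
    }

    // Invokes the call until it succeeds or maxAttempts is reached.
    // Retriable errors (including timeouts) trigger another attempt; any
    // other exception, such as UnsupportedVersionException, propagates
    // immediately so the caller can fall back to another strategy.
    static <T> T callWithRetries(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RetriableException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Because the original exception types are thrown unwrapped, the caller can tell "retry later" apart from "this broker cannot do this" with ordinary `catch` clauses.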
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-18 Thread GitBox


jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578886362



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##
@@ -307,4 +309,38 @@ public int writeRecord(
 recordOutput.writeVarint(0);
 return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
 }
+
+private int batchHeaderSizeInBytes() {
+return AbstractRecords.recordBatchHeaderSizeInBytes(
+RecordBatch.MAGIC_VALUE_V2,
+compressionType
+);
+}
+
+private int bytesNeededForRecords(
+Collection<T> records,
+ObjectSerializationCache serializationCache
+) {
+long expectedNextOffset = nextOffset;
+int bytesNeeded = 0;
+for (T record : records) {
+if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+return Integer.MAX_VALUE;

Review comment:
   Updated it to use `Math.addExact`.
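
For readers unfamiliar with it, `Math.addExact` throws an `ArithmeticException` when an `int` sum overflows, instead of silently wrapping around. The sketch below shows the general pattern only. How the PR actually uses it is not visible in this thread, and the class `AddExactSketch` and the method `totalBytes` are hypothetical names.

```java
// Hypothetical illustration only; not the BatchBuilder code from the PR.
final class AddExactSketch {

    // Sums record sizes; returns Integer.MAX_VALUE if the total does not
    // fit in an int, so callers can treat it as "too large for any batch".
    static int totalBytes(int[] recordSizes) {
        int total = 0;
        try {
            for (int size : recordSizes) {
                total = Math.addExact(total, size); // throws on int overflow
            }
        } catch (ArithmeticException overflow) {
            return Integer.MAX_VALUE;
        }
        return total;
    }
}
```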









[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread Alok Nikhil (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alok Nikhil updated KAFKA-12345:

Affects Version/s: 2.8.0

> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Alok Nikhil
>Priority: Major
>  Labels: kip-500
>
> Occasionally, a scheduler thread on a broker crashes with this stack trace:
>  
> {code:java}
> [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 
> 'send-alter-isr' (kafka.utils.KafkaScheduler)
>  java.lang.NullPointerException
>  at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
>  at 
> kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
>  at 
> kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
> After that the broker is unable to fetch any records from any other broker 
> (and vice versa)
> {code:java}
> [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, 
> fetcherId=0] Error sending fetch request (sessionId=1644324092, epoch=957) 
> to node 4: (org.apache.kafka.clients.FetchSessionHandler)
>  java.io.IOException: Connection to 4 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
>  





[jira] [Commented] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread Alok Nikhil (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286821#comment-17286821
 ] 

Alok Nikhil commented on KAFKA-12345:
-

Hi [~dengziming]. I have updated the Kafka version and the priority. This is 
part of the KIP-500 merge. So, it's not an active issue (considering KIP-500 is 
still in development and not the default mode the Broker will operate in).

> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Alok Nikhil
>Priority: Minor
>  Labels: kip-500





[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread Alok Nikhil (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alok Nikhil updated KAFKA-12345:

Priority: Minor  (was: Major)

> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Alok Nikhil
>Priority: Minor
>  Labels: kip-500





[GitHub] [kafka] showuon edited a comment on pull request #10118: KAFKA-10192: increase starting up waiting time

2021-02-18 Thread GitBox


showuon edited a comment on pull request #10118:
URL: https://github.com/apache/kafka/pull/10118#issuecomment-781768574


   > Fine by me, flaky tests suck. 
   
   Indeed! Thanks, @C0urante 
   
   @kkonstantine , could you check this PR? Thanks.
   
   







[GitHub] [kafka] hachikuji commented on a change in pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10129:
URL: https://github.com/apache/kafka/pull/10129#discussion_r578873790



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##
@@ -354,7 +355,8 @@
 "Requested position is not greater than or equal to zero, and less 
than the size of the snapshot.",
 PositionOutOfRangeException::new),
 UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", 
UnknownTopicIdException::new),
-DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", 
DuplicateBrokerRegistrationException::new);
+DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", 
DuplicateBrokerRegistrationException::new),
+INVALID_CLUSTER_ID(102, "The supplied cluster id is not valid.", 
InvalidClusterIdException::new);

Review comment:
   I couldn't find any existing `INVALID*` error code that seems to fit 
this case. Usually "invalid" is reserved for cases where the field is 
structurally invalid. For example, `INVALID_GROUP_ID` is used when the groupid 
is empty in APIs where we require it to be non-empty. The closest similar case 
is `INVALID_PRODUCER_ID_MAPPING`. 
   
   We are going to add an `INCONSISTENT_TOPIC_ID` in 
https://github.com/apache/kafka/pull/10143. Perhaps that is  enough cover here? 
The usage is similar: the request indicates an id which does not match the 
local state.









[GitHub] [kafka] showuon commented on pull request #10118: KAFKA-10192: increase starting up waiting time

2021-02-18 Thread GitBox


showuon commented on pull request #10118:
URL: https://github.com/apache/kafka/pull/10118#issuecomment-781768574


   > Fine by me, flaky tests suck. 
   Indeed! Thanks, @C0urante 
   
   @kkonstantine , could you check this PR? Thanks.
   
   







[GitHub] [kafka] hachikuji opened a new pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-18 Thread GitBox


hachikuji opened a new pull request #10157:
URL: https://github.com/apache/kafka/pull/10157


   We do not plan to rely on the IBP in order to determine API versions for 
raft requests. Instead, we want to discover them through the ApiVersions API. 
This patch enables the flag to do so.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578876029



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+
+public class ClusterControlManager {

Review comment:
   Could we add some comments to this class?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLe

[GitHub] [kafka] mjsax merged pull request #10091: KAFKA-9524: increase retention time for window and grace periods longer than one day

2021-02-18 Thread GitBox


mjsax merged pull request #10091:
URL: https://github.com/apache/kafka/pull/10091


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10129:
URL: https://github.com/apache/kafka/pull/10129#discussion_r578873790



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##
@@ -354,7 +355,8 @@
 "Requested position is not greater than or equal to zero, and less 
than the size of the snapshot.",
 PositionOutOfRangeException::new),
 UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", 
UnknownTopicIdException::new),
-DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", 
DuplicateBrokerRegistrationException::new);
+DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", 
DuplicateBrokerRegistrationException::new),
+INVALID_CLUSTER_ID(102, "The supplied cluster id is not valid.", 
InvalidClusterIdException::new);

Review comment:
   I couldn't find any existing `INVALID*` error code that seems to fit 
this case. Usually "invalid" is reserved for cases where the field is 
structurally invalid. For example, `INVALID_GROUP_ID` is used when the groupid 
is empty in APIs where we require it to be non-empty. The closest similar case 
is `INVALID_PRODUCER_ID_MAPPING`. 
   
   We are going to add an INCONSISTENT_TOPIC_ID in 
https://github.com/apache/kafka/pull/10143. Perhaps that is enough to cover this case? 
The usage is similar: the request indicates an id which does not match the 
local state.
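
To illustrate the distinction drawn in this comment, here is a minimal sketch of a check that separates a structurally invalid id from one that is well-formed but does not match local state. The class, the enum constants, and the treatment of an empty string as structurally invalid are assumptions made for illustration; they are not taken from the PR.

```java
// Hypothetical sketch: names and rules are illustrative, not Kafka's API.
final class ClusterIdCheck {
    enum Result { NONE, INVALID_REQUEST, INCONSISTENT_CLUSTER_ID }

    // requestClusterId may be null when the sender did not supply the field.
    static Result validate(String requestClusterId, String localClusterId) {
        if (requestClusterId == null) {
            return Result.NONE;                     // nothing to compare against
        }
        if (requestClusterId.isEmpty()) {
            return Result.INVALID_REQUEST;          // assumed: empty means structurally invalid
        }
        if (!requestClusterId.equals(localClusterId)) {
            return Result.INCONSISTENT_CLUSTER_ID;  // well-formed, but differs from local state
        }
        return Result.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate("xyz", "abc"));
    }
}
```

Under these assumptions, only a mismatch against the local id produces the "inconsistent" result, which mirrors the usage described for INCONSISTENT_TOPIC_ID.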









[GitHub] [kafka] hachikuji commented on a change in pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10129:
URL: https://github.com/apache/kafka/pull/10129#discussion_r578876834



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1751,6 +1768,7 @@ private FetchRequestData buildFetchRequest() {
 });
 return request
 .setMaxWaitMs(fetchMaxWaitMs)
+.setClusterId(clusterId.toString())

Review comment:
   One small detail which is probably ok. The clusterId field in the fetch 
schema is not currently marked as ignorable. That should be ok since it is only 
used in the raft implementation which can guarantee that we will have version 
12 and above. On the other hand, I don't see any harm in making the field 
ignorable since we are accepting a null value anyway. Is it worth changing that?
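
For readers unfamiliar with what "ignorable" changes, the following is a simplified model, not Kafka's generated serialization code. It assumes the field exists from version 12 onward (as the comment implies) and uses the standard `UnsupportedOperationException` as a stand-in for a version error; the class and method names are hypothetical.

```java
// Simplified model of version-gated field handling; names are hypothetical.
final class VersionedFieldSketch {
    // Assumption: the clusterId field is present in version 12 and above.
    static final short FIELD_MIN_VERSION = 12;

    // Returns the value that would be written, or null if the field is omitted.
    static String encodeClusterId(String clusterId, short version, boolean ignorable) {
        if (version >= FIELD_MIN_VERSION) {
            return clusterId;                       // field exists at this version
        }
        if (clusterId != null && !ignorable) {
            // A non-default value cannot be dropped silently unless the field is ignorable.
            throw new UnsupportedOperationException(
                "Attempted to write a non-default clusterId at version " + version);
        }
        return null;                                // silently omitted
    }

    public static void main(String[] args) {
        System.out.println(encodeClusterId("abc", (short) 11, true));
    }
}
```

In this model, marking the field ignorable only matters when a non-null value is sent at an older version, which is why the change looks harmless when the raft client always uses version 12 or later.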

##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##
@@ -354,7 +355,8 @@
 "Requested position is not greater than or equal to zero, and less 
than the size of the snapshot.",
 PositionOutOfRangeException::new),
 UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", 
UnknownTopicIdException::new),
-DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", 
DuplicateBrokerRegistrationException::new);
+DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", 
DuplicateBrokerRegistrationException::new),
+INVALID_CLUSTER_ID(102, "The supplied cluster id is not valid.", 
InvalidClusterIdException::new);

Review comment:
   I couldn't find any existing `INVALID*` error code that seems to fit 
this case. Usually "invalid" is reserved for cases where the field is 
structurally invalid. For example, `INVALID_GROUP_ID` is used when the groupid 
is empty. The closest similar case is `INVALID_PRODUCER_ID_MAPPING`. 
   
   We are going to add an INCONSISTENT_TOPIC_ID in 
https://github.com/apache/kafka/pull/10143. Perhaps that is enough to cover this case? 
The usage is similar: the request indicates an id which does not match the 
local state.









[GitHub] [kafka] dengziming commented on pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-18 Thread GitBox


dengziming commented on pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#issuecomment-781762837


   ping @hachikuji .
   retest this, please.







[jira] [Comment Edited] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286815#comment-17286815
 ] 

dengziming edited comment on KAFKA-12345 at 2/19/21, 2:02 AM:
--

Hello, please provide the specific Kafka version you are using or, for 
convenience, the revision number if you are on the dev branch of Kafka.


was (Author: dengziming):
hello, please provide your specified Kafka version, or the revision number if 
you are using the dev branch of kafka.

> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Alok Nikhil
>Priority: Major
>  Labels: kip-500
>
> Occasionally, a scheduler thread on a broker crashes with this stack
>  
> {code:java}
> [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 
> 'send-alter-isr' (kafka.utils.KafkaScheduler)
>  java.lang.NullPointerException
>  at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
>  at 
> kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
>  at 
> kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
> After that the broker is unable to fetch any records from any other broker 
> (and vice versa)
> {code:java}
> [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, 
> fetcherId=0] Error sending fetch request (sessionId=164432409
>  2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
>  java.io.IOException: Connection to 4 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286815#comment-17286815
 ] 

dengziming commented on KAFKA-12345:


Hello, please provide the specific Kafka version you are using, or the 
revision number if you are on the dev branch of Kafka.

> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Alok Nikhil
>Priority: Major
>  Labels: kip-500
>





[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578870759



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -298,28 +342,30 @@ public void close() {
 public final List records;
 public final MemoryRecords data;
 private final MemoryPool pool;
-private final ByteBuffer buffer;
+// Buffer that was allocated by the MemoryPool (pool). This may not be 
the buffer used in
+// the MemoryRecords (data) object.
+private final ByteBuffer pooledBuffer;
 
 private CompletedBatch(
 long baseOffset,
 List records,
 MemoryRecords data,
 MemoryPool pool,
-ByteBuffer buffer
+ByteBuffer pooledBuffer

Review comment:
   nit: might be worth using the same name in `BatchBuilder`. Currently we 
use `initialBuffer`

##
File path: 
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##
@@ -164,47 +170,84 @@ public void testUnflushedBuffersReleasedByClose() {
 
 @Test
 public void testSingleBatchAccumulation() {
-int leaderEpoch = 17;
-long baseOffset = 157;
-int lingerMs = 50;
-int maxBatchSize = 512;
-
-Mockito.when(memoryPool.tryAllocate(maxBatchSize))
-.thenReturn(ByteBuffer.allocate(maxBatchSize));
-
-BatchAccumulator acc = buildAccumulator(
-leaderEpoch,
-baseOffset,
-lingerMs,
-maxBatchSize
-);
-
-List records = asList("a", "b", "c", "d", "e", "f", "g", "h", 
"i");
-assertEquals(baseOffset, acc.append(leaderEpoch, records.subList(0, 
1)));
-assertEquals(baseOffset + 2, acc.append(leaderEpoch, 
records.subList(1, 3)));
-assertEquals(baseOffset + 5, acc.append(leaderEpoch, 
records.subList(3, 6)));
-assertEquals(baseOffset + 7, acc.append(leaderEpoch, 
records.subList(6, 8)));
-assertEquals(baseOffset + 8, acc.append(leaderEpoch, 
records.subList(8, 9)));
-
-time.sleep(lingerMs);
-assertTrue(acc.needsDrain(time.milliseconds()));
-
-List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
-assertEquals(1, batches.size());
-assertFalse(acc.needsDrain(time.milliseconds()));
-assertEquals(Long.MAX_VALUE - time.milliseconds(), 
acc.timeUntilDrain(time.milliseconds()));
-
-BatchAccumulator.CompletedBatch batch = batches.get(0);
-assertEquals(records, batch.records);
-assertEquals(baseOffset, batch.baseOffset);
+asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+int leaderEpoch = 17;
+long baseOffset = 157;
+int lingerMs = 50;
+int maxBatchSize = 512;
+
+Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+.thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+BatchAccumulator acc = buildAccumulator(
+leaderEpoch,
+baseOffset,
+lingerMs,
+maxBatchSize
+);
+
+List records = asList("a", "b", "c", "d", "e", "f", "g", 
"h", "i");
+assertEquals(baseOffset, appender.call(acc, leaderEpoch, 
records.subList(0, 1)));
+assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, 
records.subList(1, 3)));
+assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, 
records.subList(3, 6)));
+assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, 
records.subList(6, 8)));
+assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, 
records.subList(8, 9)));
+
+time.sleep(lingerMs);
+assertTrue(acc.needsDrain(time.milliseconds()));
+
+List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+assertEquals(1, batches.size());
+assertFalse(acc.needsDrain(time.milliseconds()));
+assertEquals(Long.MAX_VALUE - time.milliseconds(), 
acc.timeUntilDrain(time.milliseconds()));
+
+BatchAccumulator.CompletedBatch batch = batches.get(0);
+assertEquals(records, batch.records);
+assertEquals(baseOffset, batch.baseOffset);
+});
 }
 
 @Test
 public void testMultipleBatchAccumulation() {
+asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+int leaderEpoch = 17;
+long baseOffset = 157;
+int lingerMs = 50;
+int maxBatchSize = 256;
+
+Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+.thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+BatchAccumulator acc = buildAccumulator(
+leaderEpoch,
+baseOffset,
+lingerMs,
+maxBatchSize
+);
+
+// Append entries until we have 4 batches to drain (3 completed, 1 
building)
+  

[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578870321



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##
@@ -307,4 +309,38 @@ public int writeRecord(
 recordOutput.writeVarint(0);
 return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
 }
+
+private int batchHeaderSizeInBytes() {
+return AbstractRecords.recordBatchHeaderSizeInBytes(
+RecordBatch.MAGIC_VALUE_V2,
+compressionType
+);
+}
+
+private int bytesNeededForRecords(
+Collection<T> records,
+ObjectSerializationCache serializationCache
+) {
+long expectedNextOffset = nextOffset;
+int bytesNeeded = 0;
+for (T record : records) {
+if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+return Integer.MAX_VALUE;

Review comment:
   Since we are handling this case by raising an exception, is it worth 
checking for overflow of `bytesNeeded` as well?
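
One way to check for overflow of an `int` accumulator is `Math.addExact`, which throws `ArithmeticException` instead of wrapping around. The sketch below is not the PR's code: the class, the method, and the use of a plain list of sizes are hypothetical, and returning `Integer.MAX_VALUE` as a sentinel is assumed only because the quoted diff does the same for the offset case.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of an overflow-safe size accumulation.
final class BytesNeededSketch {
    // Sums record sizes; returns Integer.MAX_VALUE if the int total would overflow.
    static int bytesNeeded(List<Integer> recordSizes) {
        int total = 0;
        for (int size : recordSizes) {
            try {
                total = Math.addExact(total, size);  // throws on int overflow
            } catch (ArithmeticException e) {
                return Integer.MAX_VALUE;            // sentinel: does not fit
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(bytesNeeded(Arrays.asList(100, 200, 300)));
    }
}
```

The caller can then treat the sentinel the same way it treats the offset-range case and raise its exception in one place.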









[jira] [Commented] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread Alok Nikhil (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286810#comment-17286810
 ] 

Alok Nikhil commented on KAFKA-12345:
-

Adding a bit more context: there seems to have been a controller quorum voting 
event just before this crash, and it appears to be related.


{code:java}
[2021-02-19 01:04:20,721] INFO [Controller 3000] Fenced broker: 
FenceBrokerRecord(id=3, epoch=2) 
(org.apache.kafka.controller.ClusterControlManager)
[2021-02-19 01:04:21,354] INFO [BrokerMetadataListener id=0] Tell Replica 
Manager to handle records (kafka.server.metadata.BrokerMetadataListener)
[2021-02-19 01:04:22,097] INFO [RaftManager broker=0,controller=3000] Completed 
transition to Unattached(epoch=2, voters=[3000, 3001, 3002, 3003, 3004], 
electionTimeoutMs=690) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,203] INFO [RaftManager broker=0,controller=3000] Completed 
transition to Unattached(epoch=3, voters=[3000, 3001, 3002, 3003, 3004], 
electionTimeoutMs=584) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,507] INFO [RaftManager broker=0,controller=3000] Completed 
transition to Unattached(epoch=4, voters=[3000, 3001, 3002, 3003, 3004], 
electionTimeoutMs=280) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,626] INFO [RaftManager broker=0,controller=3000] Completed 
transition to Unattached(epoch=5, voters=[3000, 3001, 3002, 3003, 3004], 
electionTimeoutMs=161) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,626] INFO [RaftManager broker=0,controller=3000] Completed 
transition to Voted(epoch=5, votedId=3004, voters=[3000, 3001, 3002, 3003, 
3004], electionTimeoutMs=520) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,631] INFO [RaftManager broker=0,controller=3000] Completed 
transition to FollowerState(fetchTimeoutMs=2000, epoch=5, leaderId=3004, 
voters=[3000, 3001, 3002, 3003, 3004]) (org.apache.kafka.raft.QuorumState){code}

> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Alok Nikhil
>Priority: Major
>  Labels: kip-500
>





[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread Alok Nikhil (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alok Nikhil updated KAFKA-12345:

Description: 
Occasionally, a scheduler thread on a broker crashes with this stack

 
{code:java}
[2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 
'send-alter-isr' (kafka.utils.KafkaScheduler)
 java.lang.NullPointerException
 at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
 at 
kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
 at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
 at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
 at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834){code}
 

After that the broker is unable to fetch any records from any other broker (and 
vice versa)
{code:java}
[2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, 
fetcherId=0] Error sending fetch request (sessionId=164432409
 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
 java.io.IOException: Connection to 4 was disconnected before the response was 
read
 at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
 at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
 

  was:
Occasionally, a scheduler thread on a broker crashes with this stack

```
 [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 
'send-alter-isr' (kafka.utils.KafkaScheduler)
 java.lang.NullPointerException
 at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
 at 
kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
 at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
 at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
 at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834)

```


 After that the broker is unable to fetch any records from any other broker 
(and vice versa)

```
 [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, 
fetcherId=0] Error sending fetch request (sessionId=164432409
 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
 java.io.IOException: Connection to 4 was disconnected before the response was 
read
 at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
 at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

```


> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Alok Nikhil
>Priority: Major
>  Labels: kip-500
>
> Occa

[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-02-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286808#comment-17286808
 ] 

Guozhang Wang commented on KAFKA-10847:
---

[~spena][~vcrfxia][~mjsax] I'd like to dump some more thoughts here regarding 
the design:

1) Regarding `fetchAll`: intuitively, even if we call this function on the 
newly added bookkeeping stores for those records that have not found a match 
yet, since the expiration frequency is high, the scan function should only 
return a few records each time. Within RocksDB, a range scan function is 
usually executed as 1) find the starting point based on the lower bound, 2) 
start scanning through the sst tables, stop at the higher bound. Among these 
two steps, the time spent finding the starting point should be roughly the same 
as the time spent for a single-key lookup, and since the calling frequency is 
high, the `low` and `high` bound on timestamp should be pretty close (maybe 
just a couple milliseconds away), which means step 2) should stop fairly 
quickly as well. We could see if this is indeed the case or not, e.g. when 
running the benchmark we first break down the latency to multiple stages: 1) 
time spent reading / writing in the window store, 2) time spent range-fetching 
in the expiration store, 3) deleting records in the expiration store, 4) join 
operator itself. And then within 2), we can plot a chart where the x-axis is 
the number of records returned, and y-axis is the e2e latency of getting the 
iterator and looping through it. If it shows that, even with say zero or one 
record returned, there's still a huge constant cost in latency, we may then 
consider 2) below.

2) If 1) shows that range-fetch in RocksDB has a constant amortized cost even 
with very few records returned, then instead of trying to expire records from 
the expiration store on each input record, we only try to fetch-and-delete from 
the expiration store periodically. We should then be careful about the period 
since if it is too infrequent, then we may introduce a much longer output 
emission latency, plus having the risk of blocking a thread from calling 
consumer.poll().

3) Another idea I have in mind is that, for the expiration store, instead of 
using two physical stores, one for each side, we can consider just adding a 
single store for both sides, with the key prefixed by the logical stream (e.g. 
a single bit prefix, "0" for left, and "1" for right). By doing that we can 
have one fewer physical store, and since the records from either side are 
clustered in the underlying layout, the range query should not be impacted much.

4) One more idea I have for the expiration store again: instead of using the 
sequence id to disable deduplication, we can consider removing that from the 
key to save 4 bytes per record, and instead serialize the value schema as a 
list, assuming that in practice most should be a singleton list, which 
we can serialize with only very small byte overhead. The downside though is 
that instead of being able to do a blind write to append, we need to call a get 
and then a write, though the get should return null most of the time. I'm hoping 
that with RocksDB's Bloom filter, such a get should be efficient and worth the 
tradeoff.
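A minimal sketch of the get-then-write append in 4), assuming the key is the timestamp alone and the value is a list of records. The in-memory map and all names are hypothetical stand-ins for the real store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: no sequence id in the key; duplicates go into a list value.
final class ListValuedExpirationStore {
    private final Map<Long, List<String>> store = new TreeMap<>();

    // Read-modify-write append: a get followed by a put, instead of a blind write.
    void append(long timestamp, String record) {
        List<String> existing = store.get(timestamp);   // expected to be null most of the time
        List<String> updated = existing == null ? new ArrayList<>() : new ArrayList<>(existing);
        updated.add(record);
        store.put(timestamp, updated);
    }

    List<String> get(long timestamp) {
        return store.getOrDefault(timestamp, List.of());
    }
}
```

In the common case the list holds a single element, so the cost moved from 4 key bytes per record to one extra lookup per append.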

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
>
> KafkaStreams follows an eager execution model, ie, it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will be an 
> inner-join result might produce an eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay the emission of such results until after the 
> window grace period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread Alok Nikhil (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alok Nikhil updated KAFKA-12345:

Description: 
Occasionally, a scheduler thread on a broker crashes with this stack

```
[2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler)
java.lang.NullPointerException
    at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
    at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
    at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

```


 After that the broker is unable to fetch any records from any other broker 
(and vice versa)

```
[2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=1644324092, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 4 was disconnected before the response was read
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

```

  was:
Occasionally, a scheduler thread on a broker crashes with this stack
[2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler)
java.lang.NullPointerException
    at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
    at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
    at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
After that the broker is unable to fetch any records from any other broker (and 
vice versa)
[2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=1644324092, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 4 was disconnected before the response was read
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)


> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Report

[jira] [Created] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-02-18 Thread Alok Nikhil (Jira)
Alok Nikhil created KAFKA-12345:
---

 Summary: KIP-500: AlterIsrManager crashes on broker idle-state
 Key: KAFKA-12345
 URL: https://issues.apache.org/jira/browse/KAFKA-12345
 Project: Kafka
  Issue Type: Task
  Components: core
Reporter: Alok Nikhil


Occasionally, a scheduler thread on a broker crashes with this stack
[2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler)
java.lang.NullPointerException
    at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
    at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
    at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
After that the broker is unable to fetch any records from any other broker (and 
vice versa)
[2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=1644324092, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 4 was disconnected before the response was read
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)





[GitHub] [kafka] cmccabe commented on pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


cmccabe commented on pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#issuecomment-781745956


   > I think we need to handle preferred leader election in a special way. For 
example, if the assigned replicas are 1,2,3, isr is 2,3 and the current leader 
is 3, when doing preferred leader election, we want to keep the leader as 3 
instead of changing it to 2.
   
   Hmm, wouldn't we want to switch the leader to 2 in that case, since 2 is 
more preferred?
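The two positions in this exchange can be sketched side by side. This is only one reading of the comments, not code from the PR; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the two election rules being discussed.
final class PreferredLeaderSketch {
    // Quoted position: only the first assigned replica counts as "preferred";
    // if it is not in the ISR, keep the current leader.
    static int keepUnlessFirstReplicaInIsr(List<Integer> replicas, Set<Integer> isr, int currentLeader) {
        int preferred = replicas.get(0);
        return isr.contains(preferred) ? preferred : currentLeader;
    }

    // Position raised in the reply: pick the earliest assigned replica that is in the ISR.
    static int firstAssignedReplicaInIsr(List<Integer> replicas, Set<Integer> isr, int currentLeader) {
        for (int replica : replicas) {
            if (isr.contains(replica)) {
                return replica;
            }
        }
        return currentLeader;
    }
}
```

With assigned replicas 1,2,3, ISR 2,3 and current leader 3, the first rule keeps 3 while the second switches to 2.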



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578857828



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.CONFIGS;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class QuorumControllerTest {
+private static final Logger log =
+LoggerFactory.getLogger(QuorumControllerTest.class);
+
+/**
+ * Test creating a new QuorumController and closing it.
+ */
+@Test
+public void testCreateAndClose() throws Throwable {
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+try (QuorumControllerTestEnv controlEnv =
+ new QuorumControllerTestEnv(logEnv, __ -> { })) {
+}
+}
+}
+
+/**
+ * Test setting some configuration values and reading them back.
+ */
+@Test
+public void testConfigurationOperations() throws Throwable {
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+try (QuorumControllerTestEnv controlEnv =
+ new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS))) {
+testConfigurationOperations(controlEnv.activeController());
+}
+}
+}
+
+private void testConfigurationOperations(QuorumController controller) 
throws Throwable {
+assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
+controller.incrementalAlterConfigs(Collections.singletonMap(
+BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), 
true).get());
+assertEquals(Collections.singletonMap(BROKER0,
+new ResultOrError<>(Collections.emptyMap())),
+controller.describeConfigs(Collections.singletonMap(
+BROKER0, Collections.emptyList())).get());
+assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
+controller.incrementalAlterConfigs(Collections.singletonMap(
+BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), 
false).get());
+assertEquals(Collections.singletonMap(BROKER0, new 
ResultOrError<>(Collections.
+singletonMap("baz", "123"))),
+controller.describeConfigs(Collections.singletonMap(
+BROKER0, Collections.emptyList())).get

[jira] [Resolved] (KAFKA-12232) Distinguish API scope by broker/controller

2021-02-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12232.
-
Resolution: Duplicate

> Distinguish API scope by broker/controller
> --
>
> Key: KAFKA-12232
> URL: https://issues.apache.org/jira/browse/KAFKA-12232
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> After KIP-500, not all APIs will be available on all listeners. Specifically, 
> there are controller-only APIs which are only accessible on the controller 
> listener (e.g. the Raft APIs). In general, we have three API scopes:
> client: must be exposed on client listener
> broker: must be exposed on inter-broker listener
> controller: must be exposed on controller listener
> These categories are not mutually exclusive. The `Fetch` API is required on 
> all listeners as an example, so we need a way to represent the scope as a set 
> in `ApiKeys`.
> We should also put some thought into how this scope is reflected through the 
> ApiVersions API. I think it makes sense to only advertise APIs that can be 
> handled. For example, if the controller does not have a handler for the 
> `FindCoordinator` API, then it doesn't make sense to advertise it. 
> Potentially we could be even more restrictive when it comes to the 
> inter-broker APIs. For example, we might not need to advertise 
> `WriteTxnMarkers` on client-only listeners since a client should never use 
> this API. Alternatively, we can make it simple and just identify APIs by 
> controller, broker, or both.
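Representing the scope as a set could look like the sketch below. The enum names and the scope assigned to each API are illustrative assumptions and do not reflect the actual `ApiKeys` definition.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical listener scopes, following the three categories in the description.
enum ListenerScope { CLIENT, BROKER, CONTROLLER }

// Hypothetical stand-in for ApiKeys; the scope assignments are illustrative only.
enum ApiKeySketch {
    FETCH(EnumSet.allOf(ListenerScope.class)),
    FIND_COORDINATOR(EnumSet.of(ListenerScope.CLIENT, ListenerScope.BROKER)),
    WRITE_TXN_MARKERS(EnumSet.of(ListenerScope.BROKER)),
    VOTE(EnumSet.of(ListenerScope.CONTROLLER));

    private final Set<ListenerScope> scopes;

    ApiKeySketch(Set<ListenerScope> scopes) {
        this.scopes = scopes;
    }

    // APIs a listener of the given scope would advertise through ApiVersions.
    static Set<ApiKeySketch> advertisedOn(ListenerScope scope) {
        Set<ApiKeySketch> result = EnumSet.noneOf(ApiKeySketch.class);
        for (ApiKeySketch api : values()) {
            if (api.scopes.contains(scope)) {
                result.add(api);
            }
        }
        return result;
    }
}
```

Under these assumed assignments, a controller listener would advertise only `FETCH` and `VOTE`, and `WRITE_TXN_MARKERS` would not appear on a client-only listener.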





[jira] [Commented] (KAFKA-12278) Keep api versions consistent with api scope

2021-02-18 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286803#comment-17286803
 ] 

Jason Gustafson commented on KAFKA-12278:
-

[~tombentley] Haha, I was about to write something about great minds thinking 
alike when I noticed that I filed the original. Since we merged this one, I'll 
close the other as a duplicate.

> Keep api versions consistent with api scope
> ---
>
> Key: KAFKA-12278
> URL: https://issues.apache.org/jira/browse/KAFKA-12278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> With KIP-500, some APIs are only accessible by the broker and some are only 
> accessible by the controller. We need a better way to indicate the scope of 
> the API so that we can keep it consistent with the `ApiVersions` API. 
> Basically we have the following scopes:
> - zk broker (e.g. LeaderAndIsr)
> - kip-500 broker (e.g. DecommissionBroker)
> - kip-500 controller (e.g. Envelope)
> These categories are not mutually exclusive. For example, the `Fetch` API 
> must be exposed in all scopes. We could go even further by distinguishing an 
> inter-broker scope, but that is probably not needed for now.





[jira] [Resolved] (KAFKA-12278) Keep api versions consistent with api scope

2021-02-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12278.
-
Resolution: Fixed

> Keep api versions consistent with api scope
> ---
>
> Key: KAFKA-12278
> URL: https://issues.apache.org/jira/browse/KAFKA-12278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> With KIP-500, some APIs are only accessible by the broker and some are only 
> accessible by the controller. We need a better way to indicate the scope of 
> the API so that we can keep it consistent with the `ApiVersions` API. 
> Basically we have the following scopes:
> - zk broker (e.g. LeaderAndIsr)
> - kip-500 broker (e.g. DecommissionBroker)
> - kip-500 controller (e.g. Envelope)
> These categories are not mutually exclusive. For example, the `Fetch` API 
> must be exposed in all scopes. We could go even further by distinguishing an 
> inter-broker scope, but that is probably not needed for now.





[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578851481



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.FeatureMap;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+
+public class FeatureControlManager {
+/**
+ * The features supported by this controller's software.
+ */
+private final Map supportedFeatures;
+
+/**
+ * Maps feature names to finalized version ranges.
+ */
+private final TimelineHashMap finalizedVersions;
+
+/**
+ * The latest feature epoch.
+ */
+private final TimelineHashSet epoch;
+
+FeatureControlManager(Map supportedFeatures,
+  SnapshotRegistry snapshotRegistry) {
+this.supportedFeatures = supportedFeatures;
+this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+this.epoch = new TimelineHashSet<>(snapshotRegistry, 0);
+}
+
+ControllerResult> updateFeatures(
+Map updates, Set downgradeables,
+Map> brokerFeatures) {
+TreeMap results = new TreeMap<>();
+List records = new ArrayList<>();
+for (Entry entry : updates.entrySet()) {
+results.put(entry.getKey(), updateFeature(entry.getKey(), 
entry.getValue(),
+downgradeables.contains(entry.getKey()), brokerFeatures, 
records));
+}
+return new ControllerResult<>(records, results);
+}
+
+private ApiError updateFeature(String featureName,
+   VersionRange newRange,
+   boolean downgradeable,
+   Map> 
brokerFeatures,
+   List records) {
+if (newRange.min() <= 0) {
+return new ApiError(Errors.INVALID_UPDATE_VERSION,
+"The lower value for the new range cannot be less than 1.");
+}
+if (newRange.max() <= 0) {
+return new ApiError(Errors.INVALID_UPDATE_VERSION,
+"The upper value for the new range cannot be less than 1.");
+}
+VersionRange localRange = supportedFeatures.get(featureName);

Review comment:
   Yes, that's the problem. From a consistency perspective, it seems that 
we should use the supported features from either all controller nodes or none.
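One way to read "all controller nodes" is to validate an update against the intersection of every node's supported range. The sketch below assumes the per-node ranges are somehow available, which the PR does not yet provide; all names are hypothetical.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; the class and method names are not from the PR.
final class SupportedRange {
    final int min;
    final int max;

    SupportedRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    // Intersect the ranges reported by every controller node.
    // Empty if no node reported a range or the ranges do not overlap.
    static Optional<SupportedRange> supportedByAll(List<SupportedRange> perNode) {
        if (perNode.isEmpty()) {
            return Optional.empty();
        }
        int lo = Integer.MIN_VALUE;
        int hi = Integer.MAX_VALUE;
        for (SupportedRange r : perNode) {
            lo = Math.max(lo, r.min);
            hi = Math.min(hi, r.max);
        }
        return lo <= hi ? Optional.of(new SupportedRange(lo, hi)) : Optional.empty();
    }

    // True if [newMin, newMax] lies inside the range that every node supports.
    static boolean acceptable(int newMin, int newMax, List<SupportedRange> perNode) {
        return supportedByAll(perNode)
            .map(r -> r.min <= newMin && newMax <= r.max)
            .orElse(false);
    }
}
```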









[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578850804



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+
+public final class MockControllerMetrics implements ControllerMetrics {

Review comment:
   It's used in unit tests









[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578850598



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.FeatureMap;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+
+public class FeatureControlManager {
+/**
+ * The features supported by this controller's software.
+ */
+private final Map supportedFeatures;
+
+/**
+ * Maps feature names to finalized version ranges.
+ */
+private final TimelineHashMap finalizedVersions;
+
+/**
+ * The latest feature epoch.
+ */
+private final TimelineHashSet epoch;
+
+FeatureControlManager(Map supportedFeatures,
+  SnapshotRegistry snapshotRegistry) {
+this.supportedFeatures = supportedFeatures;
+this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+this.epoch = new TimelineHashSet<>(snapshotRegistry, 0);
+}
+
+ControllerResult> updateFeatures(
+Map updates, Set downgradeables,
+Map> brokerFeatures) {
+TreeMap results = new TreeMap<>();
+List records = new ArrayList<>();
+for (Entry entry : updates.entrySet()) {
+results.put(entry.getKey(), updateFeature(entry.getKey(), 
entry.getValue(),
+downgradeables.contains(entry.getKey()), brokerFeatures, 
records));
+}
+return new ControllerResult<>(records, results);
+}
+
+private ApiError updateFeature(String featureName,
+   VersionRange newRange,
+   boolean downgradeable,
+   Map> 
brokerFeatures,
+   List records) {
+if (newRange.min() <= 0) {
+return new ApiError(Errors.INVALID_UPDATE_VERSION,
+"The lower value for the new range cannot be less than 1.");
+}
+if (newRange.max() <= 0) {
+return new ApiError(Errors.INVALID_UPDATE_VERSION,
+"The upper value for the new range cannot be less than 1.");
+}
+VersionRange localRange = supportedFeatures.get(featureName);

Review comment:
   Hmm... right now, we don't have a good way of finding out what features 
the other controllers support.  Maybe we will have to think more about this 
when we support rolling upgrade in kip-500.









[GitHub] [kafka] jsancio commented on pull request #10138: KAFKA-12331: Use LEO for the base offset of LeaderChangeMessage batch

2021-02-18 Thread GitBox


jsancio commented on pull request #10138:
URL: https://github.com/apache/kafka/pull/10138#issuecomment-781734293


   We got the following failures:
   ```
   core:
   ConfigCommandTest.shouldFailIfUnresolvableHost()
   ConfigCommandTest.shouldFailIfUnresolvableHostUsingZookeeper()
   DelegationTokenCommandTest.testDelegationTokenRequests()
   MetricsTest.testMetrics()
   DynamicConfigChangeTest.testIpHandlerUnresolvableAddress()
   DynamicConfigTest.shouldFailIpConfigsWithBadHost()
   ```







[jira] [Resolved] (KAFKA-12331) KafkaRaftClient should use the LEO when appending LeaderChangeMessage

2021-02-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12331.
-
Resolution: Fixed

> KafkaRaftClient should use the LEO when appending LeaderChangeMessage
> -
>
> Key: KAFKA-12331
> URL: https://issues.apache.org/jira/browse/KAFKA-12331
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> KafkaMetadataLog's appendAsLeader expects the base offset to match the LEO. 
> This is enforced when KafkaRaftClient uses the BatchAccumulator to create 
> batches. When creating the control batch for the LeaderChangeMessage the 
> KafkaRaftClient doesn't use the BatchAccumulator and instead creates the 
> batch with the default base offset of 0.
> This causes the validation in KafkaMetadataLog to fail with the following 
> exception:
> {code:java}
> kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to 
> @metadata-0. First offset 0 is less than the next offset 5. First 10 offsets 
> in append: ArrayBuffer(0), last offset in append: 0. Log start offset = 0
>   at kafka.log.Log.append(Log.scala:1217)
>   at kafka.log.Log.appendAsLeader(Log.scala:1092)
>   at kafka.raft.KafkaMetadataLog.appendAsLeader(KafkaMetadataLog.scala:92)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.appendAsLeader(KafkaRaftClient.java:1158)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.appendLeaderChangeMessage(KafkaRaftClient.java:449)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.onBecomeLeader(KafkaRaftClient.java:409)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.maybeTransitionToLeader(KafkaRaftClient.java:463)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.handleVoteResponse(KafkaRaftClient.java:663)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1530)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1652)
>   at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2183)
>   at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
> {code}
> We should make the following changes:
>  # Fix MockLog to perform similar validation as 
> KafkaMetadataLog::appendAsLeader
>  # Use the LEO when creating the control batch for the LeaderChangedMessage





[GitHub] [kafka] hachikuji merged pull request #10138: KAFKA-12331: Use LEO for the base offset of LeaderChangeMessage batch

2021-02-18 Thread GitBox


hachikuji merged pull request #10138:
URL: https://github.com/apache/kafka/pull/10138


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

2021-02-18 Thread GitBox


jolshan edited a comment on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199


   Ran tests locally with newest code from trunk. Here are the tests that 
failed:
   ```
   ConsumerBounceTest.testClose()
   
ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   ```
   
   These are tests I've found to be flaky running locally in the past.







[GitHub] [kafka] jolshan edited a comment on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

2021-02-18 Thread GitBox


jolshan edited a comment on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199


   Ran tests locally with newest code from trunk. Here are the tests that 
failed:
   ```
   ConsumerBounceTest.testClose()
   
ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   ```
   
   These are tests I've found to be flaky running locally.
   







[GitHub] [kafka] jolshan commented on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

2021-02-18 Thread GitBox


jolshan commented on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199


   Ran tests locally with newest code from trunk. Here are the tests that 
failed:
   ConsumerBounceTest. testClose()
   ConnectionQuotasTest. 
testListenerConnectionRateLimitWhenActualRateAboveLimit()
   DynamicConnectionQuotaTest. testDynamicListenerConnectionCreationRateQuota()
   
   These are tests I've found to be flaky running locally.
   







[GitHub] [kafka] jsancio commented on pull request #10138: KAFKA-12331: Use LEO for the base offset of LeaderChangeMessage batch

2021-02-18 Thread GitBox


jsancio commented on pull request #10138:
URL: https://github.com/apache/kafka/pull/10138#issuecomment-781730638


   Ran the following commands locally:
   ```
   $ ./gradlew -version
   
   
   Gradle 6.8.1
   
   
   Build time:   2021-01-22 13:20:08 UTC
   Revision: 31f14a87d93945024ab7a78de84102a3400fa5b2
   
   Kotlin:   1.4.20
   Groovy:   2.5.12
   Ant:  Apache Ant(TM) version 1.10.9 compiled on September 27 2020
   JVM:  1.8.0_282 (Private Build 25.282-b08)
   OS:   Linux 5.8.0-7642-generic amd64
   
   ./gradlew -PscalaVersion=2.12 clean compileJava compileScala compileTestJava 
compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain 
rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
   
   ./gradlew -PscalaVersion=2.13 clean compileJava compileScala compileTestJava 
compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain 
rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
   
   ./gradlew -PscalaVersion=2.12 unitTest integrationTest --profile --no-daemon 
--continue -PtestLoggingEvents=started,passed,skipped,failed 
-PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 
-PmaxTestRetryFailures=5
   ```
   
   







[GitHub] [kafka] cmccabe closed pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

2021-02-18 Thread GitBox


cmccabe closed pull request #10066:
URL: https://github.com/apache/kafka/pull/10066


   







[GitHub] [kafka] cmccabe commented on pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

2021-02-18 Thread GitBox


cmccabe commented on pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#issuecomment-781727318


   pushed







[GitHub] [kafka] mjsax commented on pull request #10134: TRIVIAL: fix JavaDocs formatting

2021-02-18 Thread GitBox


mjsax commented on pull request #10134:
URL: https://github.com/apache/kafka/pull/10134#issuecomment-781715946


   Merged to `trunk` and cherry-picked to `2.8` branch.







[GitHub] [kafka] mjsax merged pull request #10134: TRIVIAL: fix JavaDocs formatting

2021-02-18 Thread GitBox


mjsax merged pull request #10134:
URL: https://github.com/apache/kafka/pull/10134


   







[jira] [Commented] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop

2021-02-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286776#comment-17286776
 ] 

Matthias J. Sax commented on KAFKA-10192:
-

Failed in setup: 
[https://github.com/apache/kafka/pull/10134/checks?check_run_id=1915517987] 
{quote} {{org.opentest4j.AssertionFailedError: Condition not met within timeout 
3. Worker did not complete startup in time ==> expected: <true> but was: <false>

at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
at 
org.apache.kafka.connect.integration.BlockingConnectorTest.setup(BlockingConnectorTest.java:133)}}
{quote}

> Flaky test BlockingConnectorTest#testBlockInConnectorStop
> -
>
> Key: KAFKA-10192
> URL: https://issues.apache.org/jira/browse/KAFKA-10192
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Boyang Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.6.0, 2.7.0
>
>
> h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/]
> h3. Error Message
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"}
> h3. Stacktrace
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 

[jira] [Commented] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy

2021-02-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286774#comment-17286774
 ] 

Matthias J. Sax commented on KAFKA-10579:
-

Different test method for same test: 
[https://github.com/apache/kafka/pull/10134/checks?check_run_id=1915520915]  

{{org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
Didn't find the topics [connect-storage-topic-connect-cluster-1, 
connect-config-topic-connect-cluster-1, connect-offset-topic-connect-cluster-1] 
==> expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertTopicsExist(EmbeddedConnectClusterAssertions.java:163)
at 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithDefaultSettings(InternalTopicsIntegrationTest.java:81)}}

> Flaky test 
> connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
> 
>
> Key: KAFKA-10579
> URL: https://issues.apache.org/jira/browse/KAFKA-10579
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
>  
> {{java.lang.NullPointerException
>   at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
>   at 
> org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
>   at 
> org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}}
> https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222





[jira] [Updated] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12336:

Labels: easy-fix newbie  (was: )

> custom stream naming does not work while calling stream[K, V](topicPattern: 
> Pattern) API with named Consumed parameter 
> ---
>
> Key: KAFKA-12336
> URL: https://issues.apache.org/jira/browse/KAFKA-12336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Ramil Israfilov
>Priority: Major
>  Labels: easy-fix, newbie
>
> In our Scala application I am trying to implement custom naming for Kafka 
> Streams application nodes.
> We are using topicPattern for our stream source.
> Here is an API which I am calling:
>  
> {code:java}
> val topicsPattern="t-[A-Za-z0-9-].suffix"
> val operations: KStream[MyKey, MyValue] =
>   builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
> Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
>   )
> {code}
>  Although I provide Consumed with a custom name, the topology description 
> still shows "KSTREAM-SOURCE-00" as the name of our stream source.
> This is not a problem if I just use a topic name. But our application needs 
> to consume messages from a set of topics selected by topic-name pattern matching.
> After checking the Kafka code I see that
> org.apache.kafka.streams.kstream.internals.InternalStreamBuilder (on line 
> 103) has a bug:
> {code:java}
> public <K, V> KStream<K, V> stream(final Pattern topicPattern,
>                                    final ConsumedInternal<K, V> consumed) {
>     final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
>     final StreamSourceNode<K, V> streamPatternSourceNode =
>         new StreamSourceNode<>(name, topicPattern, consumed);
> {code}
> The node name construction does not take the name of the consumed parameter 
> into account.
> For example, the code for the other stream API call, which takes topic names, 
> does it correctly:
> {code:java}
> final String name = new 
> NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, 
> KStreamImpl.SOURCE_NAME);
> {code}
>  
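
The difference between the two code paths quoted above can be illustrated with a small standalone sketch. Everything in it is an assumption for illustration: `SourceNameSketch` and `nameFor` are hypothetical, and the ten-digit zero-padded counter is only assumed to mirror how generated names look; none of this is the Kafka Streams implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "use the user's name, else generate one with a prefix".
final class SourceNameSketch {
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    private final AtomicInteger index = new AtomicInteger(0);

    // Always generates a name; this models the pattern-based path the ticket reports.
    String newProcessorName(String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    // Prefers the user-provided name and falls back to a generated one.
    String nameFor(String userProvidedName) {
        if (userProvidedName != null && !userProvidedName.isEmpty()) {
            return userProvidedName;
        }
        return newProcessorName(SOURCE_NAME);
    }

    public static void main(String[] args) {
        SourceNameSketch names = new SourceNameSketch();
        System.out.println(names.nameFor("my-fancy-name"));
        System.out.println(names.nameFor(null));
    }
}
```

In this model, the reported behaviour corresponds to calling `newProcessorName` directly, while the behaviour the reporter expects corresponds to going through `nameFor`.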





[jira] [Updated] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12336:

Priority: Minor  (was: Major)

> custom stream naming does not work while calling stream[K, V](topicPattern: 
> Pattern) API with named Consumed parameter 
> ---
>
> Key: KAFKA-12336
> URL: https://issues.apache.org/jira/browse/KAFKA-12336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Ramil Israfilov
>Priority: Minor
>  Labels: easy-fix, newbie
>
> In our Scala application I am trying to implement custom naming for Kafka 
> Streams application nodes.
> We are using topicPattern for our stream source.
> Here is an API which I am calling:
>  
> {code:java}
> val topicsPattern="t-[A-Za-z0-9-].suffix"
> val operations: KStream[MyKey, MyValue] =
>   builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
> Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
>   )
> {code}
>  Although I provide Consumed with a custom name, the topology description 
> still shows "KSTREAM-SOURCE-00" as the name of our stream source.
> This is not a problem if I just use a topic name. But our application needs 
> to consume messages from a set of topics selected by topic-name pattern matching.
> After checking the Kafka code I see that
> org.apache.kafka.streams.kstream.internals.InternalStreamBuilder (on line 
> 103) has a bug:
> {code:java}
> public <K, V> KStream<K, V> stream(final Pattern topicPattern,
>                                    final ConsumedInternal<K, V> consumed) {
>     final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
>     final StreamSourceNode<K, V> streamPatternSourceNode =
>         new StreamSourceNode<>(name, topicPattern, consumed);
> {code}
> The node name construction does not take the name of the consumed parameter 
> into account.
> For example, the code for the other stream API call, which takes topic names, 
> does it correctly:
> {code:java}
> final String name = new 
> NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, 
> KStreamImpl.SOURCE_NAME);
> {code}
>  





[jira] [Updated] (KAFKA-12328) Expose TaskId partition number

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12328:

Summary: Expose TaskId partition number  (was: Find out partition of a 
store iterator)

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> Stack Overflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer, but the solution is quite complicated, hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> Javadoc for {{ProcessorContext.partition()}} states that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.
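
As a rough illustration of what exposing the partition number from the task id could look like from the caller's side, here is a minimal sketch. It assumes the task id renders as a string of the form `<topicGroupId>_<partition>` (for example `0_3`); `TaskIdSketch` and `partitionOf` are hypothetical helpers, not part of the Kafka Streams API.

```java
// Hypothetical helper: extracts the partition from a task id string such as "0_3".
final class TaskIdSketch {
    static int partitionOf(String taskId) {
        int sep = taskId.lastIndexOf('_');
        // Require at least one character on each side of the separator.
        if (sep <= 0 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("Unexpected task id format: " + taskId);
        }
        return Integer.parseInt(taskId.substring(sep + 1));
    }

    public static void main(String[] args) {
        // A punctuator could log a summary like this (task id hard-coded for the sketch).
        String taskId = "0_3";
        int processed = 42; // illustrative count only
        System.out.println(processed + " entries processed in partition " + partitionOf(taskId));
    }
}
```

In a real punctuator the string would presumably come from something like `context().taskId().toString()`; parsing a string is a workaround, which is part of why a proper accessor is being requested.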





[jira] [Updated] (KAFKA-12328) Find out partition of a store iterator

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12328:

Labels: needs-kip  (was: )

> Find out partition of a store iterator
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> Stack Overflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer, but the solution is quite complicated, hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> Javadoc for {{ProcessorContext.partition()}} states that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.





[jira] [Commented] (KAFKA-12328) Find out partition of a store iterator

2021-02-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286771#comment-17286771
 ] 

Matthias J. Sax commented on KAFKA-12328:
-

Personally, I am open to exposing the partition number from the task-id. It would 
require a KIP though. I guess we can leave this ticket open and see if there 
is more interest.

> Find out partition of a store iterator
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>
> This question was posted [on 
> Stack Overflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer, but the solution is quite complicated, hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> Javadoc for {{ProcessorContext.partition()}} states that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.





[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

2021-02-18 Thread GitBox


cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578826004



##
File path: checkstyle/import-control.xml
##
@@ -99,6 +100,7 @@
   
   
   
+  

Review comment:
   Hmm.  Let's revisit this after 2.8.









[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

2021-02-18 Thread GitBox


cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578825895



##
File path: core/src/main/scala/kafka/server/ApiVersionManager.scala
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.network
+import kafka.network.RequestChannel
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
+import scala.jdk.CollectionConverters._
+
+trait ApiVersionManager {
+  def listenerType: ListenerType
+  def enabledApis: collection.Set[ApiKeys]
+  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def newRequestMetrics: RequestChannel.Metrics = new 
network.RequestChannel.Metrics(enabledApis)
+}
+
+object ApiVersionManager {
+  def apply(
+listenerType: ListenerType,
+config: KafkaConfig,
+forwardingManager: Option[ForwardingManager],

Review comment:
   ok









[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r578823423



##
File path: bin/kafka-metadata-shell.sh
##
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"

Review comment:
   Actually, it looks like this is documented in the KIP.









[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r578818603



##
File path: bin/kafka-metadata-shell.sh
##
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"

Review comment:
   Or maybe an alternative is to put this under bin/metadata









[jira] [Updated] (KAFKA-12344) Support SlidingWindows in the Scala API

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12344:

Affects Version/s: 2.7.0

> Support SlidingWindows in the Scala API
> ---
>
> Key: KAFKA-12344
> URL: https://issues.apache.org/jira/browse/KAFKA-12344
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Leah Thomas
>Assignee: Leah Thomas
>Priority: Major
>
> In KIP-450 we implemented sliding windows for the Java API but left out a few
> crucial methods that sliding windows need in order to work through the Scala
> API. We need to add those methods so the Scala API can fully leverage sliding windows.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578817527



##
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+
+public class ConfigurationControlManager {
+    private final Logger log;
+    private final SnapshotRegistry snapshotRegistry;
+    private final Map<ConfigResource.Type, ConfigDef> configDefs;
+    private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
+
+    ConfigurationControlManager(LogContext logContext,
+                                SnapshotRegistry snapshotRegistry,
+                                Map<ConfigResource.Type, ConfigDef> configDefs) {
+        this.log = logContext.logger(ConfigurationControlManager.class);
+        this.snapshotRegistry = snapshotRegistry;
+        this.configDefs = configDefs;
+        this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Determine the result of applying a batch of incremental configuration changes.  Note
+     * that this method does not change the contents of memory.  It just generates a
+     * result, that you can replay later if you wish using replay().
+     *
+     * Note that there can only be one result per ConfigResource.  So if you try to modify
+     * several keys and one modification fails, the whole ConfigKey fails and nothing gets
+     * changed.
+     *
+     * @param configChanges     Maps each resource to a map from config keys to
+     *                          operation data.
+     * @return                  The result.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        Map<ConfigResource, ApiError> outputResults = new HashMap<>();
+        for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
+                configChanges.entrySet()) {
+            incrementalAlterConfigResource(resourceEntry.getKey(),
+                resourceEntry.getValue(),
+                outputRecords,
+                outputResults);
+        }
+        return new ControllerResult<>(outputRecords, outputResults);
+    }
+
+    private void incrementalAlterConfigResource(ConfigResource configResource,
+                                                Map<String, Entry<OpType, String>> keysToOps,
+                                                List<ApiMessageAndVersion> outputRecords,
+                                                Map<ConfigResource, ApiError> outputResults) {
+        ApiError error = checkConfigResource(configResource);
+        if (error.isFailure()) {
+            outputResults.put(configResource, error);
+            return;
+        }
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        for (Entry<String, Entry<OpType, String>> keysToOpsEntry : keysToOps.entrySet()) {
+            String key = keysToOpsEntry.getKey();
+            String currentValue = null;
+            TimelineHashMap<String, String> currentConfigs = configData.get(configResource);
+            if (currentConfigs != null) {
+                currentValue = currentConfigs.get(key);
+            }
+            String newValue = currentValue;
+            Entry<OpType, String> opTypeAndNewValue = keysToOpsEntry.getValue();
+OpType
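
The Javadoc in the quoted diff describes a two-phase pattern: first compute the records and per-resource results without mutating any state, then apply the records later via replay(). A minimal, self-contained sketch of that idea is below. It is not the code from the pull request: `ConfigSketch`, `Record`, `Result`, `plan`, `replay`, and `get` are hypothetical names, resources are plain strings, only "set" operations are modeled, and the only validation rule (non-empty key) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; names and types are not from the Kafka code base.
final class ConfigSketch {
    /** One pending change: set key to value on resource. */
    static final class Record {
        final String resource;
        final String key;
        final String value;

        Record(String resource, String key, String value) {
            this.resource = resource;
            this.key = key;
            this.value = value;
        }
    }

    /** Records to replay, plus one error string (null means success) per resource. */
    static final class Result {
        final List<Record> records = new ArrayList<>();
        final Map<String, String> errors = new HashMap<>();
    }

    private final Map<String, Map<String, String>> state = new HashMap<>();

    /** Phase 1: validate and build records. This method never touches state. */
    Result plan(Map<String, Map<String, String>> changes) {
        Result result = new Result();
        for (Map.Entry<String, Map<String, String>> resourceEntry : changes.entrySet()) {
            String resource = resourceEntry.getKey();
            List<Record> pending = new ArrayList<>();
            String error = null;
            for (Map.Entry<String, String> change : resourceEntry.getValue().entrySet()) {
                if (change.getKey().isEmpty()) {
                    error = "empty config key";
                    break;
                }
                pending.add(new Record(resource, change.getKey(), change.getValue()));
            }
            // All-or-nothing per resource: on failure, none of its records are emitted.
            if (error == null) {
                result.records.addAll(pending);
            }
            result.errors.put(resource, error);
        }
        return result;
    }

    /** Phase 2: apply one previously planned record to the in-memory state. */
    void replay(Record record) {
        state.computeIfAbsent(record.resource, r -> new HashMap<>())
             .put(record.key, record.value);
    }

    /** Returns the current value for a key, or null if it was never replayed. */
    String get(String resource, String key) {
        Map<String, String> configs = state.get(resource);
        return configs == null ? null : configs.get(key);
    }
}
```

Keeping planning free of side effects means a caller can discard the result, or write the records to a log first and only replay them once they are committed.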

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-18 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578817611



##
File path: metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.UsableBroker;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.controller.BrokerControlState.FENCED;
+import static org.apache.kafka.controller.BrokerControlState.CONTROLLED_SHUTDOWN;
+import static org.apache.kafka.controller.BrokerControlState.SHUTDOWN_NOW;
+import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
+
+
+/**
+ * The BrokerHeartbeatManager manages all the soft state associated with broker heartbeats.
+ * Soft state is state which does not appear in the metadata log.  This state includes
+ * things like the last time each broker sent us a heartbeat, and whether the broker is
+ * trying to perform a controlled shutdown.
+ *
+ * Only the active controller has a BrokerHeartbeatManager, since only the active
+ * controller handles broker heartbeats.  Standby controllers will create a heartbeat
+ * manager as part of the process of activating.  This design minimizes the size of the
+ * metadata partition by excluding heartbeats from it.  However, it does mean that after
+ * a controller failover, we may take some extra time to fence brokers, since the new
+ * active controller does not know when the last heartbeats were received from each.
+ */
+public class BrokerHeartbeatManager {
+    static class BrokerHeartbeatState {
+        /**
+         * The broker ID.
+         */
+        private final int id;
+
+        /**
+         * The last time we received a heartbeat from this broker, in monotonic nanoseconds.
+         * When this field is updated, we also may have to update the broker's position in
+         * the unfenced list.
+         */
+        long lastContactNs;
+
+        /**
+         * The last metadata offset which this broker reported.  When this field is updated,
+         * we may also have to update the broker's position in the active set.
+         */
+        long metadataOffset;
+
+        /**
+         * The offset at which the broker should complete its controlled shutdown, or -1
+         * if the broker is not performing a controlled shutdown.  When this field is
+         * updated, we also have to update the broker's position in the shuttingDown set.
+         */
+        private long controlledShutDownOffset;
+
+        /**
+         * The previous entry in the unfenced list, or null if the broker is not in that list.
+         */
+        private BrokerHeartbeatState prev;
+
+        /**
+         * The next entry in the unfenced list, or null if the broker is not in that list.
+         */
+        private BrokerHeartbeatState next;
+
+        BrokerHeartbeatState(int id) {
+            this.id = id;
+            this.lastContactNs = 0;
+            this.prev = null;
+            this.next = null;
+            this.metadataOffset = -1;
+            this.controlledShutDownOffset = -1;
+        }
+
+        /**
+         * Returns the broker ID.
+         */
+        int id() {
+            return id;
+        }
+
+        /**
+         * Returns true only if the broker is fenced.
+         */
+        boolean fenced() {
+            return prev == null;
+        }
+
+        /**
+         * Returns true only if the broker is in controlled shutdown state.
+         */
+        boolean shuttingDown() {
+            return controlledShutDownOffset >= 0;
+   
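
The class comment in the quoted diff explains that heartbeat times are soft state kept only in memory, so a newly activated controller cannot know when brokers were last heard from. The sketch below illustrates why that can delay fencing. It is a simplified stand-in, not the PR's implementation: `HeartbeatSketch`, `touch`, `staleBrokers`, and `afterFailover` are hypothetical names, and the choice to stamp every known broker with the activation time is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not the BrokerHeartbeatManager from the pull request.
final class HeartbeatSketch {
    private final long sessionTimeoutNs;
    // Soft state: last heartbeat time per broker ID, held only in memory.
    private final Map<Integer, Long> lastContactNs = new HashMap<>();

    HeartbeatSketch(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    /** Record a heartbeat; nowNs is a monotonic reading such as System.nanoTime(). */
    void touch(int brokerId, long nowNs) {
        lastContactNs.put(brokerId, nowNs);
    }

    /** IDs of brokers whose last heartbeat is older than the timeout, in ascending order. */
    List<Integer> staleBrokers(long nowNs) {
        List<Integer> stale = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : lastContactNs.entrySet()) {
            if (nowNs - entry.getValue() > sessionTimeoutNs) {
                stale.add(entry.getKey());
            }
        }
        Collections.sort(stale);
        return stale;
    }

    /**
     * Assumed failover behavior: the new active controller has no heartbeat
     * history, so it treats every known broker as last seen at activation time.
     */
    static HeartbeatSketch afterFailover(long sessionTimeoutNs,
                                         Collection<Integer> knownBrokers,
                                         long activationNs) {
        HeartbeatSketch fresh = new HeartbeatSketch(sessionTimeoutNs);
        for (int brokerId : knownBrokers) {
            fresh.touch(brokerId, activationNs);
        }
        return fresh;
    }
}
```

Under these assumptions, a broker that went silent just before the failover is only reported as stale one full timeout after activation, which matches the "extra time to fence brokers" trade-off the comment mentions.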

[jira] [Updated] (KAFKA-12344) Support SlidingWindows in the Scala API

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12344:

Labels:   (was: streams)

> Support SlidingWindows in the Scala API
> ---
>
> Key: KAFKA-12344
> URL: https://issues.apache.org/jira/browse/KAFKA-12344
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Leah Thomas
>Assignee: Leah Thomas
>Priority: Major
>
> In KIP-450 we implemented sliding windows for the Java API but left out a few
> crucial methods that sliding windows need in order to work through the Scala
> API. We need to add those methods so the Scala API can fully leverage sliding windows.





[jira] [Updated] (KAFKA-12344) Support SlidingWindows in the Scala API

2021-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12344:

Component/s: streams

> Support SlidingWindows in the Scala API
> ---
>
> Key: KAFKA-12344
> URL: https://issues.apache.org/jira/browse/KAFKA-12344
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Leah Thomas
>Assignee: Leah Thomas
>Priority: Major
>  Labels: streams
>
> In KIP-450 we implemented sliding windows for the Java API but left out a few
> crucial methods that sliding windows need in order to work through the Scala
> API. We need to add those methods so the Scala API can fully leverage sliding windows.





[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-18 Thread GitBox


hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r578813351



##
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {

Review comment:
   We have this class already checked in under `metadata/src/test/java`. If 
it needs to be here, can we just move it?

##
File path: settings.gradle
##
@@ -29,6 +29,7 @@ include 'clients',
 'log4j-appender',
 'metadata',
 'raft',
+'shell',

Review comment:
   I liked @mumrah's suggestion to call this module `metashell`.

##
File path: core/src/main/scala/kafka/server/Server.scala
##
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"

Review comment:
   Can we use `KafkaRaftServer.MetadataTopic` and remove this?
   

##
File path: bin/kafka-metadata-shell.sh
##
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"

Review comment:
   The shell basically becomes a public api with this. I thought I recalled 
that we were going to do a separate KIP? An alternative would be to locate this 
under `shell/bin`. Or maybe we can print a message when the tool starts out 
which emphasizes that this is an experimental tool without any compatibility 
guarantees.
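
   The last option in the comment above (printing a notice at startup) could look roughly like the sketch below. This is only an illustration: `ExperimentalBanner`, its `print` method, and the exact wording of the message are assumptions, not code from the pull request.

```java
import java.io.PrintStream;

// Hypothetical illustration; the class name and message wording are assumptions.
final class ExperimentalBanner {
    static final String MESSAGE =
        "WARNING: this is an experimental tool without any compatibility guarantees.";

    /** Print the notice once to the given stream. */
    static void print(PrintStream out) {
        out.println(MESSAGE);
    }
}
```

   A tool's `main` method would call `ExperimentalBanner.print(System.err)` before doing anything else; writing to standard error keeps standard output clean for callers that pipe the tool's results.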









[GitHub] [kafka] rondagostino commented on pull request #10155: Fix Raft broker restart issue when offset partitions are deferred

2021-02-18 Thread GitBox


rondagostino commented on pull request #10155:
URL: https://github.com/apache/kafka/pull/10155#issuecomment-781694442


   Successfully ran `./gradlew build` after commenting out these two flaky 
tests, **both of which passed locally when run individually afterwards**:
   
   
`ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()`
   `ConsumerBounceTest.testClose()`
   
   
   







[jira] [Created] (KAFKA-12344) Support SlidingWindows in the Scala API

2021-02-18 Thread Leah Thomas (Jira)
Leah Thomas created KAFKA-12344:
---

 Summary: Support SlidingWindows in the Scala API
 Key: KAFKA-12344
 URL: https://issues.apache.org/jira/browse/KAFKA-12344
 Project: Kafka
  Issue Type: Improvement
Reporter: Leah Thomas
Assignee: Leah Thomas


In KIP-450 we implemented sliding windows for the Java API but left out a few
crucial methods that sliding windows need in order to work through the Scala API.
We need to add those methods so the Scala API can fully leverage sliding windows.





[GitHub] [kafka] abbccdda opened a new pull request #10156: KAFKA-10345 (WIP): File watch store reloading

2021-02-18 Thread GitBox


abbccdda opened a new pull request #10156:
URL: https://github.com/apache/kafka/pull/10156


   Add a file-based store reloading mechanism that performs both
file-watch-triggered and time-based reloading in a separate thread.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
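
   As a rough illustration of the mechanism described in this pull request summary, the sketch below combines a file watch with a periodic fallback reload on one background thread, using the JDK's `WatchService`. It is not the code from the pull request: `ReloadingStoreSketch`, its constructor parameters, and the `reloadAction` callback are hypothetical, and what "reloading the store" actually does is left to the caller.

```java
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the implementation from the pull request.
final class ReloadingStoreSketch implements AutoCloseable {
    private final Path file;
    private final long intervalNs;
    private final Runnable reloadAction;
    private final WatchService watcher;
    private final Thread thread;
    private volatile boolean running = true;

    ReloadingStoreSketch(Path file, long intervalMs, Runnable reloadAction) throws IOException {
        this.file = file.toAbsolutePath();
        this.intervalNs = TimeUnit.MILLISECONDS.toNanos(intervalMs);
        this.reloadAction = reloadAction;
        this.watcher = FileSystems.getDefault().newWatchService();
        // WatchService watches directories, so register the parent and filter by file name.
        this.file.getParent().register(watcher,
            StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
        this.thread = new Thread(this::run, "store-reloader");
        this.thread.setDaemon(true);
        this.thread.start();
    }

    private void run() {
        long nextReloadNs = System.nanoTime() + intervalNs;
        while (running) {
            try {
                // Wait for a file event, but never past the next scheduled reload.
                long waitNs = Math.max(0L, nextReloadNs - System.nanoTime());
                WatchKey key = watcher.poll(waitNs, TimeUnit.NANOSECONDS);
                boolean reload = System.nanoTime() - nextReloadNs >= 0; // time-based trigger
                if (key != null) {
                    for (WatchEvent<?> event : key.pollEvents()) {
                        // For create/modify events the context is the changed file's name.
                        if (file.getFileName().equals(event.context())) {
                            reload = true; // file-watch trigger
                        }
                    }
                    key.reset();
                }
                if (reload) {
                    reloadAction.run();
                    nextReloadNs = System.nanoTime() + intervalNs;
                }
            } catch (InterruptedException | ClosedWatchServiceException e) {
                return; // close() was called
            }
        }
    }

    @Override
    public void close() throws IOException {
        running = false;
        thread.interrupt();
        watcher.close();
    }
}
```

   The timed reload acts as a safety net on platforms where file events arrive late or are missed; the file watch keeps the common case responsive.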
   







[GitHub] [kafka] ijuma merged pull request #10146: HOTFIX: fix Scala 2.12 build error caused by ControllerApisTest.scala

2021-02-18 Thread GitBox


ijuma merged pull request #10146:
URL: https://github.com/apache/kafka/pull/10146


   







[GitHub] [kafka] rondagostino commented on pull request #10146: HOTFIX: fix Scala 2.12 build error caused by ControllerApisTest.scala

2021-02-18 Thread GitBox


rondagostino commented on pull request #10146:
URL: https://github.com/apache/kafka/pull/10146#issuecomment-781651604


   Successfully confirmed this fix locally as per comment in #10155.







[GitHub] [kafka] rondagostino commented on pull request #10155: Fix Raft broker restart issue when offset partitions are deferred

2021-02-18 Thread GitBox


rondagostino commented on pull request #10155:
URL: https://github.com/apache/kafka/pull/10155#issuecomment-781650728


   I successfully compiled locally.  Compiling with `-PscalaVersion=2.12` 
failed locally but then succeeded after applying the one-liner fix from 
https://github.com/apache/kafka/pull/10146.







[GitHub] [kafka] ijuma commented on pull request #10113: MINOR: Add KIP-500 BrokerServer and ControllerServer

2021-02-18 Thread GitBox


ijuma commented on pull request #10113:
URL: https://github.com/apache/kafka/pull/10113#issuecomment-781640866


   @cmccabe Looks like the commit message is not as helpful as the PR 
description. We typically copy the PR description to the commit message.







[GitHub] [kafka] ijuma commented on pull request #10127: MINOR: Remove unused LeaderAndIsrResponse.partitions() since it has been replaced with partitionErrors()

2021-02-18 Thread GitBox


ijuma commented on pull request #10127:
URL: https://github.com/apache/kafka/pull/10127#issuecomment-781639872


   The implementation that was removed was more efficient than the new 
implementation. Is there a reason for that?






