[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-14 Thread GitBox


kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-861219607


   I ran the system tests in `kafkatest.tests.client.consumer_test` again:
* [System test run 
#4564](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4564/) 
against  `trunk/6de37e536ac76ef13530d49dc7320110332cd1ee`.
* [System test run 
#4566](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4566/) 
against 008b701386ce5a4d892d6ac5b90798b981c4fba0 (the latest commit from this 
PR).
   
   All tests passed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-14 Thread GitBox


kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-860537313


   @junrao Thanks for the review! I ran the system tests.
   1. [System test run 
#4560](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4560/) 
on top of the latest commit 008b701386ce5a4d892d6ac5b90798b981c4fba0 from this 
PR. The run finished with 12 test failures.
   2. [System test run 
#4561](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4561/) 
against AK trunk on top of commit 6de37e536ac76ef13530d49dc7320110332cd1ee 
which does not contain changes from this PR. The run finished with 13 test 
failures.
   
   There were 11 overlapping failures in both (1) and (2). For these, I didn't 
find anything abnormal in the logs so far; the failure reason appears similar in 
both runs.
   
   The only new failure in (1) that's not present in (2) was:
   
   ```
   Module: kafkatest.tests.client.consumer_test
   Class:  OffsetValidationTest
   Method: test_broker_failure
   Arguments:
   {
 "clean_shutdown": true,
 "enable_autocommit": false,
 "metadata_quorum": "REMOTE_KRAFT"
   }
   ```
   
   Logs indicate that the test failed [at this 
line](https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/tests/kafkatest/tests/client/consumer_test.py#L388)
 because one of the worker nodes running the consumer didn't complete within 
the timeout of 30s. This doesn't seem indicative of a real failure (yet), so 
I'm rerunning the system tests to check whether the failure is consistent. 
I'll keep you posted on the outcome of this second run.
   
   






[GitHub] [kafka] dajac merged pull request #10843: MINOR: Log formatting for exceptions during configuration related operations

2021-06-14 Thread GitBox


dajac merged pull request #10843:
URL: https://github.com/apache/kafka/pull/10843


   






[GitHub] [kafka] dajac commented on pull request #10843: MINOR: Log formatting for exceptions during configuration related operations

2021-06-14 Thread GitBox


dajac commented on pull request #10843:
URL: https://github.com/apache/kafka/pull/10843#issuecomment-861202533


   @YiDing-Duke Thanks for the patch! Merging to trunk and 2.8.






[GitHub] [kafka] showuon commented on pull request #10871: KAFKA-8940: decrease session timeout to make test faster and reliable

2021-06-14 Thread GitBox


showuon commented on pull request #10871:
URL: https://github.com/apache/kafka/pull/10871#issuecomment-861147367


   @ableegoldman, thanks for the good reminder. I totally agree with you. I've 
updated the PR description and the JIRA ticket comment. Thank you.






[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-06-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363356#comment-17363356
 ] 

A. Sophie Blee-Goldman commented on KAFKA-8295:
---

Yes, any of the count DSL operators. It may be a bit more tricky than it 
appears on the surface because count is actually converted into a generic 
aggregation under the covers, so you'd have to tease it out into its own 
independent optimized implementation. To be honest, I don't have a good sense 
of whether it's even worth the additional code complexity, because I don't know 
how much additional code and/or code paths this will introduce :) I recommend 
looking into that before jumping straight in.

Of course, we could consider introducing some kind of top-level merge-based 
operator to the DSL as a feature in its own right. Then count could just be 
converted to use that instead of the aggregation implementation. 

Not sure what that would look like, or if it would even be useful at all – just 
throwing out thoughts here. Anyways I just thought it would be interesting to 
explore what we might be able to do with this merge operator in Kafka Streams, 
whether that's an optimization of existing operators or some kind of first 
class operator of its own. That's really the point of this ticket: to explore 
the merge operator.
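For readers following along: RocksDB's merge operation lets a writer record an increment without first reading the current value, and the built-in counter operator folds the accumulated operands together lazily. As a rough illustration of why that helps a count(), here is a minimal in-memory simulation in plain Java. This is not RocksDB itself; the class and method names are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of a RocksDB-style "uint64add" merge operator: merge() enqueues
// an operand without reading the current value, and get() folds pending
// operands into the base value, mimicking RocksDB's read-time merge.
public class MergeCountSketch {
    private final Map<String, Long> base = new HashMap<>();
    private final Map<String, List<Long>> operands = new HashMap<>();

    // Write-only path: no read/modify/write cycle per increment.
    public void merge(String key, long delta) {
        operands.computeIfAbsent(key, k -> new ArrayList<>()).add(delta);
    }

    // Fold pending operands into the stored value on read.
    public long get(String key) {
        long value = base.getOrDefault(key, 0L);
        List<Long> pending = operands.remove(key);
        if (pending != null) {
            for (long d : pending) value += d;
        }
        base.put(key, value);
        return value;
    }

    public static void main(String[] args) {
        MergeCountSketch store = new MergeCountSketch();
        store.merge("clicks", 1);
        store.merge("clicks", 1);
        store.merge("clicks", 1);
        System.out.println(store.get("clicks")); // prints 3
    }
}
```

The point of the sketch is the write path: each count update is a single blind write, where the current read/update/write implementation pays for a read per update.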

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator – otherwise we incur too much cost crossing the jni for a net 
> performance benefit)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12844) KIP-740 follow up: clean up TaskId

2021-06-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363355#comment-17363355
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12844:


As mentioned elsewhere, this ticket is marked for 4.0 and as such, cannot be 
worked on yet. When 4.0 is announced you are free to pick this up and work on 
it again, but as of this point we don't yet know when version 4.0 will be 
released. Most likely we will have 3.1 after the in-progress 3.0, though I 
suppose it depends on the Zookeeper removal work.

> KIP-740 follow up: clean up TaskId
> --
>
> Key: KAFKA-12844
> URL: https://issues.apache.org/jira/browse/KAFKA-12844
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: loboxu
>Priority: Blocker
> Fix For: 4.0.0
>
>
> See 
> [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  – for the TaskId class, we need to remove the following deprecated APIs:
>  # The public partition and topicGroupId fields should be "removed", ie made 
> private (can also now rename topicGroupId to subtopology to match the getter)
>  # The two #readFrom and two #writeTo methods can be removed (they have 
> already been converted to internal utility methods we now use instead, so 
> just remove them)





[jira] [Commented] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2021-06-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363354#comment-17363354
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12843:


This was pointed out on the PR, but just leaving a note on the ticket for 
visibility, and to clarify for anyone else who comes across this:

This ticket is marked for fix in version 4.0, which means we can't work on it 
yet. We are only in the process of releasing 3.0 at the moment, and it's likely 
that 3.1 will come after that. Once the bump to 4.0 has been decided, you can 
pick this up again and actually work on it. 

> KIP-740 follow up: clean up TaskMetadata
> 
>
> Key: KAFKA-12843
> URL: https://issues.apache.org/jira/browse/KAFKA-12843
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: loboxu
>Priority: Blocker
> Fix For: 4.0.0
>
>
> See 
> [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  – for the TaskMetadata class, we need to:
>  # Deprecate the TaskMetadata#getTaskId method
>  # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
> API that returns a TaskId instead of a String
>  # Remove the deprecated constructor





[GitHub] [kafka] ableegoldman commented on pull request #10871: KAFKA-8940: decrease session timeout to make test faster and reliable

2021-06-14 Thread GitBox


ableegoldman commented on pull request #10871:
URL: https://github.com/apache/kafka/pull/10871#issuecomment-861138874


   Thanks @showuon. Can you add a comment or update the description with the 
specific error message in the failure mode that this fix is intended to 
address? As you point out, my analysis of the test from a while back shows that 
we need to shore up either the input data production or the output verification 
itself to get this totally correct. You can detect when the failure is due to 
that bug in the test assumptions because the associated error is the 
`java.lang.AssertionError: verifying tagg` exception message. 
   
   It would be good to explicitly point out what kind of failure (ie the error 
message/exception/stacktrace) this fix was directed at, so we can keep an eye 
out for it and adjust the session timeout further if necessary. (I don't really 
expect that will be needed, but you know how it is 🙂 )






[jira] [Commented] (KAFKA-12690) Remove deprecated Producer#sendOffsetsToTransaction

2021-06-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363343#comment-17363343
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12690:


Hey [~loboxu], as pointed out elsewhere this ticket is marked for 4.0, and is 
not yet ready to be worked on. You're absolutely free to pick this up when the 
time comes around, so you can leave yourself assigned if you'd like, but it may 
be a while before 4.0 comes around. Just a heads up :) 

> Remove deprecated Producer#sendOffsetsToTransaction
> ---
>
> Key: KAFKA-12690
> URL: https://issues.apache.org/jira/browse/KAFKA-12690
> Project: Kafka
>  Issue Type: Task
>  Components: producer 
>Reporter: A. Sophie Blee-Goldman
>Assignee: loboxu
>Priority: Blocker
> Fix For: 4.0.0
>
>
> In 
> [KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in 
> StreamsConfig, to be removed in 4.0





[jira] [Commented] (KAFKA-12689) Remove deprecated EOS configs

2021-06-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363342#comment-17363342
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12689:


Hey [~loboxu], this ticket is marked with 4.0 as the Fix Version, which means 
it can't be worked on until version 4.0. That version has yet to be announced, 
and since 3.0 is only just about to be released, I would not assume that 4.0 is 
right around the corner; it will most likely be a while, so I would look for 
other tickets to work on for now.

> Remove deprecated EOS configs
> -
>
> Key: KAFKA-12689
> URL: https://issues.apache.org/jira/browse/KAFKA-12689
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: loboxu
>Priority: Blocker
> Fix For: 4.0.0
>
>
> In 
> [KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in 
> StreamsConfig, to be removed in 4.0





[GitHub] [kafka] ableegoldman commented on a change in pull request #10878: KAFKA-12898; Owned partitions in the subscription must be sorted

2021-06-14 Thread GitBox


ableegoldman commented on a change in pull request #10878:
URL: https://github.com/apache/kafka/pull/10878#discussion_r651411365



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
##
@@ -70,16 +71,24 @@ public static ByteBuffer serializeSubscription(final 
Subscription subscription,
 version = checkSubscriptionVersion(version);
 
 ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
-data.setTopics(subscription.topics());
+
+List<String> topics = new ArrayList<>(subscription.topics());
+topics.sort(null);

Review comment:
   ```suggestion
   Collections.sort(topics);
   ```

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
##
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import java.nio.BufferUnderflowException;
+import java.util.Comparator;

Review comment:
   nit: move these below the `o.a.k` imports
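For readers outside the PR: the two forms in the sort suggestion above are behaviorally interchangeable, since `Collections.sort(list)` delegates to `list.sort(null)` (sort by natural ordering) as of Java 8; the suggestion is about stating the intent explicitly. A small standalone sketch (class name invented for illustration):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortEquivalence {
    // List#sort(null) sorts by the elements' natural ordering,
    // per the List#sort contract.
    static List<String> viaListSort(List<String> in) {
        List<String> copy = new ArrayList<>(in);
        copy.sort(null);
        return copy;
    }

    // Collections.sort(list) delegates to list.sort(null), but names
    // the intent (natural ordering) explicitly at the call site.
    static List<String> viaCollectionsSort(List<String> in) {
        List<String> copy = new ArrayList<>(in);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topic-b", "topic-a", "topic-c");
        System.out.println(viaListSort(topics));        // [topic-a, topic-b, topic-c]
        System.out.println(viaCollectionsSort(topics)); // [topic-a, topic-b, topic-c]
    }
}
```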








[GitHub] [kafka] ijuma commented on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-14 Thread GitBox


ijuma commented on pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#issuecomment-861129160


   @hachikuji @jsancio do we want to keep this class? It's the second time it 
has broken, so we either need to improve test coverage or remove it.






[GitHub] [kafka] ableegoldman commented on pull request #10876: KAFKA-12843: KIP-740 follow up: clean up TaskMetadata

2021-06-14 Thread GitBox


ableegoldman commented on pull request #10876:
URL: https://github.com/apache/kafka/pull/10876#issuecomment-861126045


   Thanks for the PR, but @jlprat is correct. Unfortunately this ticket will 
need to wait for the 4.0 release, which as of this time has not yet even been 
announced. You can work on this when that time does come around, but for now I 
recommend looking for another ticket that can actually be worked on right away. 
Thanks!






[GitHub] [kafka] showuon edited a comment on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2021-06-14 Thread GitBox


showuon edited a comment on pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#issuecomment-861125710


   @kkonstantine, could you check this PR? Or should I find another reviewer, 
since it's been 3 months? Thanks.







[jira] [Comment Edited] (KAFKA-7360) Code example in "Accessing Processor Context" misses a closing parenthesis

2021-06-14 Thread Vijay (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362470#comment-17362470
 ] 

Vijay edited comment on KAFKA-7360 at 6/15/21, 2:39 AM:


Can this be assigned to me? I'd like to work on this.


was (Author: vijaykriishna):
Can you please assign it to me?

> Code example in "Accessing Processor Context" misses a closing parenthesis
> --
>
> Key: KAFKA-7360
> URL: https://issues.apache.org/jira/browse/KAFKA-7360
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 2.0.0
>Reporter: Sven Erik Knop
>Priority: Minor
>
> https://kafka.apache.org/20/documentation/streams/developer-guide/processor-api.html#accessing-processor-context
> Code example has some issues:
> public void process(String key, String value) {
>  
>  // add a header to the elements
>  context().headers().add.("key", "key"
>  }
> Should be
> public void process(String key, String value) {
>  
>  // add a header to the elements
>  context().headers().add("key", "value")
> }





[jira] [Comment Edited] (KAFKA-7302) Remove Java7 examples from Streams Docs

2021-06-14 Thread Vijay (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362471#comment-17362471
 ] 

Vijay edited comment on KAFKA-7302 at 6/15/21, 2:38 AM:


Can this be assigned to me? I'd like to work on this.


was (Author: vijaykriishna):
Can you please assign it to me?

> Remove Java7 examples from Streams Docs
> ---
>
> Key: KAFKA-7302
> URL: https://issues.apache.org/jira/browse/KAFKA-7302
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> In 2.0 release, Java7 support was dropped. We might consider removing Java7 
> examples from the Streams docs.





[GitHub] [kafka] iamgd67 commented on pull request #10818: KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left

2021-06-14 Thread GitBox


iamgd67 commented on pull request #10818:
URL: https://github.com/apache/kafka/pull/10818#issuecomment-861117484


   @guozhangwang, could you please review this? Thanks in advance.






[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651390233



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2285,7 +2287,11 @@ QuorumState quorum() {
 }
 
 public OptionalLong highWatermark() {
-return quorum.highWatermark().isPresent() ? 
OptionalLong.of(quorum.highWatermark().get().offset) : OptionalLong.empty();
+if (quorum.highWatermark().isPresent()) {
+return OptionalLong.of(quorum.highWatermark().get().offset);
+} else {
+return OptionalLong.empty();
+}

Review comment:
   No functional change here. Just a formatting change. Always found this 
line hard to read and I had to fix it :smile: 
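As an aside, the `isPresent()`/`get()` pair in the diff above can also be avoided entirely by mapping the `Optional`. A standalone sketch of that alternative; the `LogOffsetMetadata` stub here is a stand-in for Kafka's class, reduced to the one field used:

```java
import java.util.Optional;
import java.util.OptionalLong;

public class HighWatermarkSketch {
    // Minimal stand-in for Kafka's LogOffsetMetadata; only the offset matters.
    static class LogOffsetMetadata {
        final long offset;
        LogOffsetMetadata(long offset) { this.offset = offset; }
    }

    // Equivalent to the if/else in the diff: map the Optional instead of
    // calling get() after isPresent().
    static OptionalLong highWatermark(Optional<LogOffsetMetadata> hw) {
        return hw.map(m -> OptionalLong.of(m.offset)).orElse(OptionalLong.empty());
    }

    public static void main(String[] args) {
        System.out.println(highWatermark(Optional.of(new LogOffsetMetadata(42)))); // OptionalLong[42]
        System.out.println(highWatermark(Optional.empty()));                       // OptionalLong.empty
    }
}
```

Whether the map form or the explicit if/else reads better is a style call; the if/else in the diff keeps the two branches visually separate.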








[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651387879



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch 
endOffset) {
  * Create a writable snapshot for the given snapshot id.
  *
  * See {@link RawSnapshotWriter} for details on how to use this object. 
The caller of
- * this method is responsible for invoking {@link 
RawSnapshotWriter#close()}.
+ * this method is responsible for invoking {@link 
RawSnapshotWriter#close()}. If a
+ * snapshot already exists then return an {@link Optional#empty()}.
  *
  * @param snapshotId the end offset and epoch that identifies the snapshot
- * @return a writable snapshot
+ * @param validate validate the snapshot id against the log
 * @return a writable snapshot if one doesn't already exist
+ * @throws IllegalArgumentException if validate is true and end offset is 
greater than the
+ * high-watermark
+ * @throws IllegalArgumentException if validate is true and end offset is 
less than the log
+ * start offset
  */
-RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId);
+Optional<RawSnapshotWriter> createSnapshot(OffsetAndEpoch snapshotId, 
boolean validate);

Review comment:
   Done. Used these suggestions. I couldn't think of better names.
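The contract described in the updated javadoc (return `Optional.empty()` when a snapshot with that id already exists; throw `IllegalArgumentException` when validation is requested and the end offset falls outside the high-watermark / log-start-offset window) can be sketched independently of Kafka's raft internals. All names below are illustrative, not Kafka's actual classes:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Pure-Java sketch of the createSnapshot contract from the javadoc above.
public class SnapshotContractSketch {
    long highWatermark = 20;
    long logStartOffset = 5;
    private final Set<Long> existing = new HashSet<>();

    // Returns the snapshot id as a stand-in for the writable snapshot object.
    public Optional<Long> createSnapshot(long endOffset, boolean validate) {
        if (validate && endOffset > highWatermark) {
            throw new IllegalArgumentException("snapshot end offset " + endOffset
                + " is greater than the high-watermark " + highWatermark);
        }
        if (validate && endOffset < logStartOffset) {
            throw new IllegalArgumentException("snapshot end offset " + endOffset
                + " is less than the log start offset " + logStartOffset);
        }
        if (!existing.add(endOffset)) {
            return Optional.empty(); // a snapshot with this id already exists
        }
        return Optional.of(endOffset);
    }
}
```

The `Optional` return makes the "already exists" case impossible to ignore at the call site, while out-of-range ids stay loud failures.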








[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651387678



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
 return snapshot.snapshotId();
 }
 
+/**
+ * Returns the last log offset which is represented in the snapshot.
+ */
+public long lastOffsetFromLog() {

Review comment:
   I am up for that. Do you mind if I file a jira for this?








[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651387182



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException {
 appendBatch(numberOfRecords, epoch);
 log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
 
-try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
+try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, 
true).get()) {
 snapshot.freeze();
 }
 
 RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get();
 assertEquals(0, snapshot.sizeInBytes());
 }
 
+@Test
+public void testCreateSnapshotValidation() {
+int numberOfRecords = 10;
+int firstEpoch = 1;
+int secondEpoch = 3;
+
+appendBatch(numberOfRecords, firstEpoch);
+appendBatch(numberOfRecords, secondEpoch);
+log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords));
+
+// Test snapshot id for the first epoch
+try (RawSnapshotWriter snapshot = log.createSnapshot(new 
OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new 
OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new 
OffsetAndEpoch(1, firstEpoch), true).get()) { }
+
+// Test snapshot id for the second epoch
+try (RawSnapshotWriter snapshot = log.createSnapshot(new 
OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new 
OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new 
OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { }
+}
+
+@Test
+public void testCreateSnapshotLaterThanHighWatermark() {
+int numberOfRecords = 10;
+int epoch = 1;
+
+appendBatch(numberOfRecords, epoch);
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+assertThrows(
+IllegalArgumentException.class,
+() -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, 
epoch), true)
+);
+}
+
+@Test
+public void testCreateSnapshotBeforeLogStartOffset() {

Review comment:
   Added a few more tests: one for a much larger epoch, one for a much smaller 
epoch, and one for a missing epoch.








[GitHub] [kafka] IgnacioAcunaF commented on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-14 Thread GitBox


IgnacioAcunaF commented on pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#issuecomment-861104058


   PING @ijuma (as I saw you on a PR related to test-kraft-server-start.sh 
[KAFKA-12672])






[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651378054



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
 return snapshot.snapshotId();
 }
 
+/**
+ * Returns the last log offset which is represented in the snapshot.
+ */
+public long lastOffsetFromLog() {

Review comment:
   Yeah, makes sense. I was sort of considering if it would be useful to 
have a `SnapshotId` object. Currently we use `OffsetAndEpoch` in other cases, 
but maybe a separate object would let us have better names. It would also let 
us define inclusive and exclusive methods.
   









[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651374491



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
 return snapshot.snapshotId();
 }
 
+/**
+ * Returns the last log offset which is represented in the snapshot.
+ */
+public long lastOffsetFromLog() {

Review comment:
   One of the users of this API was confused by this offset versus the offsets 
returned in the `Batch` by the `SnapshotReader` iterator. I wanted to make it 
clear that this offset and epoch refer to the offset and epoch found in the 
`ReplicatedLog` or `handleCommit`, while the offsets reported by the `Batch` 
from the `SnapshotReader` iterator are unrelated to the log's offsets.
   
   How about `lastContainedLogOffset` and `lastContainedLogEpoch`?








[GitHub] [kafka] IgnacioAcunaF commented on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-14 Thread GitBox


IgnacioAcunaF commented on pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#issuecomment-861084792


   With no change:
   
![image](https://user-images.githubusercontent.com/31544929/121976023-f49deb80-cd50-11eb-8fa7-94b7c4923751.png)
   
   With the change:
   
![image](https://user-images.githubusercontent.com/31544929/121975932-c5877a00-cd50-11eb-8a4a-bf930a1982dc.png)
   






[GitHub] [kafka] mjsax commented on pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-14 Thread GitBox


mjsax commented on pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#issuecomment-861081018


   Replied to comments and rebased to resolve merge conflicts. \cc 
@guozhangwang 






[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-14 Thread GitBox


mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r651363243



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -82,20 +87,23 @@
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor {
 private WindowStore otherWindowStore;
-private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
 private Optional, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-@SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
-metrics = (StreamsMetricsImpl) context.metrics();
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
 otherWindowStore = context.getStateStore(otherWindowName);
 
-if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+if (enableSpuriousResultFix

Review comment:
   I don't think that checking the condition twice is a real issue? Also, it 
seems better to do the check, because otherwise (if we had a bug and 
incorrectly got a `null` store back) we might mask the bug.








[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-14 Thread GitBox


mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r651362625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -76,18 +76,37 @@
 
 private final long graceMs;
 
+protected final boolean enableSpuriousResultFix;
+
+protected JoinWindows(final JoinWindows joinWindows) {

Review comment:
   Not really possible. Multiple issues:
- we still need a protected constructor in `JoinWindows`
- we need to make `graceMs` protected
- blocker: we need to make the _public_ members `beforeMs` and `afterMs` 
non-final (and we cannot do this...)








[GitHub] [kafka] IgnacioAcunaF opened a new pull request #10883: KAFKA-12949: Add catch case to avoid scala.MatchError null

2021-06-14 Thread GitBox


IgnacioAcunaF opened a new pull request #10883:
URL: https://github.com/apache/kafka/pull/10883


   I encountered the following exception when trying to run the TestRaftServer:
   
   `bin/test-kraft-server-start.sh --config config/kraft.properties`
   ```
   [2021-06-14 17:15:43,232] ERROR [raft-workload-generator]: Error due to 
(kafka.tools.TestRaftServer$RaftWorkloadGenerator)
scala.MatchError: null
at 
kafka.tools.TestRaftServer$RaftWorkloadGenerator.doWork(TestRaftServer.scala:220)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-06-14 17:15:43,253] INFO [raft-workload-generator]: Stopped 
(kafka.tools.TestRaftServer$RaftWorkloadGenerator)
   ```
   
   This is caused by an unhandled *null* returned by `eventQueue.poll`:
   
   ```
   eventQueue.poll(eventTimeoutMs, TimeUnit.MILLISECONDS) match {
   case HandleClaim(epoch) =>
 claimedEpoch = Some(epoch)
 throttler.reset()
 pendingAppends.clear()
 recordCount.set(0)
   
   case HandleResign =>
 claimedEpoch = None
 pendingAppends.clear()
   
   case HandleCommit(reader) =>
 try {
   while (reader.hasNext) {
 val batch = reader.next()
 claimedEpoch.foreach { leaderEpoch =>
   handleLeaderCommit(leaderEpoch, batch)
 }
   }
 } finally {
   reader.close()
 }
   
   case HandleSnapshot(reader) =>
 // Ignore snapshots; only interested in records appended by this leader
 reader.close()
   
   case Shutdown => // Ignore shutdown command
 }
   ```
   This causes the raft-workload-generator thread to stop.
   
   Proposal: 
   Add a catch-all case to the match statement:
   ```
   case _ => // Ignore other events (such as null)
   ```
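The underlying hazard is generic: `BlockingQueue.poll(timeout, unit)` returns `null` when the timeout expires on an empty queue, so any dispatch on the polled value must handle `null` explicitly. A minimal Java illustration (hypothetical helper, not Kafka code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class PollNullDemo {
    // Polls one event and describes what happened, treating the null timeout
    // result as an explicit "no event" case instead of dispatching on it.
    static String pollOnce(BlockingQueue<String> queue) {
        try {
            String event = queue.poll(1, TimeUnit.MILLISECONDS);
            if (event == null) {
                return "no-event"; // timeout with an empty queue: nothing to dispatch on
            }
            return "handled:" + event;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

In the Scala match above, the equivalent of the `event == null` branch is the proposed catch-all case.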
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] YiDing-Duke commented on pull request #10843: MINOR: Log formatting for exceptions during configuration related operations

2021-06-14 Thread GitBox


YiDing-Duke commented on pull request #10843:
URL: https://github.com/apache/kafka/pull/10843#issuecomment-861078081


   @dajac we are good to go.






[GitHub] [kafka] YiDing-Duke commented on a change in pull request #10843: MINOR: Log formatting for exceptions during configuration related operations

2021-06-14 Thread GitBox


YiDing-Duke commented on a change in pull request #10843:
URL: https://github.com/apache/kafka/pull/10843#discussion_r651361653



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
##
@@ -174,11 +174,15 @@ public int hashCode() {
 return result;
 }
 
+/**
+ * Override toString to redact sensitive value.
+ * WARNING, user should be responsible to set the correct "IsSensitive" 
field for each config entry.

Review comment:
   Fixed.
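The pattern under discussion, as a simplified hypothetical sketch (not the actual `ConfigEntry` implementation): `toString` prints the value only when the entry is not marked sensitive, so redaction is only as good as the `isSensitive` flag the caller set.

```java
// Simplified, hypothetical config entry illustrating value redaction in
// toString(); the real ConfigEntry has more fields and logic.
final class RedactingEntry {
    private final String name;
    private final String value;
    private final boolean isSensitive;

    RedactingEntry(String name, String value, boolean isSensitive) {
        this.name = name;
        this.value = value;
        this.isSensitive = isSensitive;
    }

    @Override
    public String toString() {
        // Redact the value whenever the caller flagged the entry as sensitive.
        return name + "=" + (isSensitive ? "(redacted)" : value);
    }
}
```

This is why the warning in the javadoc matters: an entry wrongly created with `isSensitive = false` would print its value verbatim.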








[jira] [Updated] (KAFKA-12949) TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-14 Thread Ignacio Acuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ignacio Acuna updated KAFKA-12949:
--
Attachment: TestRaftServer.log

> TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh
> -
>
> Key: KAFKA-12949
> URL: https://issues.apache.org/jira/browse/KAFKA-12949
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Ignacio Acuna
>Assignee: Ignacio Acuna
>Priority: Major
> Attachments: TestRaftServer.log
>
>
> Encountered the following exception when trying to run the TestRaftServer:
> {code:java}
> bin/test-kraft-server-start.sh --config config/kraft.properties{code}
> {code:java}
> [2021-06-14 17:15:43,232] ERROR [raft-workload-generator]: Error due to 
> (kafka.tools.TestRaftServer$RaftWorkloadGenerator)
>  scala.MatchError: null
>  at 
> kafka.tools.TestRaftServer$RaftWorkloadGenerator.doWork(TestRaftServer.scala:220)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
>  [2021-06-14 17:15:43,253] INFO [raft-workload-generator]: Stopped 
> (kafka.tools.TestRaftServer$RaftWorkloadGenerator){code}
> That happens on the following match:
> {code:java}
> eventQueue.poll(eventTimeoutMs, TimeUnit.MILLISECONDS) match {
>   case HandleClaim(epoch) =>
>   claimedEpoch = Some(epoch)
>   throttler.reset()
>   pendingAppends.clear()
>   recordCount.set(0)
>   case HandleResign =>
>   claimedEpoch = None
>   pendingAppends.clear()
>   case HandleCommit(reader) =>
>   try {
> while (reader.hasNext) {
>   val batch = reader.next()
>   claimedEpoch.foreach { leaderEpoch =>
> handleLeaderCommit(leaderEpoch, batch)
>   }
> }
>   } finally {
> reader.close()
>   }
>   case HandleSnapshot(reader) =>
>   // Ignore snapshots; only interested in records appended by this leader
>   reader.close()
>   case Shutdown => // Ignore shutdown command
> }
> {code}
> Full log attached. When eventQueue.poll returns null (if the deque is empty), 
> there isn't a case to match, so a scala.MatchError is thrown and the 
> raft-workload-generator thread stops processing events.
> Proposal:
>  Add a case null to the match so the raft-workload-generator can continue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12949) TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-14 Thread Ignacio Acuna (Jira)
Ignacio Acuna created KAFKA-12949:
-

 Summary: TestRaftServer's scala.MatchError: null on 
test-kraft-server-start.sh
 Key: KAFKA-12949
 URL: https://issues.apache.org/jira/browse/KAFKA-12949
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: Ignacio Acuna
Assignee: Ignacio Acuna


Encountered the following exception when trying to run the TestRaftServer:
{code:java}
bin/test-kraft-server-start.sh --config config/kraft.properties{code}
{code:java}
[2021-06-14 17:15:43,232] ERROR [raft-workload-generator]: Error due to 
(kafka.tools.TestRaftServer$RaftWorkloadGenerator)
 scala.MatchError: null
 at 
kafka.tools.TestRaftServer$RaftWorkloadGenerator.doWork(TestRaftServer.scala:220)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
 [2021-06-14 17:15:43,253] INFO [raft-workload-generator]: Stopped 
(kafka.tools.TestRaftServer$RaftWorkloadGenerator){code}
That happens on the following match:
{code:java}
eventQueue.poll(eventTimeoutMs, TimeUnit.MILLISECONDS) match {
  case HandleClaim(epoch) =>
  claimedEpoch = Some(epoch)
  throttler.reset()
  pendingAppends.clear()
  recordCount.set(0)
  case HandleResign =>
  claimedEpoch = None
  pendingAppends.clear()
  case HandleCommit(reader) =>
  try {
while (reader.hasNext) {
  val batch = reader.next()
  claimedEpoch.foreach { leaderEpoch =>
handleLeaderCommit(leaderEpoch, batch)
  }
}
  } finally {
reader.close()
  }
  case HandleSnapshot(reader) =>
  // Ignore snapshots; only interested in records appended by this leader
  reader.close()
  case Shutdown => // Ignore shutdown command
}
{code}
Full log attached. When eventQueue.poll returns null (if the deque is empty), 
there isn't a case to match, so a scala.MatchError is thrown and the 
raft-workload-generator thread stops processing events.

Proposal:
 Add a case null to the match so the raft-workload-generator can continue.





[GitHub] [kafka] showuon commented on a change in pull request #10843: MINOR: Log formatting for exceptions during configuration related operations

2021-06-14 Thread GitBox


showuon commented on a change in pull request #10843:
URL: https://github.com/apache/kafka/pull/10843#discussion_r651354743



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
##
@@ -174,11 +174,15 @@ public int hashCode() {
 return result;
 }
 
+/**
+ * Override toString to redact sensitive value.
+ * WARNING, user should be responsible to set the correct "IsSensitive" 
field for each config entry.

Review comment:
   Nit: `isSensitive`








[GitHub] [kafka] showuon commented on a change in pull request #10843: MINOR: Log formatting for exceptions during configuration related operations

2021-06-14 Thread GitBox


showuon commented on a change in pull request #10843:
URL: https://github.com/apache/kafka/pull/10843#discussion_r651354059



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -469,7 +469,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 }
 invalidProps.keys.foreach(props.remove)
 val configSource = if (perBrokerConfig) "broker" else "default cluster"
-error(s"Dynamic $configSource config contains invalid values: 
$invalidProps, these configs will be ignored", e)
+error(s"Dynamic $configSource config contains invalid values in: 
${invalidProps.keys}, these configs will be ignored", e)

Review comment:
   SGTM! Thanks.
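The change above logs only the invalid keys rather than the full key/value pairs, so a rejected dynamic config cannot leak a possibly sensitive value into the broker log. A hedged Java sketch of the same idea (hypothetical helper, not the Scala code from the PR):

```java
import java.util.LinkedHashMap;
import java.util.Map;

class InvalidConfigLogging {
    // Builds the error message from the invalid keys only; values never
    // appear in the message, matching the intent of logging invalidProps.keys.
    static String describeInvalid(Map<String, String> invalidProps, String configSource) {
        return "Dynamic " + configSource + " config contains invalid values in: "
                + invalidProps.keySet() + ", these configs will be ignored";
    }
}
```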








[GitHub] [kafka] ijuma commented on pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-14 Thread GitBox


ijuma commented on pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#issuecomment-861064040


   Yeah, that was unintentional while using the GitHub editor. Will add it back.






[GitHub] [kafka] jolshan commented on pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-14 Thread GitBox


jolshan commented on pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#issuecomment-861062736


   Ah, I see. I didn't need that extra declaration, but you also removed the 
comment. Not a huge deal though.






[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r651337637



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -117,13 +128,21 @@ public MetadataRequestData data() {
 public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
 Errors error = Errors.forException(e);
 MetadataResponseData responseData = new MetadataResponseData();
-if (topics() != null) {
-for (String topic : topics())
+if (data.topics() != null) {
+for (MetadataRequestTopic topic : data.topics()) {
+String topicName;
+// If null, set to the empty string, since the response does not allow null.
+if (topic.name() == null)
+topicName = "";
+else
+topicName = topic.name();
 responseData.topics().add(new MetadataResponseData.MetadataResponseTopic()
-.setName(topic)
-.setErrorCode(error.code())
-.setIsInternal(false)
-.setPartitions(Collections.emptyList()));
+.setName(topicName)
+.setTopicId(topic.topicId())
+.setErrorCode(error.code())
+.setIsInternal(false)
+.setPartitions(Collections.emptyList()));

Review comment:
   No. My IDE did that. I will revert that.








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651336157



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -119,18 +120,50 @@ public static FetchResponse parse(ByteBuffer buffer, 
short version) {
 return new FetchResponse(new FetchResponseData(new 
ByteBufferAccessor(buffer), version));
 }
 
+private LinkedHashMap toResponseDataMap(Map topicIdToNameMap, short version) {

Review comment:
   I think this inconsistency existed before I touched the code. 😅








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651335666



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -71,6 +76,12 @@ public FetchResponseData data() {
 return data;
 }
 
+/**
+ * From version 3 or later, the authorized and existing entries in 
`FetchRequest.fetchData` should be in the same order in `responseData`.
+ * Version 13 introduces topic IDs which mean there may be unresolved 
partitions. If there is any unknown topic ID in the request, the
+ * response will contain a top-level UNKNOWN_TOPIC_ID error and 
UNKNOWN_TOPIC_ID errors on all the partitions.
+ * We may also return UNKNOWN_TOPIC_ID for a given partition when that 
partition in the session has a topic ID inconsistent with the broker.

Review comment:
   I think I just have the wrong things here completely. There should be 
INCONSISTENT_TOPIC_ID here as well.








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651335255



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
 fetchTarget.id());
 return;
 }
-if (!handler.handleResponse(response)) {
+if (!handler.handleResponse(response, maxVersion)) 
{
+if (response.error() == 
Errors.FETCH_SESSION_TOPIC_ID_ERROR || response.error() == 
Errors.UNKNOWN_TOPIC_ID) {
+metadata.requestUpdate();

Review comment:
   The session is closed in `handler.handleResponse`.








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651335121



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
 fetchTarget.id());
 return;
 }
-if (!handler.handleResponse(response)) {
+if (!handler.handleResponse(response, maxVersion)) 
{

Review comment:
   I see what you mean. It is a little tricky to get the version from the 
FetchResponse itself. Would `resp.requestHeader().apiVersion()` work?








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651334106



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
 private LinkedHashMap sessionPartitions =
 new LinkedHashMap<>(0);
 
+/**
+ * All of the topic ids mapped to topic names for topics which exist in 
the fetch request session.
+ */
+private Map sessionTopicIds = new HashMap<>(0);
+
+/**
+ * All of the topic names mapped to topic ids for topics which exist in 
the fetch request session.
+ */
+private Map sessionTopicNames = new HashMap<>(0);
+
+public Map sessionTopicNames() {
+return sessionTopicNames;
+}
+
+private boolean canUseTopicIds = false;

Review comment:
   I think we already do something like this on the broker. We only get to 
the point of having a session if the broker had an ID for all the topics in 
the request. I don't think we can decide this on a per-request basis, since we 
may respond with topics that have no IDs associated. I may be misunderstanding 
what you are saying, but I'm very wary of trying to switch between versions 12 
and 13 in the same session.
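The per-session decision this thread describes mirrors the builder logic quoted elsewhere in the PR (`topicIds.keySet().containsAll(...)`). A simplified sketch, with a hypothetical helper and `String` ids standing in for `Uuid`:

```java
import java.util.Map;
import java.util.Set;

class FetchVersionChoice {
    // Returns true only when every topic in the session has a known topic id,
    // i.e. the id-based fetch version can be used for the whole session.
    // Computed once per session build, not per individual request.
    static boolean canUseTopicIds(Set<String> sessionTopics, Map<String, String> topicIds) {
        return topicIds.keySet().containsAll(sessionTopics);
    }
}
```

If any session topic is missing an id, the session as a whole falls back to the pre-topic-id version rather than mixing versions.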








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651333015



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -213,9 +335,22 @@ public FetchRequestData build() {
 }
 sessionPartitions = next;
 next = null;
+canUseTopicIds = 
topicIds.keySet().containsAll(sessionPartitions.keySet().stream().map(
+tp -> tp.topic()).collect(Collectors.toSet()));
+// Only add topic IDs to the session if we are using topic IDs.
+if (canUseTopicIds) {

Review comment:
   That makes sense to me.








[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651330764



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
 return snapshot.snapshotId();
 }
 
+/**
+ * Returns the last log offset which is represented in the snapshot.
+ */
+public long lastOffsetFromLog() {

Review comment:
   Maybe something like `lastIncludedOffset` or `lastContainedOffset`?

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException {
 appendBatch(numberOfRecords, epoch);
 log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
 
-try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
+try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, 
true).get()) {
 snapshot.freeze();
 }
 
 RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get();
 assertEquals(0, snapshot.sizeInBytes());
 }
 
+@Test
+public void testCreateSnapshotValidation() {
+int numberOfRecords = 10;
+int firstEpoch = 1;
+int secondEpoch = 3;
+
+appendBatch(numberOfRecords, firstEpoch);
+appendBatch(numberOfRecords, secondEpoch);
+log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords));
+
+// Test snapshot id for the first epoch
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { }
+
+// Test snapshot id for the second epoch
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { }
+}
+
+@Test
+public void testCreateSnapshotLaterThanHighWatermark() {
+int numberOfRecords = 10;
+int epoch = 1;
+
+appendBatch(numberOfRecords, epoch);
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+assertThrows(
+IllegalArgumentException.class,
+() -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true)
+);
+}
+
+@Test
+public void testCreateSnapshotBeforeLogStartOffset() {

Review comment:
   Worth adding any test cases for an invalid epoch?

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -180,14 +181,17 @@ default void beginShutdown() {}
 void resign(int epoch);
 
 /**
- * Create a writable snapshot file for a given offset and epoch.
+ * Create a writable snapshot file for a commmitted offset.

Review comment:
   nit: one extra 'm'

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1101,7 +1101,7 @@ private boolean handleFetchResponse(
 partitionResponse.snapshotId().epoch()
 );
 
-state.setFetchingSnapshot(Optional.of(log.createSnapshot(snapshotId)));
+state.setFetchingSnapshot(log.createSnapshot(snapshotId, false));

Review comment:
   Might be worth a brief comment that the snapshot is expected to be well 
ahead of the current log, so we have to skip validation.

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch 
endOffset) {
  * Create a writable snapshot for the given snapshot id.
  *
  * See {@link RawSnapshotWriter} for details on how to use this object. 
The caller of
- * this method is responsible for invoking {@link 
RawSnapshotWriter#close()}.
+ * this method is responsible for invoking {@link 
RawSnapshotWriter#close()}. If a
+ * snapshot already exists then return an {@link Optional#empty()}.
  *
  * @param snapshotId the end offset and epoch that identifies the snapshot
- * @return a writable snapshot
+ * @param validate validate the snapshot id against the log
+ * @return a writable snapshot if it doesn't already exists
+ * @throws IllegalArgumentException if validate is true and end offset is 
greater than the
+ * high-watermark
+ * @throws IllegalArgumentException if validate is true and end offset is 

[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651332908



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -186,23 +268,63 @@ public String toString() {
  * incremental fetch requests (see below).
  */
 private LinkedHashMap next;
+private Map topicIds;
 private final boolean copySessionPartitions;
+private boolean missingTopicIds;
 
 Builder() {
 this.next = new LinkedHashMap<>();
+this.topicIds = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
+this.topicIds = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, PartitionData data) {
+public void add(TopicPartition topicPartition, Uuid topicId, 
PartitionData data) {
 next.put(topicPartition, data);
+// topicIds do not change between adding partitions and building, 
so we can use putIfAbsent
+if (!topicId.equals(Uuid.ZERO_UUID)) {
+topicIds.putIfAbsent(topicPartition.topic(), topicId);

Review comment:
   Ah, I see what you are saying here. I think this will still close the 
session when we send the request. The other option is to set a boolean similar 
to `missingTopicId` (maybe just renamed to `inconsistentTopicId`) that signals 
closing the session earlier (upon build).
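The `putIfAbsent` approach discussed here can be sketched as follows (hypothetical helper, `String` ids standing in for `Uuid`; not the PR's actual code): the first id seen for a topic wins, and any later add with a different id raises a flag that the build step can use to close the session.

```java
import java.util.HashMap;
import java.util.Map;

class TopicIdTracker {
    private final Map<String, String> topicIds = new HashMap<>();
    private boolean inconsistentTopicId = false;

    // Records the topic id for a topic; flags an inconsistency if a different
    // id was already registered for the same topic in this session.
    void add(String topic, String topicId) {
        String previous = topicIds.putIfAbsent(topic, topicId);
        if (previous != null && !previous.equals(topicId)) {
            inconsistentTopicId = true; // signal: close the session at build time
        }
    }

    boolean inconsistentTopicId() {
        return inconsistentTopicId;
    }
}
```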








[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651332118



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -186,23 +268,63 @@ public String toString() {
  * incremental fetch requests (see below).
  */
 private LinkedHashMap next;
+private Map topicIds;
 private final boolean copySessionPartitions;
+private boolean missingTopicIds;
 
 Builder() {
 this.next = new LinkedHashMap<>();
+this.topicIds = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
+this.topicIds = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, PartitionData data) {
+public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
 next.put(topicPartition, data);
+// topicIds do not change between adding partitions and building, so we can use putIfAbsent
+if (!topicId.equals(Uuid.ZERO_UUID)) {
+topicIds.putIfAbsent(topicPartition.topic(), topicId);

Review comment:
   If we try to put in a new topic ID, the session should be closed. 
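   For illustration only, a minimal self-contained sketch (hypothetical class and names, not the actual `FetchSessionHandler` code) of how `putIfAbsent` can double as the inconsistency check discussed here: the first ID registered for a topic wins, and any later conflicting ID flags the session for closure on build.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicIdTracker {
    private final Map<String, String> topicIds = new HashMap<>();
    private boolean inconsistentTopicId = false;

    // Record the topic ID for a topic. putIfAbsent returns the previously
    // mapped ID (or null), so a non-null, different return value means the
    // caller tried to register a conflicting ID for the same topic name.
    public void add(String topic, String topicId) {
        String previous = topicIds.putIfAbsent(topic, topicId);
        if (previous != null && !previous.equals(topicId)) {
            inconsistentTopicId = true; // signal: close the session on build
        }
    }

    public boolean inconsistentTopicId() {
        return inconsistentTopicId;
    }

    public static void main(String[] args) {
        TopicIdTracker tracker = new TopicIdTracker();
        tracker.add("foo", "id-1");
        tracker.add("foo", "id-1"); // same ID again: still consistent
        System.out.println(tracker.inconsistentTopicId()); // false
        tracker.add("foo", "id-2"); // conflicting ID
        System.out.println(tracker.inconsistentTopicId()); // true
    }
}
```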




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651331636



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -412,6 +412,12 @@ abstract class AbstractFetcherThread(name: String,
"expected to persist.")
   partitionsWithError += topicPartition
 
+case Errors.INCONSISTENT_TOPIC_ID =>

Review comment:
   This is no longer a partition level error. We can only get it as a top 
level error. If it is a top level error, I believe we return an empty map and 
do not go down this code path. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651331247



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
 try {
   val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
  val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-  if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-Map.empty
+  if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
+if (fetchResponse.error() == Errors.UNKNOWN_TOPIC_ID)
+  throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.")
+else if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+  throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.")
+else
+  Map.empty
   } else {
-fetchResponse.responseData.asScala
+fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala
   }
 } catch {
+  case unknownId: UnknownTopicIdException =>
+throw unknownId
+  case sessionUnknownId: FetchSessionTopicIdException =>
+throw sessionUnknownId

Review comment:
   This happens inside of `fetchSessionHandler.handleResponse`. We set the 
session to close upon the next request. The code path for Fetcher is slightly 
different so it made sense for that code to have it there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651329577



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -945,10 +945,12 @@ private boolean hasValidClusterId(String requestClusterId) {
 return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
 }
 
-if (!hasValidTopicPartition(request, log.topicPartition())) {
+if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) {

Review comment:
   Are you referring to creating a new topic ID for the metadata topic? For 
now, we are simply using the sentinel ID. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651323500



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if (!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null);
+}
+
+return null;
+},
+forwardErrorCallback(callback)
+);
+}
+
+/**
+ * Process all pending restart requests. There can be at most one request per connector, because of how
+ * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name.
+ *
+ * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests
+ * are processed at once before any additional requests are added.
+ */
+private synchronized void processRestartRequests() {

Review comment:
   The new implementation with an iterator makes this explicit. Could you please check if it looks good now?
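   As a side note, the name-based identity described in the Javadoc above can be sketched in isolation (a hypothetical mini `RestartRequest`, not the Connect class): because equality, hashing, and ordering consider only the connector name, the pending set keeps at most one request per connector, regardless of the request's flags.

```java
import java.util.Objects;
import java.util.TreeSet;

public class RestartDedup {
    // Hypothetical stand-in for RestartRequest: identity and ordering use
    // only the connector name, so flags like onlyFailed do not create
    // duplicate entries for the same connector.
    static final class Request implements Comparable<Request> {
        final String connector;
        final boolean onlyFailed;

        Request(String connector, boolean onlyFailed) {
            this.connector = connector;
            this.onlyFailed = onlyFailed;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Request && ((Request) o).connector.equals(connector);
        }

        @Override
        public int hashCode() {
            return Objects.hash(connector);
        }

        @Override
        public int compareTo(Request other) {
            // TreeSet deduplicates via compareTo, which also uses only the name
            return connector.compareTo(other.connector);
        }
    }

    public static void main(String[] args) {
        TreeSet<Request> pending = new TreeSet<>();
        pending.add(new Request("jdbc-sink", false));
        pending.add(new Request("jdbc-sink", true)); // same connector: ignored
        System.out.println(pending.size()); // 1
    }
}
```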




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651323170



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -186,6 +192,10 @@
 private short currentProtocolVersion;
 private short backoffRetries;
 
+// visible for testing
+// The pending restart requests for the connectors
+final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>();

Review comment:
   Fixed, Could you please check to see if it looks good now?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-14 Thread GitBox


hachikuji commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r651224002



##
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when is completing the rebalance.
+ *
+ * Whenever a SyncGroup is receives, checks that we received all the SyncGroup request from

Review comment:
   nit: Whenever a SyncGroup is receive**d**?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
 }
   }
 }
   }
 
+  private def maybeRemovePendingSyncMember(

Review comment:
   nit: `removePendingSyncMember` throws an exception if the member is not 
in the group, so does the "maybe" in the name make sense?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
 }
   }
 }
   }
 
+  private def maybeRemovePendingSyncMember(
+group: GroupMetadata,
+memberId: String
+  ): Unit = {
+group.removePendingSyncMember(memberId)
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+group.clearPendingSyncMembers()
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+group: GroupMetadata
+  ): Unit = {
+val delayedSync = new DelayedSync(this, group, group.rebalanceTimeoutMs)
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+group: GroupMetadata,
+forceComplete: () => Boolean
+  ): Boolean = {
+group.inLock {
+  group.currentState match {
+case Dead | Empty | PreparingRebalance =>
+  forceComplete()
+case CompletingRebalance | Stable =>
+  if (group.hasReceivedSyncFromAllMembers())
+forceComplete()
+  else false
+  }
+}
+  }
+
+  def onExpirePendingSync(
+group: GroupMetadata
+  ): Unit = {
+group.inLock {
+  group.currentState match {
+case Dead | Empty | PreparingRebalance =>
+  debug(s"Received unexpected notification of sync expiration after group ${group.groupId} " +
+s"already transitioned to the ${group.currentState} state.")
+
+case CompletingRebalance | Stable =>
+  if (!group.hasAllMembersJoined) {
+val pendingSyncMembers = group.allPendingSyncMembers()
+
+info(s"Group ${group.groupId} removed members who haven't " +

Review comment:
   nit: this message might be redundant given the one in `prepareRebalance`.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,12 +1457,89 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
 }
   }
 }
   }
 
+  private def maybeRemove

[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-14 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r649585182



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -817,20 +839,26 @@ class KafkaApis(val requestChannel: RequestChannel,
   def createResponse(throttleTimeMs: Int): FetchResponse = {
 // Down-convert messages for each partition if required
 val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-  val error = Errors.forCode(unconvertedPartitionData.errorCode)
-  if (error != Errors.NONE)
-debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-  s"on partition $tp failed due to ${error.exceptionName}")
-  convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+unconvertedFetchResponse.data().responses().forEach { topicResponse =>
+  topicResponse.partitions().forEach{ unconvertedPartitionData =>

Review comment:
   Space after forEach.

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -593,19 +619,22 @@ class FetchSessionCache(private val maxEntries: Int,
* @param now                The current time in milliseconds.
* @param privileged         True if the new entry we are trying to create is privileged.
* @param size               The number of cached partitions in the new entry we are trying to create.
-* @param createPartitions   A callback function which creates the map of cached partitions.
+* @param version            The version of the request
+* @param createPartitions   A callback function which creates the map of cached partitions and the mapping from
+*   topic name for topic ID for the topics.

Review comment:
   from topic name for topic ID => from topic name to topic ID ?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -213,9 +335,22 @@ public FetchRequestData build() {
 }
 sessionPartitions = next;
 next = null;
+canUseTopicIds = topicIds.keySet().containsAll(sessionPartitions.keySet().stream().map(
+tp -> tp.topic()).collect(Collectors.toSet()));
+// Only add topic IDs to the session if we are using topic IDs.
+if (canUseTopicIds) {

Review comment:
   Should we set sessionTopicIds and sessionTopicNames to empty map if 
canUseTopicIds is false?
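   The all-or-nothing check under discussion can be illustrated in isolation (a hypothetical helper, using `String` IDs instead of `Uuid` for brevity): topic IDs are usable only when every topic in the session has a known ID.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CanUseTopicIds {
    // Mirrors the containsAll idea from the builder: IDs go into the
    // session only if every fetched topic has a known ID.
    static boolean canUseTopicIds(Set<String> sessionTopics, Map<String, String> topicIds) {
        return topicIds.keySet().containsAll(sessionTopics);
    }

    public static void main(String[] args) {
        Set<String> topics = new HashSet<>(Arrays.asList("a", "b"));
        Map<String, String> ids = new HashMap<>();
        ids.put("a", "id-a");
        System.out.println(canUseTopicIds(topics, ids)); // false: "b" has no ID
        ids.put("b", "id-b");
        System.out.println(canUseTopicIds(topics, ids)); // true
    }
}
```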

##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
 try {
   val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
  val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-  if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-Map.empty
+  if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
+if (fetchResponse.error() == Errors.UNKNOWN_TOPIC_ID)
+  throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.")
+else if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+  throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.")
+else
+  Map.empty
   } else {
-fetchResponse.responseData.asScala
+fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala
   }
 } catch {
+  case unknownId: UnknownTopicIdException =>
+throw unknownId
+  case sessionUnknownId: FetchSessionTopicIdException =>
+throw sessionUnknownId

Review comment:
   If we get FetchSessionTopicIdException, the existing session is going to 
be invalid. So, it seems that we should start a new session? The same thing 
seems to apply to UnknownTopicIdException

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -80,19 +91,9 @@ public Errors error() {
 return Errors.forCode(data.errorCode());
 }
 
-public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData() {
-if (responseData == null) {
-synchronized (this) {
-if (responseData == null) {
-responseData = new LinkedHashMap<>();
-data.responses().forEach(topicResponse ->
-topicResponse.partitions().forEach(partition ->
-responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))
-);

[GitHub] [kafka] cmccabe commented on pull request #10804: KAFKA-12877: Make flexibleVersions mandatory

2021-06-14 Thread GitBox


cmccabe commented on pull request #10804:
URL: https://github.com/apache/kafka/pull/10804#issuecomment-861030811


   Seems like there was a Jenkins infra issue on the last test run.
   ```
   [2021-06-14T20:15:25.946Z] Execution failed for task ':streams:streams-scala:compileScala'.
   
   [2021-06-14T20:15:25.946Z] > Timeout waiting to lock zinc-1.3.5_2.13.6_16 compiler cache (/home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_16). It is currently in use by another Gradle instance.
   
   [2021-06-14T20:15:25.946Z]   Owner PID: 36970
   
   [2021-06-14T20:15:25.946Z]   Our PID: 37256
   
   [2021-06-14T20:15:25.946Z]   Owner Operation: 
   
   [2021-06-14T20:15:25.946Z]   Our operation: 
   
   [2021-06-14T20:15:25.946Z]   Lock file: /home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_16/zinc-1.3.5_2.13.6_16.lock
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10867: KAFKA-12931: KIP-746: Revise KRaft Metadata Records

2021-06-14 Thread GitBox


cmccabe commented on pull request #10867:
URL: https://github.com/apache/kafka/pull/10867#issuecomment-861030196


   Test failure is `org.apache.kafka.streams.integration.TaskMetadataIntegrationTest`, which is not related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r65129



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if (!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null);
+}
+
+return null;
+},
+forwardErrorCallback(callback)
+);
+}
+
+/**
+ * Process all pending restart requests. There can be at most one request per connector, because of how
+ * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name.
+ *
+ * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests
+ * are processed at once before any additional requests are added.
+ */
+private synchronized void processRestartRequests() {

Review comment:
   We can't make a copy because `pollFirst` removes each request from the set as we drain it:
   `while ((request = pendingRestartRequests.pollFirst()) != null) {`
   
   The whole-method synchronization was done deliberately to keep the code simple. One more point: this only triggers the restart of the connector/tasks, and the real start happens in another thread, so this should be pretty fast.
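   The drain pattern described above can be shown in isolation (plain strings stand in for restart requests): `pollFirst` both retrieves and removes, so no defensive copy is needed and the set is empty when the loop exits.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

public class DrainPendingRequests {
    public static void main(String[] args) {
        NavigableSet<String> pending = new TreeSet<>();
        pending.add("sink-connector");
        pending.add("source-connector");

        // pollFirst returns and removes the smallest element, so the loop
        // consumes the set in sorted order without copying it first.
        String request;
        while ((request = pending.pollFirst()) != null) {
            System.out.println("restarting " + request);
        }
        System.out.println(pending.isEmpty()); // true
    }
}
```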




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-14 Thread GitBox


ijuma commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r651308552



##
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -117,13 +128,21 @@ public MetadataRequestData data() {
 public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
 Errors error = Errors.forException(e);
 MetadataResponseData responseData = new MetadataResponseData();
-if (topics() != null) {
-for (String topic : topics())
+if (data.topics() != null) {
+for (MetadataRequestTopic topic : data.topics()) {
+String topicName;
+// If null, set to the empty string, since the response does not allow null.
+if (topic.name() == null)
+topicName = "";
+else
+topicName = topic.name();

Review comment:
   Can we use the ternary operator here?
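   The suggested ternary form would collapse the four-line null check into a single expression (sketched with a plain `String` argument standing in for `topic.name()`):

```java
public class TernaryExample {
    // Hypothetical stand-in for the topic.name() null check: the response
    // schema does not allow null, so null collapses to the empty string.
    static String topicNameOrEmpty(String name) {
        return name == null ? "" : name;
    }

    public static void main(String[] args) {
        System.out.println("[" + topicNameOrEmpty(null) + "]");     // []
        System.out.println("[" + topicNameOrEmpty("events") + "]"); // [events]
    }
}
```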




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-14 Thread GitBox


ijuma commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r651308210



##
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -117,13 +128,21 @@ public MetadataRequestData data() {
 public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
 Errors error = Errors.forException(e);
 MetadataResponseData responseData = new MetadataResponseData();
-if (topics() != null) {
-for (String topic : topics())
+if (data.topics() != null) {
+for (MetadataRequestTopic topic : data.topics()) {
+String topicName;
+// If null, set to the empty string, since the response does not allow null.
+if (topic.name() == null)
+topicName = "";
+else
+topicName = topic.name();
 responseData.topics().add(new MetadataResponseData.MetadataResponseTopic()
-.setName(topic)
-.setErrorCode(error.code())
-.setIsInternal(false)
-.setPartitions(Collections.emptyList()));
+.setName(topicName)
+.setTopicId(topic.topicId())
+.setErrorCode(error.code())
+.setIsInternal(false)
+.setPartitions(Collections.emptyList()));

Review comment:
   Is there a reason to change the indent?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2

2021-06-14 Thread Konstantine Karantasis (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis updated KAFKA-12379:
---
Fix Version/s: 3.0.0

> KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2
> -
>
> Key: KAFKA-12379
> URL: https://issues.apache.org/jira/browse/KAFKA-12379
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.0.0
>
>
> Ticket for KIP-716
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2

2021-06-14 Thread Konstantine Karantasis (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis updated KAFKA-12379:
---
Labels: needs-kip  (was: )

> KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2
> -
>
> Key: KAFKA-12379
> URL: https://issues.apache.org/jira/browse/KAFKA-12379
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Ticket for KIP-716
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651300080



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -255,12 +257,29 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 
 @POST
 @Path("/{connector}/restart")
-public void restartConnector(final @PathParam("connector") String connector,
+public Response restartConnector(final @PathParam("connector") String connector,
  final @Context HttpHeaders headers,
+ final @DefaultValue ("false") @QueryParam("includeTasks") Boolean includeTasks,
+ final @DefaultValue ("false") @QueryParam("onlyFailed") Boolean onlyFailed,
  final @QueryParam("forward") Boolean forward) throws Throwable {
-FutureCallback<Void> cb = new FutureCallback<>();
-herder.restartConnector(connector, cb);
-completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward);
+RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks);
+if (restartRequest.forciblyRestartConnectorOnly()) {
+// For backward compatibility, just restart the connector instance and return OK with no body
+FutureCallback<Void> cb = new FutureCallback<>();
+herder.restartConnector(connector, cb);
+completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward);
+return Response.ok().build();
+}
+
+FutureCallback<ConnectorStateInfo> cb = new FutureCallback<>();
+herder.restartConnectorAndTasks(restartRequest, cb);
+Map<String, String> queryParameters = new HashMap<>();
+queryParameters.put("includeTasks", String.valueOf(includeTasks));
+queryParameters.put("onlyFailed", String.valueOf(onlyFailed));
+String forwardingPath = "/connectors/" + connector + "/restart";

Review comment:
   Good idea, Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651293447



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if (!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null);
+}
+
+return null;
+},
+forwardErrorCallback(callback)
+);
+}
+
+/**
+ * Process all pending restart requests. There can be at most one request per connector, because of how
+ * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name.
+ *
+ * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests
+ * are processed at once before any additional requests are added.
+ */
+private synchronized void processRestartRequests() {
+RestartRequest request;
+while ((request = pendingRestartRequests.pollFirst()) != null) {
+doRestartConnectorAndTasks(request);
+}
+}
+
+protected synchronized boolean doRestartConnectorAndTasks(RestartRequest request) {
+final String connectorName = request.connectorName();
+Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request);
+return false;
+}
+RestartPlan plan = maybePlan.get();
+log.info("Executing {}", plan);
+
+// If requested, stop the connector and any tasks, marking each as restarting
+final ExtendedAssignment currentAssignments = assignment;
+final Collection<ConnectorTaskId> assignedIdsToRestart = plan.taskIdsToRestart()
+ .stream()
+ .filter(taskId -> currentAssignments.tasks().contains(taskId))
+ .collect(Collectors.toList());
+final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName);
+final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+if (restartConnector) {
+worker.stopAndAwaitConnector(connectorName);
+recordRestarting(connectorName);
+}
+if (restartTasks) {
+// Stop the tasks and mark as restarting
+worker.stopAndAwaitTasks(assignedIdsToRestart);
+assignedIdsToRestart.forEach(this::recordRestarting);
+}
+
+// Now restart the connector and tasks
+if (restartConnector) {
+startConnector(connectorName, (error, targetState) -> {
+if (error == null) {
+log.info("Connector {} successfully restarted", connectorName);
+} else {
+log.error("Failed to restart connector '" + connectorName + "'", error);
+}
+});
+}
+if (restartTasks) {
+log.debug("Restarting {} of {} tasks for {}", 
plan.restartTaskCount

[jira] [Commented] (KAFKA-12948) NetworkClient.close(node) with node in connecting state makes NetworkClient unusable

2021-06-14 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363227#comment-17363227
 ] 

Ismael Juma commented on KAFKA-12948:
-

Good catch. So, this regressed in 2.7.0?

> NetworkClient.close(node) with node in connecting state makes NetworkClient 
> unusable
> 
>
> Key: KAFKA-12948
> URL: https://issues.apache.org/jira/browse/KAFKA-12948
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.7.2, 2.8.1
>
>
> `NetworkClient.close(node)` closes the node and removes it from 
> `ClusterConnectionStates.nodeState`, but not from 
> `ClusterConnectionStates.connectingNodes`. Subsequent `NetworkClient.poll()` 
> invocations throw IllegalStateException and this leaves the NetworkClient in 
> an unusable state until the node is removed from connectionNodes or added to 
> nodeState. We don't use `NetworkClient.close(node)` in clients, but we use it 
> in clients started by brokers for replica fetcher and controller. Since 
> brokers use NetworkClientUtils.isReady() before establishing connections and 
> this invokes poll(), the NetworkClient never recovers.
> Exception stack trace:
> {code:java}
> java.lang.IllegalStateException: No entry found for connection 0
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:409)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.isConnectionSetupTimeout(ClusterConnectionStates.java:446)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.lambda$nodesWithConnectionSetupTimeout$0(ClusterConnectionStates.java:458)
> at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
> at 
> java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
> at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodesWithConnectionSetupTimeout(ClusterConnectionStates.java:459)
> at 
> org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:807)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at 
> org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
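The failure mode described in this ticket (close() removing a node from one tracking structure but not the other, so a later scan finds an id with no state entry) can be illustrated with a minimal sketch. The class and method names below are hypothetical stand-ins, not Kafka's actual NetworkClient internals:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the invariant the bug violates: a node must be removed
// from BOTH tracking structures on close, or later timeout scans over
// `connecting` will look up missing state and throw on every poll.
public class ConnectionStatesSketch {
    private final Map<String, String> nodeState = new HashMap<>(); // nodeId -> state
    private final Set<String> connecting = new HashSet<>();        // nodeIds mid-handshake

    void connect(String node) {
        nodeState.put(node, "CONNECTING");
        connecting.add(node);
    }

    // Buggy close: mirrors the report, removing from nodeState only.
    void closeBuggy(String node) {
        nodeState.remove(node);
    }

    // Fixed close: keeps both structures consistent.
    void closeFixed(String node) {
        nodeState.remove(node);
        connecting.remove(node);
    }

    // Timeout scan analogous to nodesWithConnectionSetupTimeout(): every id
    // in `connecting` is expected to have an entry in `nodeState`.
    boolean scanTimeouts() {
        for (String node : connecting) {
            if (!nodeState.containsKey(node)) {
                throw new IllegalStateException("No entry found for connection " + node);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ConnectionStatesSketch buggy = new ConnectionStatesSketch();
        buggy.connect("0");
        buggy.closeBuggy("0");
        boolean threw = false;
        try {
            buggy.scanTimeouts();
        } catch (IllegalStateException e) {
            threw = true; // poll() would now fail on every invocation
        }
        System.out.println("buggy close breaks scan: " + threw);

        ConnectionStatesSketch fixed = new ConnectionStatesSketch();
        fixed.connect("0");
        fixed.closeFixed("0");
        System.out.println("fixed close scans cleanly: " + fixed.scanTimeouts());
    }
}
```

Presumably the fix for 2.7.2/2.8.1 amounts to what `closeFixed()` sketches: keeping both structures consistent when a connection is closed.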


[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651298044



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -186,6 +192,10 @@
 private short currentProtocolVersion;
 private short backoffRetries;
 
+// visible for testing
+// The pending restart requests for the connectors;
+final NavigableSet<RestartRequest> pendingRestartRequests = new 
TreeSet<>();

Review comment:
   No particular reason; TreeSet was the NavigableSet implementation that came 
to mind. But I like your idea above of using a map to simplify the 
code; let me work on it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
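The deduplication the TreeSet discussion above relies on is that a RestartRequest's identity (and ordering) is the connector name alone, so the set can hold at most one pending request per connector. A self-contained sketch, where `RestartRequestSketch` is a hypothetical stand-in rather than Connect's actual class:

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Identity and ordering are the connector name alone, so a NavigableSet holds
// at most one pending request per connector; a second add() for the same
// connector returns false and the element is not inserted again.
class RestartRequestSketch implements Comparable<RestartRequestSketch> {
    final String connectorName;
    final boolean includeTasks;

    RestartRequestSketch(String connectorName, boolean includeTasks) {
        this.connectorName = connectorName;
        this.includeTasks = includeTasks;
    }

    @Override public int compareTo(RestartRequestSketch o) {
        return connectorName.compareTo(o.connectorName);
    }
    @Override public boolean equals(Object o) {
        return o instanceof RestartRequestSketch
                && connectorName.equals(((RestartRequestSketch) o).connectorName);
    }
    @Override public int hashCode() {
        return connectorName.hashCode();
    }

    public static void main(String[] args) {
        NavigableSet<RestartRequestSketch> pending = new TreeSet<>();
        pending.add(new RestartRequestSketch("jdbc-source", false));
        pending.add(new RestartRequestSketch("jdbc-source", true)); // duplicate key, ignored
        pending.add(new RestartRequestSketch("s3-sink", true));
        System.out.println(pending.size()); // 2: one entry per connector
    }
}
```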




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r65129



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if 
(!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);
+}
+
+return null;
+},
+forwardErrorCallback(callback)
+);
+}
+
+/**
+ * Process all pending restart requests. There can be at most one request 
per connector, because of how
+ * {@link RestartRequest#equals(Object)} and {@link 
RestartRequest#hashCode()} are based only on the connector name.
+ *
+ * This method is called from within the {@link #tick()} method. It is 
synchronized so that all pending restart requests
+ * are processed at once before any additional requests are added.
+ */
+private synchronized void processRestartRequests() {

Review comment:
   We can't make a copy because we are doing pollFirst, and it's removing each 
element from the set:
   `while ((request = pendingRestartRequests.pollFirst()) != null) {
   `
   
   The whole method synchronization was done deliberately to keep the code 
simple.
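The drain loop being discussed can be sketched in isolation: pollFirst() removes each element as it is processed, so the set is empty when the loop exits and no defensive copy is needed. A minimal stand-alone version:

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch of the drain pattern: pollFirst() retrieves AND removes the first
// element, so iterating this way consumes the set in order.
public class DrainSketch {
    public static int drain(NavigableSet<String> pending) {
        int processed = 0;
        String request;
        while ((request = pending.pollFirst()) != null) {
            processed++; // doRestartConnectorAndTasks(request) would run here
        }
        return processed;
    }

    public static void main(String[] args) {
        NavigableSet<String> pending = new TreeSet<>();
        pending.add("connector-a");
        pending.add("connector-b");
        System.out.println(drain(pending));   // 2
        System.out.println(pending.isEmpty()); // true
    }
}
```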








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r65129



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if 
(!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);
+}
+
+return null;
+},
+forwardErrorCallback(callback)
+);
+}
+
+/**
+ * Process all pending restart requests. There can be at most one request 
per connector, because of how
+ * {@link RestartRequest#equals(Object)} and {@link 
RestartRequest#hashCode()} are based only on the connector name.
+ *
+ * This method is called from within the {@link #tick()} method. It is 
synchronized so that all pending restart requests
+ * are processed at once before any additional requests are added.
+ */
+private synchronized void processRestartRequests() {

Review comment:
   We can't make a copy because we are doing pollFirst, and it's removing each 
element as it goes:
   `while ((request = pendingRestartRequests.pollFirst()) != null) {
   `
   
   The whole method synchronization was done deliberately to keep the code 
simple.








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651293447



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if 
(!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);
+}
+
+return null;
+},
+forwardErrorCallback(callback)
+);
+}
+
+/**
+ * Process all pending restart requests. There can be at most one request 
per connector, because of how
+ * {@link RestartRequest#equals(Object)} and {@link 
RestartRequest#hashCode()} are based only on the connector name.
+ *
+ * This method is called from within the {@link #tick()} method. It is 
synchronized so that all pending restart requests
+ * are processed at once before any additional requests are added.
+ */
+private synchronized void processRestartRequests() {
+RestartRequest request;
+while ((request = pendingRestartRequests.pollFirst()) != null) {
+doRestartConnectorAndTasks(request);
+}
+}
+
+protected synchronized boolean doRestartConnectorAndTasks(RestartRequest 
request) {
+final String connectorName = request.connectorName();
+Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+log.debug("Skipping restart of connector '{}' since no status is 
available: {}", connectorName, request);
+return false;
+}
+RestartPlan plan = maybePlan.get();
+log.info("Executing {}", plan);
+
+
+// If requested, stop the connector and any tasks, marking each as 
restarting
+final ExtendedAssignment currentAssignments = assignment;
+final Collection<ConnectorTaskId> assignedIdsToRestart = 
plan.taskIdsToRestart()
+ .stream()
+ 
.filter(taskId -> currentAssignments.tasks().contains(taskId))
+ 
.collect(Collectors.toList());
+final boolean restartConnector = plan.restartConnector() && 
currentAssignments.connectors().contains(connectorName);
+final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+if (restartConnector) {
+worker.stopAndAwaitConnector(connectorName);
+recordRestarting(connectorName);
+}
+if (restartTasks) {
+// Stop the tasks and mark as restarting
+worker.stopAndAwaitTasks(assignedIdsToRestart);
+assignedIdsToRestart.forEach(this::recordRestarting);
+}
+
+// Now restart the connector and tasks
+if (restartConnector) {
+startConnector(connectorName, (error, targetState) -> {
+if (error == null) {
+log.info("Connector {} successfully restarted", 
connectorName);
+} else {
+log.error("Failed to restart connector '" + connectorName 
+ "'", error);
+}
+});
+}
+if (restartTasks) {
+log.debug("Restarting {} of {} tasks for {}", 
plan.restartTaskCount

[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651292365



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> 
connectorProps, Callback
 });
 }
 
+/**
+ * Build the {@link RestartPlan} that describes what should and should not 
be restarted given the restart request
+ * and the current status of the connector and task instances.
+ *
+ * @param request the restart request; may not be null
+ * @return the restart plan, or empty this worker has no status for the 
connector named in the request and therefore the

Review comment:
   Fixed

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> 
connectorProps, Callback
 });
 }
 
+/**
+ * Build the {@link RestartPlan} that describes what should and should not 
be restarted given the restart request
+ * and the current status of the connector and task instances.
+ *
+ * @param request the restart request; may not be null
+ * @return the restart plan, or empty this worker has no status for the 
connector named in the request and therefore the
+ * connector cannot be restarted
+ */
+public Optional<RestartPlan> buildRestartPlanFor(RestartRequest request) {
+String connectorName = request.connectorName();
+ConnectorStatus connectorStatus = 
statusBackingStore.get(connectorName);
+if (connectorStatus == null) {
+return Optional.empty();
+}
+
+// If requested, mark the connector as restarting
+AbstractStatus.State connectorState;
+if (request.includeConnector(connectorStatus)) {
+connectorState = AbstractStatus.State.RESTARTING;
+} else {
+connectorState = connectorStatus.state();
+}
+ConnectorStateInfo.ConnectorState connectorInfoState = new 
ConnectorStateInfo.ConnectorState(
+connectorState.toString(),
+connectorStatus.workerId(),
+connectorStatus.trace()
+);
+
+// Collect the task IDs to stop and restart (may be none)

Review comment:
   Fixed
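The state computation quoted in the diff above (report RESTARTING only when the restart request covers the connector, otherwise pass the stored state through) can be sketched with simplified stand-ins; the enum and method below are illustrative, not Connect's actual types:

```java
// Sketch of the connector-state selection in buildRestartPlanFor: the plan
// reports RESTARTING only when the request includes the connector itself;
// otherwise the connector's current stored state is passed through unchanged.
public class RestartStateSketch {
    enum State { RUNNING, FAILED, RESTARTING }

    static State planState(State currentState, boolean includeConnector) {
        return includeConnector ? State.RESTARTING : currentState;
    }

    public static void main(String[] args) {
        System.out.println(planState(State.RUNNING, true));  // RESTARTING
        System.out.println(planState(State.FAILED, false));  // FAILED
    }
}
```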








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651291878



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if 
(!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);

Review comment:
   Fixed








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651291754



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if 
(!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+} else {
+RestartPlan plan = maybePlan.get();
+callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+}
+} else {
+callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);
+}
+
+return null;
+},
+forwardErrorCallback(callback)
+);
+}
+
+/**
+ * Process all pending restart requests. There can be at most one request 
per connector, because of how
+ * {@link RestartRequest#equals(Object)} and {@link 
RestartRequest#hashCode()} are based only on the connector name.
+ *
+ * This method is called from within the {@link #tick()} method. It is 
synchronized so that all pending restart requests
+ * are processed at once before any additional requests are added.
+ */
+private synchronized void processRestartRequests() {
+RestartRequest request;
+while ((request = pendingRestartRequests.pollFirst()) != null) {
+doRestartConnectorAndTasks(request);
+}
+}
+
+protected synchronized boolean doRestartConnectorAndTasks(RestartRequest 
request) {
+final String connectorName = request.connectorName();
+Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+if (!maybePlan.isPresent()) {
+log.debug("Skipping restart of connector '{}' since no status is 
available: {}", connectorName, request);
+return false;
+}
+RestartPlan plan = maybePlan.get();
+log.info("Executing {}", plan);
+
+
+// If requested, stop the connector and any tasks, marking each as 
restarting
+final ExtendedAssignment currentAssignments = assignment;
+final Collection<ConnectorTaskId> assignedIdsToRestart = 
plan.taskIdsToRestart()
+ .stream()
+ 
.filter(taskId -> currentAssignments.tasks().contains(taskId))
+ 
.collect(Collectors.toList());
+final boolean restartConnector = plan.restartConnector() && 
currentAssignments.connectors().contains(connectorName);
+final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+if (restartConnector) {
+worker.stopAndAwaitConnector(connectorName);
+recordRestarting(connectorName);
+}
+if (restartTasks) {
+// Stop the tasks and mark as restarting
+worker.stopAndAwaitTasks(assignedIdsToRestart);
+assignedIdsToRestart.forEach(this::recordRestarting);
+}
+
+// Now restart the connector and tasks
+if (restartConnector) {
+startConnector(connectorName, (error, targetState) -> {
+if (error == null) {
+log.info("Connector {} successfully restarted", 
connectorName);

Review comment:
   Fixed
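The filtering step in the diff above restricts the restart plan's task IDs to those currently assigned to this worker. A self-contained sketch of that stream pipeline, modeling task IDs as plain strings rather than ConnectorTaskId:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of restricting the plan's task IDs to the local assignment: only
// tasks this worker currently owns are stopped and restarted locally.
public class AssignedFilterSketch {
    static Collection<String> assignedToRestart(List<String> planTaskIds,
                                                Collection<String> currentAssignment) {
        return planTaskIds.stream()
                .filter(currentAssignment::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> plan = Arrays.asList("conn-0", "conn-1", "conn-2");
        List<String> assigned = Arrays.asList("conn-1", "conn-2", "conn-9");
        System.out.println(assignedToRestart(plan, assigned)); // [conn-1, conn-2]
    }
}
```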





[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651291606



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
##
@@ -58,6 +58,9 @@ public void start(Map<String, String> props) {
 commonConfigs = props;
 log.info("Started {} connector {}", this.getClass().getSimpleName(), 
connectorName);
 connectorHandle.recordConnectorStart();
+if 
("true".equalsIgnoreCase(props.getOrDefault("connector.start.inject.error", 
"false"))) {

Review comment:
   done








[GitHub] [kafka] kpatelatwork commented on pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-14 Thread GitBox


kpatelatwork commented on pull request #10841:
URL: https://github.com/apache/kafka/pull/10841#issuecomment-860983059


   @rhauch I have resolved all the review comments. Could you please check to 
see if it looks good now?






[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10841:
URL: https://github.com/apache/kafka/pull/10841#discussion_r651267237



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
##
@@ -56,6 +57,39 @@
 "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
 );
 
+@Test
+public void testListenersConfigAllowedValues() {
+Map<String, String> props = baseProps();
+
+// no value set for "listeners"
+WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+assertEquals(LISTENERS_DEFAULT, 
config.getList(WorkerConfig.LISTENERS_CONFIG));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:"));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, 
https://a.b:7812");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:", "https://a.b:7812"));
+
+new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+}
+
+@Test
+public void testListenersConfigNotAllowedValues() {
+Map<String, String> props = baseProps();
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "");
+assertThrows(ConfigException.class, () -> new 
WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace();

Review comment:
   Very good suggestion, I added that case.








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10841:
URL: https://github.com/apache/kafka/pull/10841#discussion_r651267091



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
##
@@ -56,6 +57,39 @@
 "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
 );
 
+@Test
+public void testListenersConfigAllowedValues() {
+Map<String, String> props = baseProps();
+
+// no value set for "listeners"
+WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+assertEquals(LISTENERS_DEFAULT, 
config.getList(WorkerConfig.LISTENERS_CONFIG));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:"));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, 
https://a.b:7812");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:", "https://a.b:7812"));
+
+new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+}
+
+@Test
+public void testListenersConfigNotAllowedValues() {
+Map<String, String> props = baseProps();

Review comment:
   Fixed








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10841:
URL: https://github.com/apache/kafka/pull/10841#discussion_r651266585



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
##
@@ -56,6 +57,39 @@
 "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
 );
 
+@Test
+public void testListenersConfigAllowedValues() {
+Map<String, String> props = baseProps();
+
+// no value set for "listeners"
+WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+assertEquals(LISTENERS_DEFAULT, 
config.getList(WorkerConfig.LISTENERS_CONFIG));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:"));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, 
https://a.b:7812");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:", "https://a.b:7812"));
+
+new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+}
+
+@Test
+public void testListenersConfigNotAllowedValues() {
+Map<String, String> props = baseProps();
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "");
+assertThrows(ConfigException.class, () -> new 
WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace();
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:,");
+assertThrows(ConfigException.class, () -> new 
WorkerConfig(WorkerConfig.baseConfigDef(), props));

Review comment:
   Fixed, I added "listeners" and "admin.listeners" to each message, Could 
you please check to see if it looks good now?








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10841:
URL: https://github.com/apache/kafka/pull/10841#discussion_r651266416



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
##
@@ -56,6 +57,39 @@
 "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
 );
 
+@Test
+public void testListenersConfigAllowedValues() {
+Map<String, String> props = baseProps();
+
+// no value set for "listeners"
+WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+assertEquals(LISTENERS_DEFAULT, 
config.getList(WorkerConfig.LISTENERS_CONFIG));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:"));

Review comment:
   copy paste mistake from other test, I fixed both tests.

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
##
@@ -56,6 +57,39 @@
 "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
 );
 
+@Test
+public void testListenersConfigAllowedValues() {
+Map<String, String> props = baseProps();
+
+// no value set for "listeners"
+WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+assertEquals(LISTENERS_DEFAULT, 
config.getList(WorkerConfig.LISTENERS_CONFIG));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:"));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, 
https://a.b:7812");
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:", "https://a.b:7812"));
+
+new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+}
+
+@Test
+public void testListenersConfigNotAllowedValues() {
+Map<String, String> props = baseProps();
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "");
+assertThrows(ConfigException.class, () -> new 
WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace();
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:,");
+assertThrows(ConfigException.class, () -> new 
WorkerConfig(WorkerConfig.baseConfigDef(), props));

Review comment:
   Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12946) __consumer_offsets topic with very big partitions

2021-06-14 Thread Emi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363203#comment-17363203
 ] 

Emi commented on KAFKA-12946:
-

[~rndgstn] What do you mean by "has a significantly smaller size than the 
leader"? Thanks

> __consumer_offsets topic with very big partitions
> -
>
> Key: KAFKA-12946
> URL: https://issues.apache.org/jira/browse/KAFKA-12946
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Emi
>Priority: Critical
>
> I am using Kafka 2.0.0 with Java 8u191.
>  There is a partition of the __consumer_offsets topic that is 600 GB with 
> 6000 segments older than 4 months. Other partitions of that topic are small: 
> 20-30 MB.
> There are 60 consumer groups, 90 topics and 100 partitions per topic.
> There aren't errors in the logs. From the log of the log cleaner, I can see 
> that the partition is never touched by the log cleaner thread for 
> compaction; it only adds new segments.
>  How is this possible?
> There was another partition with the same problem, but after some months it 
> was compacted. Now there is only one partition with this problem, and it 
> is bigger and keeps growing.
> I have used the kafka-dump-log tool to check these old segments and I can see 
> many duplicates, so I would assume it is not compacted.
> My settings:
>  {{offsets.commit.required.acks = -1}}
>  {{offsets.commit.timeout.ms = 5000}}
>  {{offsets.load.buffer.size = 5242880}}
>  {{offsets.retention.check.interval.ms = 60}}
>  {{offsets.retention.minutes = 10080}}
>  {{offsets.topic.compression.codec = 0}}
>  {{offsets.topic.num.partitions = 50}}
>  {{offsets.topic.replication.factor = 3}}
>  {{offsets.topic.segment.bytes = 104857600}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram opened a new pull request #10882: KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed

2021-06-14 Thread GitBox


rajinisivaram opened a new pull request #10882:
URL: https://github.com/apache/kafka/pull/10882


   NetworkClient.poll() throws IllegalStateException when checking 
`isConnectionSetupTimeout` if a node in 
`ClusterConnectionStates.connectingNodes` isn't present in 
`ClusterConnectionStates.nodeState`. When we remove a node from `nodeState`, we 
should also remove it from `connectingNodes`.
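
   The invariant described above can be sketched with a minimal, hypothetical 
stand-in (illustrative names, not the actual `ClusterConnectionStates` fields): 
every id in the connecting set must also have an entry in the state map, so 
removal has to clear both collections.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the bug pattern: two collections track a node,
// and a later check assumes that every "connecting" node has state.
class ConnectionRegistry {
    private final Map<String, String> state = new HashMap<>();
    private final Set<String> connecting = new HashSet<>();

    void connect(String node) {
        state.put(node, "CONNECTING");
        connecting.add(node);
    }

    // The fix: removal clears the node from both collections.
    void remove(String node) {
        state.remove(node);
        connecting.remove(node); // without this line, poll() below throws
    }

    // Mirrors the isConnectionSetupTimeout check that assumes state exists.
    int poll() {
        for (String node : connecting) {
            if (!state.containsKey(node)) {
                throw new IllegalStateException("No entry found for connection " + node);
            }
        }
        return connecting.size();
    }
}
```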
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] kpatelatwork commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-14 Thread GitBox


kpatelatwork commented on a change in pull request #10841:
URL: https://github.com/apache/kafka/pull/10841#discussion_r651266225



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -497,6 +476,37 @@ static void validateHeaderConfigAction(String action) {
 + "Expected one of %s", action, HEADER_ACTIONS));
 }
 }
+private static class ListenersValidator implements ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException("Invalid value, at least one URI is 
expected, ex: http://localhost:8080,https://localhost:8443.";);
+}
+
+if (!(value instanceof List)) {
+throw new ConfigException("Invalid value type (list 
expected).");
+}
+
+List<?> items = (List<?>) value;
+if (items.isEmpty()) {
+throw new ConfigException("Invalid value, at least one URI is 
expected, ex: http://localhost:8080,https://localhost:8443.";);
+}
+
+for (Object item: items) {
+if (!(item instanceof String)) {
+throw new ConfigException("Invalid type for listener 
(expected String).");
+}
+if (Utils.isBlank((String) item)) {
+throw new ConfigException("Empty listener found when 
parsing list.");

Review comment:
   Fixed; it was a copy-paste mistake from AdminListenerValidator. I have 
fixed AdminListenerValidator as well. Would you please review that class's 
changes too and let me know whether it's OK to change the exception message 
there, or is that backward incompatible?








[GitHub] [kafka] erdody commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


erdody commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r649771532



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map 
connectorProps, Callback
 });
 }
 
+/**
+ * Build the {@link RestartPlan} that describes what should and should not 
be restarted given the restart request
+ * and the current status of the connector and task instances.
+ *
+ * @param request the restart request; may not be null
+ * @return the restart plan, or empty this worker has no status for the 
connector named in the request and therefore the

Review comment:
   Nit: or empty **if** this worker 

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -255,12 +257,29 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 
 @POST
 @Path("/{connector}/restart")
-public void restartConnector(final @PathParam("connector") String 
connector,
+public Response restartConnector(final @PathParam("connector") String 
connector,
  final @Context HttpHeaders headers,
+ final @DefaultValue ("false") 
@QueryParam("includeTasks") Boolean includeTasks,
+ final @DefaultValue ("false") 
@QueryParam("onlyFailed") Boolean onlyFailed,
  final @QueryParam("forward") Boolean forward) 
throws Throwable {
-FutureCallback<Void> cb = new FutureCallback<>();
-herder.restartConnector(connector, cb);
-completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", 
"POST", headers, null, forward);
+RestartRequest restartRequest = new RestartRequest(connector, 
onlyFailed, includeTasks);
+if (restartRequest.forciblyRestartConnectorOnly()) {
+// For backward compatibility, just restart the connector instance 
and return OK with no body
+FutureCallback<Void> cb = new FutureCallback<>();
+herder.restartConnector(connector, cb);
+completeOrForwardRequest(cb, "/connectors/" + connector + 
"/restart", "POST", headers, null, forward);
+return Response.ok().build();
+}
+
+FutureCallback<ConnectorStateInfo> cb = new FutureCallback<>();
+herder.restartConnectorAndTasks(restartRequest, cb);
+Map<String, String> queryParameters = new HashMap<>();
+queryParameters.put("includeTasks", String.valueOf(includeTasks));
+queryParameters.put("onlyFailed", String.valueOf(onlyFailed));
+String forwardingPath = "/connectors/" + connector + "/restart";

Review comment:
   Nit: Move up so you can share with 270?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -186,6 +192,10 @@
 private short currentProtocolVersion;
 private short backoffRetries;
 
+// visible for testing
+// The pending restart requests for the connectors;
+final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>();

Review comment:
   There are a few comments in different places explaining the special 
equality implementation in RestartRequest.
   Have we considered making this a Map to make it 
explicit that we keep the latest per connector, have a more typical 
equals/hashcode and avoid all the warnings?
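
   A minimal sketch of the map-based alternative suggested above (the 
`RestartRequest` class here is a simplified stand-in, not the real Connect 
class): keying by connector name keeps only the latest request per connector 
while a sorted map preserves connector-name iteration order.

```java
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical alternative: pending restart requests keyed by connector name,
// so a later request for the same connector replaces the earlier one and no
// custom equals/hashCode is needed.
class PendingRestarts {
    static class RestartRequest {
        final String connectorName;
        final boolean onlyFailed;
        final boolean includeTasks;

        RestartRequest(String connectorName, boolean onlyFailed, boolean includeTasks) {
            this.connectorName = connectorName;
            this.onlyFailed = onlyFailed;
            this.includeTasks = includeTasks;
        }
    }

    // Sorted map gives deterministic connector-name iteration order.
    private final NavigableMap<String, RestartRequest> pending = new ConcurrentSkipListMap<>();

    // put() replaces any earlier request queued for the same connector.
    void enqueue(RestartRequest request) {
        pending.put(request.connectorName, request);
    }

    int size() {
        return pending.size();
    }
}
```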

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -186,6 +192,10 @@
 private short currentProtocolVersion;
 private short backoffRetries;
 
+// visible for testing
+// The pending restart requests for the connectors;
+final NavigableSet pendingRestartRequests = new 
TreeSet<>();

Review comment:
   Just out of curiosity, any particular reason why we want to process 
these in connectorName order? (instead of FIFO)

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if 
(!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+   

[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-06-14 Thread GitBox


jsancio commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-860980428


   > Or maybe we can make a separate PR to handle the validation for 
onSnapshotFrozen?
   
   I am okay with fixing this in a future PR. Do you want to go ahead and file 
a sub-task for https://issues.apache.org/jira/browse/KAFKA-10310 and link it 
here?






[jira] [Created] (KAFKA-12948) NetworkClient.close(node) with node in connecting state makes NetworkClient unusable

2021-06-14 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-12948:
--

 Summary: NetworkClient.close(node) with node in connecting state 
makes NetworkClient unusable
 Key: KAFKA-12948
 URL: https://issues.apache.org/jira/browse/KAFKA-12948
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.7.1, 2.8.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.7.2, 2.8.1


`NetworkClient.close(node)` closes the node and removes it from 
`ClusterConnectionStates.nodeState`, but not from 
`ClusterConnectionStates.connectingNodes`. Subsequent `NetworkClient.poll()` 
invocations throw IllegalStateException, and this leaves the NetworkClient in 
an unusable state until the node is removed from `connectingNodes` or added 
back to `nodeState`. We don't use `NetworkClient.close(node)` in clients, but 
we use it in clients started by brokers for the replica fetchers and the 
controller. Since brokers use NetworkClientUtils.isReady() before establishing 
connections and this invokes poll(), the NetworkClient never recovers.

Exception stack trace:
{code:java}
java.lang.IllegalStateException: No entry found for connection 0
at 
org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:409)
at 
org.apache.kafka.clients.ClusterConnectionStates.isConnectionSetupTimeout(ClusterConnectionStates.java:446)
at 
org.apache.kafka.clients.ClusterConnectionStates.lambda$nodesWithConnectionSetupTimeout$0(ClusterConnectionStates.java:458)
at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.kafka.clients.ClusterConnectionStates.nodesWithConnectionSetupTimeout(ClusterConnectionStates.java:459)
at 
org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:807)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
at 
org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
{code}





[GitHub] [kafka] rhauch commented on a change in pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-14 Thread GitBox


rhauch commented on a change in pull request #10841:
URL: https://github.com/apache/kafka/pull/10841#discussion_r651200941



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -497,6 +476,37 @@ static void validateHeaderConfigAction(String action) {
 + "Expected one of %s", action, HEADER_ACTIONS));
 }
 }
+private static class ListenersValidator implements ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException("Invalid value, at least one URI is 
expected, ex: http://localhost:8080,https://localhost:8443.";);
+}
+
+if (!(value instanceof List)) {
+throw new ConfigException("Invalid value type (list 
expected).");
+}
+
+List<?> items = (List<?>) value;
+if (items.isEmpty()) {
+throw new ConfigException("Invalid value, at least one URI is 
expected, ex: http://localhost:8080,https://localhost:8443.";);
+}
+
+for (Object item: items) {
+if (!(item instanceof String)) {
+throw new ConfigException("Invalid type for listener 
(expected String).");
+}
+if (Utils.isBlank((String) item)) {
+throw new ConfigException("Empty listener found when 
parsing list.");

Review comment:
   Nit: should we say "listener URL" instead of just "listener"? Or maybe 
"Empty URL found when parsing listeners list"?

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
##
@@ -56,6 +57,39 @@
 "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
 );
 
+@Test
+public void testListenersConfigAllowedValues() {
+Map<String, String> props = baseProps();
+
+// no value set for "listeners"
+WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+assertEquals(LISTENERS_DEFAULT, 
config.getList(WorkerConfig.LISTENERS_CONFIG));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:";);
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:";));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:, 
https://a.b:7812";);
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfig.LISTENERS_CONFIG), 
Arrays.asList("http://a.b:";, "https://a.b:7812";));
+
+new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+}
+
+@Test
+public void testListenersConfigNotAllowedValues() {
+Map<String, String> props = baseProps();
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "");
+assertThrows(ConfigException.class, () -> new 
WorkerConfig(WorkerConfig.baseConfigDef(), props)).printStackTrace();
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:,";);
+assertThrows(ConfigException.class, () -> new 
WorkerConfig(WorkerConfig.baseConfigDef(), props));

Review comment:
   Should you check that the exception message contains an expected string? 
Technically, catching this exception doesn't ensure that the `listeners` 
property is the one that failed.
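
   A dependency-free sketch of the suggestion above: instead of only catching 
the exception, capture its message and assert it mentions the offending 
property, so the test fails for the right reason. With JUnit 5 the real test 
could simply use the exception returned by `assertThrows` and check 
`e.getMessage()`; the helper below is an illustrative stand-in.

```java
// Hypothetical helper that runs an action, verifies the expected exception
// type was thrown, and returns its message for further assertions.
class ThrowsCheck {
    static String messageOfThrown(Runnable action, Class<? extends Exception> expected) {
        try {
            action.run();
        } catch (Exception e) {
            if (expected.isInstance(e)) {
                return e.getMessage();
            }
            throw new AssertionError("wrong exception type: " + e, e);
        }
        throw new AssertionError("expected " + expected.getSimpleName() + " but nothing was thrown");
    }
}
```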

##
File path: docs/upgrade.html
##
@@ -69,7 +69,9 @@ Notable changes in 3
 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 
instead.
 The quota.producer.default and quota.consumer.default configurations 
were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>). 
Dynamic quota defaults must be used instead.
-The default value for the consumer configuration session.timeout.ms was 
increased from 10s to 45s. See 
+The deprecated worker configurations rest.host.name and rest.port were 
removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12482">KAFKA-12482</a>) in 
Kafka Connect.

Review comment:
   Also, should probably add `connect` to the list on line 32.

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
##
@@ -56,6 +57,39 @@
 "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
 );
 
+@Test
+public void testListenersConfigAllowedValues() {
+Map<String, String> props = baseProps();
+
+// no value set for "listeners"
+WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+assertEquals(LISTENERS_DEFAULT, 
config.getList(WorkerConfig.LISTENERS_CONFIG));
+
+props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:";);
+config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+assertEquals(config.getList(WorkerConfi

[jira] [Commented] (KAFKA-12946) __consumer_offsets topic with very big partitions

2021-06-14 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363155#comment-17363155
 ] 

Ron Dagostino commented on KAFKA-12946:
---

If the partition isn't being cleaned then you can try setting 
min.cleanable.dirty.ratio=0 for the __consumer_offsets topic; this might allow 
it to get cleaned.  You can delete that config after a while to let the value 
default back.
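
The override above can be applied and later removed with the kafka-configs.sh 
tool shipped with the broker (the bootstrap address is illustrative; on older 
brokers such as 2.0 you may need to pass --zookeeper instead of 
--bootstrap-server for topic configs):

```shell
# Temporarily let the cleaner treat the partition as fully dirty
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name __consumer_offsets \
  --add-config min.cleanable.dirty.ratio=0

# After the log cleaner has caught up, fall back to the cluster default
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name __consumer_offsets \
  --delete-config min.cleanable.dirty.ratio
```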

Another possibility might exist if one of the follower replicas has a 
significantly smaller size than the leader; in such cases you can move 
leadership to the smaller replica and then reassign the follower replicas to 
new brokers so that they will copy the (much smaller-sized) data; then you can 
migrate the followers back to where they were originally and move the leader 
back to the original leader.  This solution will only work if you have more 
brokers than the replication factor.

Finally, take a look at 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.
  You may not have any other options right now if it is a hanging transaction, 
but help is coming.

> __consumer_offsets topic with very big partitions
> -
>
> Key: KAFKA-12946
> URL: https://issues.apache.org/jira/browse/KAFKA-12946
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Emi
>Priority: Critical
>
> I am using Kafka 2.0.0 with Java 8u191.
>  There is a partition of the __consumer_offsets topic that is 600 GB with 
> 6000 segments older than 4 months. Other partitions of that topic are small: 
> 20-30 MB.
> There are 60 consumer groups, 90 topics and 100 partitions per topic.
> There aren't errors in the logs. From the log of the log cleaner, I can see 
> that the partition is never touched by the log cleaner thread for 
> compaction; it only adds new segments.
>  How is this possible?
> There was another partition with the same problem, but after some months it 
> was compacted. Now there is only one partition with this problem, and it 
> is bigger and keeps growing.
> I have used the kafka-dump-log tool to check these old segments and I can see 
> many duplicates, so I would assume it is not compacted.
> My settings:
>  {{offsets.commit.required.acks = -1}}
>  {{offsets.commit.timeout.ms = 5000}}
>  {{offsets.load.buffer.size = 5242880}}
>  {{offsets.retention.check.interval.ms = 60}}
>  {{offsets.retention.minutes = 10080}}
>  {{offsets.topic.compression.codec = 0}}
>  {{offsets.topic.num.partitions = 50}}
>  {{offsets.topic.replication.factor = 3}}
>  {{offsets.topic.segment.bytes = 104857600}}





[GitHub] [kafka] dajac commented on pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-14 Thread GitBox


dajac commented on pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#issuecomment-860921205


   @hachikuji Thanks for the clarification. Yes, that makes sense to me as well.






[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-14 Thread GitBox


kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-860537313


   @junrao Thanks for the review! I ran the system tests.
   1. [System test run 
#4560](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4560/) 
on top of the latest commit 008b701386ce5a4d892d6ac5b90798b981c4fba0 from this 
PR. The run finished with 12 test failures.
   2. [System test run 
#4561](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4561/) 
against AK trunk on top of commit 6de37e536ac76ef13530d49dc7320110332cd1ee 
which does not contain changes from this PR. The run finished with 13 test 
failures.
   
   There were 11 overlapping failures in both (1) and (2). For these, I didn't 
find anything abnormal in the logs so far; the failure reason seems similar in 
both.
   
   The only new failure in (1) that's not present in (2) was:
   
   ```
   Module: kafkatest.tests.client.consumer_test
   Class:  OffsetValidationTest
   Method: test_broker_failure
   Arguments:
   {
 "clean_shutdown": true,
 "enable_autocommit": false,
 "metadata_quorum": "REMOTE_KRAFT"
   }
   ```
   
   Logs indicate that the test failed [at this 
line](https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/tests/kafkatest/tests/client/consumer_test.py#L388)
 because one of the worker nodes running the consumer didn't complete within 
the timeout of 30s. This doesn't seem indicative of a real failure (yet), so 
I'm rerunning the system tests again in [test run 
#4562](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4562/) 
to check if the failure is consistent. I'll keep you posted on the outcome of 
this second run.
   
   






[GitHub] [kafka] hachikuji commented on pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-14 Thread GitBox


hachikuji commented on pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#issuecomment-860914096


   @dajac Given the background from KAFKA-10134, the rebalance timeout seems 
like the right one in my mind. We're basically allowing for a delay between the 
JoinGroup and SyncGroup caused by consumer processing while the rebalance is in 
progress. This time is bounded by max.poll.interval.ms which is used as the 
rebalance timeout internally. Does that make sense?






[GitHub] [kafka] dajac commented on pull request #10843: MINOR: Log formatting for exceptions during configuration related operations

2021-06-14 Thread GitBox


dajac commented on pull request #10843:
URL: https://github.com/apache/kafka/pull/10843#issuecomment-860910847


   @YiDing-Duke Thanks for the update.
   
   @showuon Does the PR look good to you now?






[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-06-14 Thread GitBox


jsancio commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-860908546


   > I assume a KIP is needed.
   
   We don't need a KIP. All of these APIs are internal and are not 
accessible/published to projects external to Apache Kafka.






[GitHub] [kafka] cmccabe commented on pull request #10804: KAFKA-12877: Make flexibleVersions mandatory

2021-06-14 Thread GitBox


cmccabe commented on pull request #10804:
URL: https://github.com/apache/kafka/pull/10804#issuecomment-860882713


   OK, I reconsidered this a bit. I think we DON'T want to make incompatible 
changes if we can avoid it... such as changing the default from NONE to ALL.
   
   There are a few reasons for this. One is that we eventually want to have a 
verifier tool that checks that changes between message files shipped with 
version X and those shipped with version X+1 are compatible. This is a lot 
easier if we refrain from gratuitous compatibility breaks.
   
   Another is that there are external projects that are looking at these JSON 
files now. There's no formal contract or anything, but it's just nicer for the 
ecosystem not to break things.
   
   Finally, downstream forks of Kafka may have a difficult time if we change 
defaults around like this. For example, if someone had an in-house fork of 
Kafka with some new messages, changing around defaults could cause a 
compatibility break for them. It's better not to have to worry about this.
   
   For all these reasons, I think it's best just to require explicitly spelling 
out `flexibleVersions` for now. It's a little extra boilerplate but not that 
bad in the grand scheme of things. Maybe eventually we can have more versioning 
in the JSON file that lets us elide some of this, but for now I think this is 
the best way to go. cc @hachikuji 
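
   To illustrate the "extra boilerplate" in question, a minimal message-schema 
JSON fragment with `flexibleVersions` spelled out explicitly (the apiKey, 
message name, and field are made up for illustration):

```json
{
  "apiKey": 9999,
  "type": "request",
  "name": "ExampleRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "An example field." }
  ]
}
```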






[GitHub] [kafka] hachikuji commented on pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

2021-06-14 Thread GitBox


hachikuji commented on pull request #10880:
URL: https://github.com/apache/kafka/pull/10880#issuecomment-860877998


   Note I have a few additional tests which I will post shortly.






[GitHub] [kafka] wycccccc opened a new pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for StreamsMe…

2021-06-14 Thread GitBox


wycccccc opened a new pull request #10881:
URL: https://github.com/apache/kafka/pull/10881


   Development of EasyMock and PowerMock has stagnated while Mockito continues 
to be actively developed. With the new Java release cadence, it's a problem to 
depend on libraries that do bytecode generation and are not actively 
maintained. In addition, Mockito is also easier to use. See 
[KAFKA-7438](https://issues.apache.org/jira/browse/KAFKA-7438).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Updated] (KAFKA-12944) KIP-724: Always write record batches with message format v2

2021-06-14 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12944:

Description: If IBP is 3.0 or higher.

> KIP-724: Always write record batches with message format v2
> ---
>
> Key: KAFKA-12944
> URL: https://issues.apache.org/jira/browse/KAFKA-12944
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>
> If IBP is 3.0 or higher.





[jira] [Created] (KAFKA-12947) Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...

2021-06-14 Thread YI-CHEN WANG (Jira)
YI-CHEN WANG created KAFKA-12947:


 Summary: Replace EasyMock and PowerMock with Mockito for 
StreamsMetricsImplTest ...
 Key: KAFKA-12947
 URL: https://issues.apache.org/jira/browse/KAFKA-12947
 Project: Kafka
  Issue Type: Sub-task
Reporter: YI-CHEN WANG
Assignee: YI-CHEN WANG


For KAFKA-7438





[GitHub] [kafka] hachikuji opened a new pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

2021-06-14 Thread GitBox


hachikuji opened a new pull request #10880:
URL: https://github.com/apache/kafka/pull/10880


   We had been using `RecordAccumulator.beginFlush` in order to force the 
`RecordAccumulator` to flush pending batches when a transaction was being 
completed. Internally, `RecordAccumulator` has a simple counter for the number 
of flushes in progress. The count gets incremented in `beginFlush` and it is 
expected to be decremented by `awaitFlushCompletion`. The second call to 
decrement the counter never happened in the transactional path, so the counter 
could get stuck at a positive value, which means that the linger time would 
effectively be ignored.
   
   The patch here fixes the problem by removing the use of `beginFlush` in 
`Sender`. Instead, we now add an additional condition in `RecordAccumulator` to 
explicitly check when a transaction is being completed. 
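
   The stuck-counter pattern described above can be sketched with a minimal 
stand-in (not the real `RecordAccumulator`): `beginFlush` increments a counter 
that only `awaitFlushCompletion` decrements, so a caller that begins a flush 
but never awaits leaves the flush permanently "in progress".

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the bug: a begin/await pair guarding a counter,
// where skipping the await leaves the counter positive and linger ignored.
class FlushCounter {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    // The decrement the transactional path was missing.
    void awaitFlushCompletion() {
        flushesInProgress.decrementAndGet();
    }

    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }
}
```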
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] dajac commented on pull request #10878: KAFKA-12898; Owned partitions in the subscription must be sorted

2021-06-14 Thread GitBox


dajac commented on pull request #10878:
URL: https://github.com/apache/kafka/pull/10878#issuecomment-860843508


   I forgot to update a few tests.





