[GitHub] [kafka] ewencp commented on a change in pull request #10915: Enable connecting VS Code remote debugger

2021-06-23 Thread GitBox


ewencp commented on a change in pull request #10915:
URL: https://github.com/apache/kafka/pull/10915#discussion_r657638116



##
File path: tests/README.md
##
@@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; 
tests/docker/run_tests.sh
 ```
 REBUILD="t" bash tests/docker/run_tests.sh
 ```
+* Debug tests in VS Code:
+  - Run test with `--debug` flag (can be before or after file name):

Review comment:
   As in the other thread, I'm fine either way. I prefer just getting things to 
a clean, consistent state, especially since these are tests so we don't really 
have the same compatibility requirements. But if that creates too much 
overhead/confusion (especially since we need to continue running tests on older 
branches), this alternative approach seems acceptable even if a bit unusual.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stan-confluent commented on a change in pull request #10915: Enable connecting VS Code remote debugger

2021-06-23 Thread GitBox


stan-confluent commented on a change in pull request #10915:
URL: https://github.com/apache/kafka/pull/10915#discussion_r657636156



##
File path: tests/README.md
##
@@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; 
tests/docker/run_tests.sh
 ```
 REBUILD="t" bash tests/docker/run_tests.sh
 ```
+* Debug tests in VS Code:
+  - Run test with `--debug` flag (can be before or after file name):

Review comment:
   We can still keep this - I do need to update run_tests.sh to pass 
_DUCKTAPE_OPTIONS after the `--`.
   My idea was that I didn't want to have to make sure the flags don't overlap 
between ducktape and ducker-ak, hence I changed ducker-ak to expect the ducktape 
args after the `--`, like this: `ducker-ak test my_test.py -- --ducktape-flag`. 
   I am, however, open to simply using a different flag name - it is unlikely 
we'll add too many flags to the ducker-ak run command anyway, so we can simply 
make sure they don't match the ducktape ones. 
   @omkreddy and @ewencp (with whom we had a similar conversation in a different 
repo) - which one do you prefer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-23 Thread GitBox


skaundinya15 commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r657611619



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##
@@ -29,9 +32,9 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-private final Map> futures;
+private final Map> futures;
 
-DeleteConsumerGroupsResult(final Map> futures) {
+DeleteConsumerGroupsResult(Map> 
futures) {

Review comment:
   @tombentley I see, thank you for the clarification. I guess in that case we 
have to stick with returning `KafkaFutureImpl` then? There doesn't really seem 
to be a way around it otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12987) Kafka lacks brute-force protection for user passwords

2021-06-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368600#comment-17368600
 ] 

Luke Chen commented on KAFKA-12987:
---

[~ohye3166], thanks for your report. Could you translate it into English? Also, 
could you add reproduction steps, or clarify which area you are referring to 
where an attacker could brute-force passwords?

Thank you.

> Kafka lacks brute-force protection for user passwords
> -
>
> Key: KAFKA-12987
> URL: https://issues.apache.org/jira/browse/KAFKA-12987
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 2.7.1
>Reporter: chenzongyi
>Priority: Major
>
> Access can be attempted repeatedly with wrong passwords; there is no brute-force protection.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on pull request #10923: KAFKA-12976: Remove UNSUPPORTED_VERSION error from delete and describe topics calls

2021-06-23 Thread GitBox


jolshan commented on pull request #10923:
URL: https://github.com/apache/kafka/pull/10923#issuecomment-867292878


   I don't think we need to support unsupported version errors in the client.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12987) Kafka lacks brute-force protection for user passwords

2021-06-23 Thread chenzongyi (Jira)
chenzongyi created KAFKA-12987:
--

 Summary: Kafka lacks brute-force protection for user passwords
 Key: KAFKA-12987
 URL: https://issues.apache.org/jira/browse/KAFKA-12987
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Affects Versions: 2.7.1
Reporter: chenzongyi


Access can be attempted repeatedly with wrong passwords; there is no brute-force protection.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10900: KAFKA-12967; KRaft broker should forward DescribeQuorum to controller

2021-06-23 Thread GitBox


hachikuji commented on a change in pull request #10900:
URL: https://github.com/apache/kafka/pull/10900#discussion_r657576935



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -217,6 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.DESCRIBE_TRANSACTIONS => 
handleDescribeTransactionsRequest(request)
 case ApiKeys.LIST_TRANSACTIONS => 
handleListTransactionsRequest(request)
 case ApiKeys.ALLOCATE_PRODUCER_IDS => 
maybeForwardToController(request, handleAllocateProducerIdsRequest)
+case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)

Review comment:
   Yeah, because DescribeQuorum does not make sense for ZooKeeper clusters. As 
far as I know, this is the only case of a client API that is only exposed under 
KRaft. However, we do have some internal controller APIs that are only used by 
KRaft (e.g. for broker registration and heartbeating).
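   For readers following along, a minimal, illustrative sketch of the forward-or-fail 
dispatch pattern being discussed (the type and method names below are hypothetical, 
not the actual KafkaApis/ForwardingManager code):
   ```java
import java.util.Optional;

enum ApiKey { DESCRIBE_QUORUM, METADATA }

interface ControllerForwarder {
    void forward(Object request);
}

class RequestDispatcher {
    private final Optional<ControllerForwarder> forwarder;

    RequestDispatcher(Optional<ControllerForwarder> forwarder) {
        this.forwarder = forwarder;
    }

    void handle(ApiKey key, Object request) {
        switch (key) {
            case DESCRIBE_QUORUM:
                // DescribeQuorum only makes sense under KRaft: forward it to the
                // controller, or fail the request when no forwarder is configured.
                forwarder.orElseThrow(() -> new UnsupportedOperationException(
                        "DescribeQuorum requires a KRaft controller"))
                    .forward(request);
                break;
            default:
                // handle locally ...
                break;
        }
    }
}
   ```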




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

2021-06-23 Thread GitBox


g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867277063


   @bbejeck 
   The failed tests also appear in other issues, so I think they're unrelated.
   ```
   org.apache.kafka.common.network.SslTransportLayerTest.[1] 
tlsProtocol=TLSv1.2, useInlinePem=false
   kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   ```
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-23 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657558091



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -167,21 +245,14 @@ object LogLoader extends Logging {
* in place of existing segment(s). For log splitting, we know that any 
.swap file whose base offset is higher than
* the smallest offset .clean file could be part of an incomplete split 
operation. Such .swap files are also deleted
* by this method.
+   *
* @param params The parameters for the log being loaded from disk
-   * @return Set of .swap files that are valid to be swapped in as segment 
files
+   * @return Set of .swap files that are valid to be swapped in as segment 
files and index files

Review comment:
   No, we are not renaming .cleaned files to .swap files, due to KAFKA-6264. 
I forgot to update the description of the PR. I've just updated it; please see 
the new version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-23 Thread GitBox


ccding commented on pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#issuecomment-867273158


   @junrao Thanks for the review. Addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-23 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r657570794



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##
@@ -29,9 +32,9 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-private final Map> futures;
+private final Map> futures;
 
-DeleteConsumerGroupsResult(final Map> futures) {
+DeleteConsumerGroupsResult(Map> 
futures) {

Review comment:
   @skaundinya15 those methods in `KafkaFuture` are intentionally not 
`public` because a user receiving an instance should _never_ need to complete 
the future (they're always completed by the admin client). Making them `public` 
would thus make the API less type safe. 
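   As a point of reference, a minimal sketch of how calling code only ever reads 
these futures (the bootstrap address and group name are assumptions for illustration):
   ```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;

public class DeleteGroupExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            DeleteConsumerGroupsResult result =
                admin.deleteConsumerGroups(Collections.singletonList("my-group"));

            // Callers only read the futures; completing them is done internally
            // by the admin client, which is why complete() stays non-public.
            Map<String, KafkaFuture<Void>> futures = result.deletedGroups();
            futures.forEach((group, future) -> future.whenComplete((v, err) -> {
                if (err != null) {
                    System.err.println("Failed to delete " + group + ": " + err);
                } else {
                    System.out.println("Deleted " + group);
                }
            }));

            result.all().get(); // block until every deletion has an outcome
        }
    }
}
   ```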




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


ableegoldman commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657560587



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1558,12 +1607,45 @@ private void processStreamThread(final 
Consumer consumer) {
 for (final StreamThread thread : copy) consumer.accept(thread);
 }
 
+/**
+ * Returns runtime information about the local threads of this {@link 
KafkaStreams} instance.
+ *
+ * @return the set of {@link 
org.apache.kafka.streams.processor.ThreadMetadata}.
+ * @deprecated since 3.0 use {@link #threadsMetadata()}
+ */
+@Deprecated
+@SuppressWarnings("deprecation")
+public Set 
localThreadsMetadata() {
+return threadsMetadata().stream().map(threadMetadata -> new 
org.apache.kafka.streams.processor.ThreadMetadata(
+threadMetadata.threadName(),
+threadMetadata.threadState(),
+threadMetadata.consumerClientId(),
+threadMetadata.restoreConsumerClientId(),
+threadMetadata.producerClientIds(),
+threadMetadata.adminClientId(),
+threadMetadata.activeTasks().stream().map(taskMetadata -> new 
org.apache.kafka.streams.processor.TaskMetadata(
+taskMetadata.taskId().toString(),
+taskMetadata.topicPartitions(),
+taskMetadata.committedOffsets(),
+taskMetadata.endOffsets(),
+taskMetadata.timeCurrentIdlingStarted())
+).collect(Collectors.toSet()),
+threadMetadata.standbyTasks().stream().map(taskMetadata -> new 
org.apache.kafka.streams.processor.TaskMetadata(
+taskMetadata.taskId().toString(),
+taskMetadata.topicPartitions(),
+taskMetadata.committedOffsets(),
+taskMetadata.endOffsets(),
+taskMetadata.timeCurrentIdlingStarted())
+).collect(Collectors.toSet(
+.collect(Collectors.toSet());
+}
+
 /**
  * Returns runtime information about the local threads of this {@link 
KafkaStreams} instance.
  *
  * @return the set of {@link ThreadMetadata}.
  */
-public Set localThreadsMetadata() {
+public Set threadsMetadata() {

Review comment:
   @cadonna `localThreadMetadata` still sounds more correct to me than 
`localThreadsMetadata`. I really can't explain it other than to say that 
English is weird, and names/titles like this do not always follow the regular 
rules of grammar/plurals 🤷‍♀️ 
   
   But actually I like your suggestion `metadataForLocalThreads()` even better 
than any of them, SGTM
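   For illustration, a minimal usage sketch assuming the accessor does end up named 
`metadataForLocalThreads()` as suggested (topic names and configs below are placeholders):
   ```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ThreadMetadataExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "thread-metadata-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // One line per local stream thread, using the new (non-deprecated) metadata type.
        streams.metadataForLocalThreads().forEach(thread ->
            System.out.println(thread.threadName() + " -> " + thread.threadState()));

        streams.close();
    }
}
   ```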




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-23 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657560081



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -106,7 +174,17 @@ object LogLoader extends Logging {
   loadSegmentFiles(params)
 })
 
-completeSwapOperations(swapFiles, params)
+// Do the actual recovery for toRecoverSwapFiles, as discussed above.

Review comment:
   You are right, given that we don't need to do sanity checks here. Removed this.

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,79 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction operation. We 
can simply rename them
+// to regular segment files. But, before renaming, we should figure out 
which segments are
+// compacted and delete these segment files: this is done by calculating 
min/maxSwapFileOffset.
+// If sanity check fails, we cannot do the simple renaming, we must do a 
full recovery, which
+// involves rebuilding all the index files and the producer state.
+// We store segments that require renaming and recovery in this code 
block, and do the actual
+// renaming and recovery later.
+var minSwapFileOffset = Long.MaxValue
+var maxSwapFileOffset = Long.MinValue
+val toRenameSwapFiles = mutable.Set[File]()
+val toRecoverSwapFiles = mutable.Set[File]()
+swapFiles.filter(f => Log.isLogFile(new 
File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+  val baseOffset = offsetFromFile(f)
+  val segment = LogSegment.open(f.getParentFile,
+baseOffset = baseOffset,
+params.config,
+time = params.time,
+fileSuffix = Log.SwapFileSuffix)
+  try {
+segment.sanityCheck(false)

Review comment:
   fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-23 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657559556



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,79 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction operation. We 
can simply rename them

Review comment:
   Are you concerned about the logic or the comment? If it's only the comment, 
I've fixed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-23 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657558091



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -167,21 +245,14 @@ object LogLoader extends Logging {
* in place of existing segment(s). For log splitting, we know that any 
.swap file whose base offset is higher than
* the smallest offset .clean file could be part of an incomplete split 
operation. Such .swap files are also deleted
* by this method.
+   *
* @param params The parameters for the log being loaded from disk
-   * @return Set of .swap files that are valid to be swapped in as segment 
files
+   * @return Set of .swap files that are valid to be swapped in as segment 
files and index files

Review comment:
   No, we are not renaming .cleaned files to .swap files due to KAFKA-6264. 
I forgot to update the description of the PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-23 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657558091



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -167,21 +245,14 @@ object LogLoader extends Logging {
* in place of existing segment(s). For log splitting, we know that any 
.swap file whose base offset is higher than
* the smallest offset .clean file could be part of an incomplete split 
operation. Such .swap files are also deleted
* by this method.
+   *
* @param params The parameters for the log being loaded from disk
-   * @return Set of .swap files that are valid to be swapped in as segment 
files
+   * @return Set of .swap files that are valid to be swapped in as segment 
files and index files

Review comment:
   No, we are not renaming .cleaned files to .swap files. I forgot to 
update the description of the PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-06-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368542#comment-17368542
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12984:


[~mjsax] Technically yes, the issue with the SubscriptionState potentially 
providing invalid "ownedPartitions" input can affect Kafka Streams as well. 
However, the impact for Streams is considerably less severe, as its assignment 
algorithm doesn't make any assumptions about the previous assignment being 
valid. The worst that should happen to a Streams application is that the 
assignment could be slightly sub-optimal, with a partition/active task being 
assigned to a member that had dropped out of the group since being assigned 
that partition, instead of to its true current owner. 
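A minimal sketch of what detecting double ownership in the "ownedPartitions" input 
could look like (illustrative only, not the actual AbstractStickyAssignor fix):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class OwnedPartitionsSanityCheck {

    // Returns the partitions that more than one member claims to own; an
    // assignor could treat such claims as invalid instead of throwing.
    static Set<TopicPartition> doublyOwned(Map<String, List<TopicPartition>> ownedByMember) {
        Set<TopicPartition> seen = new HashSet<>();
        Set<TopicPartition> duplicates = new HashSet<>();
        for (List<TopicPartition> owned : ownedByMember.values()) {
            for (TopicPartition tp : owned) {
                if (!seen.add(tp)) {
                    duplicates.add(tp); // claimed by at least two members
                }
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        TopicPartition p0 = new TopicPartition("input", 0);
        Map<String, List<TopicPartition>> owned = new HashMap<>();
        owned.put("consumer-1", Arrays.asList(p0));
        owned.put("consumer-2", Arrays.asList(p0, new TopicPartition("input", 1)));
        System.out.println(doublyOwned(owned)); // [input-0]
    }
}
```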

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-23 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657531976



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1352,8 +1370,7 @@ class ReplicaManagerTest {
   Optional.of(1))
 val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, 
None, timeout = 10)
 assertNull(fetchResult.get)
-
-Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
+
Mockito.when(replicaManager.metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true)

Review comment:
   your proposed change doesn't compile because metadataCache is not 
defined in this scope




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-23 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657531588



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]
 
   def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
 
-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given 
topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
   It just seems awkward because typically when you ask "how many 
partitions?" the answer is not Some or None, but a number.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-23 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657530812



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2886,7 +2885,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseCallback(error)(partitionErrors)
 } else {
   val partitions = if (electionRequest.data.topicPartitions == null) {
-metadataCache.getAllPartitions()
+
metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions(_))

Review comment:
   ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-23 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657530674



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1164,7 +1164,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 var unauthorizedForCreateTopics = Set[String]()
 
 if (authorizedTopics.nonEmpty) {
-  val nonExistingTopics = 
metadataCache.getNonExistingTopics(authorizedTopics)
+  val nonExistingTopics = 
authorizedTopics.filter(!metadataCache.contains(_))

Review comment:
   ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-23 Thread GitBox


IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-867206422


   Hi @dajac, thanks again for your comments.
   I've updated the PR taking them into consideration. 
   What do you think?
   Best regards


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-23 Thread GitBox


junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657507051



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -106,7 +174,17 @@ object LogLoader extends Logging {
   loadSegmentFiles(params)
 })
 
-completeSwapOperations(swapFiles, params)
+// Do the actual recovery for toRecoverSwapFiles, as discussed above.

Review comment:
   Hmm, I am not sure why we need this step. We have processed all .swap 
files before and no new .swap files should be introduced if we get to here.

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -167,21 +245,14 @@ object LogLoader extends Logging {
* in place of existing segment(s). For log splitting, we know that any 
.swap file whose base offset is higher than
* the smallest offset .clean file could be part of an incomplete split 
operation. Such .swap files are also deleted
* by this method.
+   *
* @param params The parameters for the log being loaded from disk
-   * @return Set of .swap files that are valid to be swapped in as segment 
files
+   * @return Set of .swap files that are valid to be swapped in as segment 
files and index files

Review comment:
   The PR description says "as a result, if at least one .swap file exists 
for a segment, all other files for the segment must exist as .cleaned files or 
.swap files. Therefore, we rename the .cleaned files to .swap files, then make 
them normal segment files.". Are we implementing the renaming of .clean files 
to .swap files?

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,79 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction operation. We 
can simply rename them
+// to regular segment files. But, before renaming, we should figure out 
which segments are
+// compacted and delete these segment files: this is done by calculating 
min/maxSwapFileOffset.
+// If sanity check fails, we cannot do the simple renaming, we must do a 
full recovery, which
+// involves rebuilding all the index files and the producer state.
+// We store segments that require renaming and recovery in this code 
block, and do the actual
+// renaming and recovery later.
+var minSwapFileOffset = Long.MaxValue
+var maxSwapFileOffset = Long.MinValue
+val toRenameSwapFiles = mutable.Set[File]()
+val toRecoverSwapFiles = mutable.Set[File]()
+swapFiles.filter(f => Log.isLogFile(new 
File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+  val baseOffset = offsetFromFile(f)
+  val segment = LogSegment.open(f.getParentFile,
+baseOffset = baseOffset,
+params.config,
+time = params.time,
+fileSuffix = Log.SwapFileSuffix)
+  try {
+segment.sanityCheck(false)

Review comment:
   It doesn't seem we need this since we call segment.sanityCheck() on all 
segments later in loadSegmentFiles().

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -90,11 +90,79 @@ object LogLoader extends Logging {
*   overflow index offset
*/
   def load(params: LoadLogParams): LoadedLogOffsets = {
-// first do a pass through the files in the log directory and remove any 
temporary files
+
+// First pass: through the files in the log directory and remove any 
temporary files
 // and find any interrupted swap operations
 val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-// Now do a second pass and load all the log and index files.
+// The remaining valid swap files must come from compaction operation. We 
can simply rename them

Review comment:
   It seems that those swap files could be the result of a segment split too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-23 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657507129



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+val offsets = Map(
+  //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+  testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+  testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+  // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+  testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+  testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+  val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+  val description = new ConsumerGroupDescription(group,
+true,
+Collections.singleton(member1),
+classOf[RangeAssignor].getName,
+groupState,
+new Node(1, "localhost", 9092))
+  new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+}
+
+def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] 
= {
+  val expectedOffsets = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+  ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+  }
+}
+def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, 
OffsetSpec] = {
+  val expectedOffsets = offsets.asScala.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+  ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+  }
+}
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics,
 any())
+doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics,
 any())
+
+val (state, assignments) = groupService.collectGroupOffsets(group)
+assertEquals(Some("Stable"), state)
+assertTrue(assignments.nonEmpty)
+// Results should have information for all assigned topic partition (even 
if there is not Offset's information at all, because they get fills with None)
+// Results should have information only for unassigned topic partitions if 
and only if there is information about 

[GitHub] [kafka] izzyacademy commented on pull request #10740: Kafka 8613 kip 633 drop default grace period streams

2021-06-23 Thread GitBox


izzyacademy commented on pull request #10740:
URL: https://github.com/apache/kafka/pull/10740#issuecomment-867189274


   I rebased and created a brand new branch with the new changes. A new PR has 
been created
   
   https://github.com/apache/kafka/pull/10924/
   
   Please take a look when you have a moment.
   
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on pull request #10924: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-23 Thread GitBox


izzyacademy commented on pull request #10924:
URL: https://github.com/apache/kafka/pull/10924#issuecomment-867188534


   @ableegoldman @mjsax @showuon @cadonna 
   
   When you have a moment, please take a look. 
   
   This is an update based on the feedback from PR #10740


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy opened a new pull request #10924: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-23 Thread GitBox


izzyacademy opened a new pull request #10924:
URL: https://github.com/apache/kafka/pull/10924


   KIP-633 New APIs for Controlling Grace Period for Windowed Operations
   
   - Added API changes by KIP-633 for JoinWindows, SessionWindows, TimeWindows 
and SlidingWindows
   - Renamed Windows.DEFAULT_GRACE_PERIOD_MS to 
DEPRECATED_OLD_24_HR_GRACE_PERIOD
   - Added new constant Windows.NO_GRACE_PERIOD to avoid magic constants when 0 
is specified as grace Period
   - Added preliminary Java unit test cases for new API methods
   - Replaced deprecated calls with their equivalents in the examples
   - Replaced deprecated API calls in Scala tests with updated API method calls
   - Added deprecation suppressions in tests for deprecated API method calls in 
Java and Scala tests (a brief usage sketch of the new window APIs follows this list)
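   A brief usage sketch of the new grace-period APIs, assuming the method names 
proposed in KIP-633 land as-is (window sizes below are arbitrary examples):
   ```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

public class GracePeriodExamples {
    public static void main(String[] args) {
        // Explicit grace period: 5-minute windows that accept records arriving
        // up to 30 seconds late.
        TimeWindows timeWindows =
            TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30));

        // No grace period: out-of-order records arriving after the window
        // closes are dropped immediately.
        SessionWindows sessionWindows =
            SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(1));

        System.out.println(timeWindows.gracePeriodMs());    // 30000
        System.out.println(sessionWindows.gracePeriodMs()); // 0
    }
}
   ```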
   
   modified:   
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
   modified:   
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
   modified:   
streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
   modified:   
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
   modified:   
streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
   
   modified:   
streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
   modified:   
streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
   modified:   
streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
   
   modified:   
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
   modified:   
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
   modified:   
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
   modified:   
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
   
   modified:   
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
   
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
   modified:   
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
   modified:   

[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-23 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657483179



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+val offsets = Map(
+  //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+  testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+  testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+  // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+  testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+  testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+  val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+  val description = new ConsumerGroupDescription(group,
+true,
+Collections.singleton(member1),
+classOf[RangeAssignor].getName,
+groupState,
+new Node(1, "localhost", 9092))
+  new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+}
+
+def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] 
= {
+  val expectedOffsets = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+  ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+  }
+}

Review comment:
   Yes, that's a good idea. Going to unify both cases under a single argument 
matcher.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-23 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657481603



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+val offsets = Map(
+  //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+  testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+  testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+  // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+  testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+  testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+  val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+  val description = new ConsumerGroupDescription(group,
+true,
+Collections.singleton(member1),
+classOf[RangeAssignor].getName,
+groupState,
+new Node(1, "localhost", 9092))
+  new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+}
+
+def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] 
= {
+  val expectedOffsets = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+  ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+  }
+}
+def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, 
OffsetSpec] = {
+  val expectedOffsets = offsets.asScala.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+  ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+  }
+}
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics,
 any())
+doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics,
 any())

Review comment:
   In fact, that was my first approach, but defining a 
when(...).thenReturn(...) with two different argument matchers for the same 
method throws a NullPointerException in Mockito. I used doAnswer as a workaround. 
   But taking into account the previous comment, I am working on a single 
when(...).thenReturn(...), so it won't be a problem anymore.
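   For context, a minimal sketch of the doAnswer-style stubbing workaround mentioned 
above (the `OffsetReader` interface is hypothetical, standing in for the mocked Admin 
methods):
   ```java
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.Arrays;
import java.util.List;

public class StubbingStyleDemo {

    // Hypothetical interface, standing in for the Admin methods stubbed in the test.
    interface OffsetReader {
        List<Long> listOffsets(List<String> topics);
    }

    public static void main(String[] args) {
        OffsetReader reader = mock(OffsetReader.class);

        // doAnswer(...).when(mock).method(matcher) registers the stub without
        // routing the setup call through previously registered answers, which is
        // the workaround for stubbing the same method with two different matchers.
        doAnswer(invocation -> Arrays.asList(1L))
            .when(reader).listOffsets(argThat(topics -> topics.contains("assigned")));
        doAnswer(invocation -> Arrays.asList(2L))
            .when(reader).listOffsets(argThat(topics -> topics.contains("unassigned")));

        System.out.println(reader.listOffsets(Arrays.asList("assigned")));   // [1]
        System.out.println(reader.listOffsets(Arrays.asList("unassigned"))); // [2]
    }
}
   ```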




-- 
This is an automated message from the Apache Git 

[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-23 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657479764



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+val offsets = Map(
+  //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+  testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+  testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+  // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+  testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+  testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet

Review comment:
   You are absolutely right, I missed testTopicPartition3. Going to set it 
explicitly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-23 Thread GitBox


skaundinya15 commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r657473433



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##
@@ -29,9 +32,9 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-private final Map> futures;
+private final Map> futures;
 
-DeleteConsumerGroupsResult(final Map> futures) {
+DeleteConsumerGroupsResult(Map> 
futures) {

Review comment:
   @dajac @mimaison It looks like the reason we have to do 
`KafkaFutureImpl` here is because in the `SimpleAdminApiFuture` class, we have 
a class variable called `futures` which is of type ` private final Map>`. As a result, we return `KafkaFutureImpl` for the `all()`, 
`get()` and other methods. We could change all of this to use `KafkaFuture` 
instead, but this would require us to change the following methods from 
`protected` to `public`:
   
   ```java
   /**
* If not already completed, sets the value returned by get() and 
related methods to the given
* value.
*/
   protected abstract boolean complete(T newValue);
   
   /**
* If not already completed, causes invocations of get() and related 
methods to throw the given
* exception.
*/
   protected abstract boolean completeExceptionally(Throwable newException);
   ```
   
   The question here is whether we should change these `protected` methods to `public` so that we can keep returning `KafkaFuture`, or whether it's okay to return `KafkaFutureImpl`. I think it could be worth making these `KafkaFuture` methods `public` so that we maintain the invariant of always returning `KafkaFuture`, but I'm not sure whether that change would need consensus on the mailing list.
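   
   For comparison, here is a minimal sketch (not the actual Kafka code; the class and method names are made up) of the other option: keep the internals typed as `KafkaFutureImpl` so the driver can still complete the futures, but only ever hand out the `KafkaFuture` base type from public accessors, which avoids changing the visibility of the `protected` methods:
   
   ```java
   import java.util.Map;
   import org.apache.kafka.common.KafkaFuture;
   import org.apache.kafka.common.internals.KafkaFutureImpl;
   
   public class DeleteGroupsResultSketch {
       // Internally keep the impl type so the request driver can still call
       // complete()/completeExceptionally() on these futures.
       private final Map<String, KafkaFutureImpl<Void>> futures;
   
       DeleteGroupsResultSketch(Map<String, KafkaFutureImpl<Void>> futures) {
           this.futures = futures;
       }
   
       // The public API only exposes the KafkaFuture base type; the upcast
       // happens at the boundary.
       public KafkaFuture<Void> deletedGroup(String groupId) {
           return futures.get(groupId);
       }
   }
   ```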




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on a change in pull request #10915: Enable connecting VS Code remote debugger

2021-06-23 Thread GitBox


omkreddy commented on a change in pull request #10915:
URL: https://github.com/apache/kafka/pull/10915#discussion_r657466048



##
File path: tests/README.md
##
@@ -51,6 +51,40 @@ bash tests/docker/ducker-ak up -j 'openjdk:11'; 
tests/docker/run_tests.sh
 ```
 REBUILD="t" bash tests/docker/run_tests.sh
 ```
+* Debug tests in VS Code:
+  - Run test with `--debug` flag (can be before or after file name):

Review comment:
   Looks like this conflicts with the existing usage for running tests with debug logs (see above):
   `_DUCKTAPE_OPTIONS="--debug" bash tests/docker/run_tests.sh | tee debug_logs.txt`
   
   Can we use a different option to enable the debugger, e.g. `--debugpy` or `--enable-debugger`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory

2021-06-23 Thread Martin Sundeqvist (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368464#comment-17368464
 ] 

Martin Sundeqvist commented on KAFKA-12559:
---

[~ableegoldman] [~simplyamuthan] Hi, I can see it's been a while since the last 
update. I'm new at this, and would like to take a crack at it, unless things 
are already moving forward?

> Add a top-level Streams config for bounding off-heap memory
> ---
>
> Key: KAFKA-12559
> URL: https://issues.apache.org/jira/browse/KAFKA-12559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: amuthan Ganeshan
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> At the moment we provide an example of how to bound the memory usage of 
> rocskdb in the [Memory 
> Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb]
>  section of the docs. This requires implementing a custom RocksDBConfigSetter 
> class and setting a number of rocksdb options for relatively advanced 
> concepts and configurations. It seems a fair number of users either fail to 
> find this or consider it to be for more advanced use cases/users. But RocksDB 
> can eat up a lot of off-heap memory and it's not uncommon for users to come 
> across a {{RocksDBException: Cannot allocate memory}}
> It would probably be a much better user experience if we implemented this 
> memory bound out-of-the-box and just gave users a top-level StreamsConfig to 
> tune the off-heap memory given to rocksdb, like we have for on-heap cache 
> memory with cache.max.bytes.buffering. More advanced users can continue to 
> fine-tune their memory bounding and apply other configs with a custom config 
> setter, while new or more casual users can cap on the off-heap memory without 
> getting their hands dirty with rocksdb.
> I would propose to add the following top-level config:
> rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid 
> values are [0, inf]
> I'd also want to consider adding a second, lower priority top-level config to 
> give users a knob for adjusting how much of that total off-heap memory goes 
> to the block cache + index/filter blocks, and how much of it is afforded to 
> the write buffers. I'm struggling to come up with a good name for this 
> config, but it would be something like
> rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default 
> to 0.5, valid values are [0, 1]
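
For reference, the custom `RocksDBConfigSetter` approach that the ticket wants to replace with a single top-level config looks roughly like the sketch below (condensed from the Memory Management docs pattern the description links to; the byte sizes are placeholder assumptions):

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    // Placeholder sizes -- in a real setter these would be tuned per application.
    private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L;
    private static final long TOTAL_MEMTABLE_MEMORY = 128 * 1024 * 1024L;
    private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;

    // Shared across all state stores so the bound applies to the whole instance.
    private static final Cache CACHE =
        new LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        tableConfig.setBlockCache(CACHE);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The cache and write buffer manager are shared, so they are not closed per store.
    }
}
```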



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] izzyacademy commented on pull request #10740: Kafka 8613 kip 633 drop default grace period streams

2021-06-23 Thread GitBox


izzyacademy commented on pull request #10740:
URL: https://github.com/apache/kafka/pull/10740#issuecomment-867140579


   > Hey @izzyacademy , I'm taking a look but just a quick heads up: the build 
failed to compile, looks like the problem is some of the demo classes use one 
of the now-deprecated methods and need to be migrated to the new API: 
`TemperatureDemo`, `PageViewTypedDemo`, and `PageViewUntypedDemo`
   
   This has just been addressed. Thanks @ableegoldman for your recommendations 
yesterday.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #10923: KAFKA-12976: Remove UNSUPPORTED_VERSION error from delete and describe topics calls

2021-06-23 Thread GitBox


jolshan opened a new pull request #10923:
URL: https://github.com/apache/kafka/pull/10923


   Removed the condition to throw the error. Now we only throw when we didn't 
find the topic ID. 
   Updated the test for IBP < 2.8 that tries to delete topics using ID.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-23 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657421072



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+val offsets = Map(
+  //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+  testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+  testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+  // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+  testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+  testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+).asJava

Review comment:
   Perfect, makes sense. Would do that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3

2021-06-23 Thread GitBox


rondagostino commented on pull request #10918:
URL: https://github.com/apache/kafka/pull/10918#issuecomment-86780


   System test results before making the jar test dependency: 
https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-06-23--001.1624469608--rondagostino--KAFKA-12756--bbc88d7e8/report.html
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:   2021-06-23--001
   run time: 175 minutes 4.253 seconds
   tests run:867
   passed:   659
   failed:   11
   ignored:  197
   

   ```
   
   - `kafkatest.tests.core.zookeeper_authorizer_test` failed for `metadata_quorum:REMOTE_KRAFT` -- unrelated to this change since ZooKeeper isn't even involved.
   - `kafkatest.tests.core.upgrade_test` failed all 3 tests from version 2.8.0; 
this was due to 2.8.0 not being part of the Vagrant image.  I added a commit to 
this PR to fix that and confirmed that the non-compression flavor of that test 
for 2.8.0 passed locally with Vagrant (this PR already had the change to add it 
to the Docker image for Docker-based system tests).
   - `kafkatest.sanity_checks.test_kafka_version` failed due to `kafka-topics 
--zookeeper` being used against the current version rather than the 0.8.2 
version; added a commit to this PR to swap the node versions so the correct 
broker will get the request and confirmed locally that the test now passes.
   - `kafkatest.tests.streams.streams_eos_test` had 4 test failures.
   - `kafkatest.tests.streams.streams_upgrade_test` had 2 test failures
   
   I believe the streams failures are unrelated because I downgraded to ZooKeeper v3.5.9 locally and ran
`kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_version_probing_upgrade`
 -- it still failed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

2021-06-23 Thread GitBox


g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867105129


   @bbejeck  Thanks for the review.
   You are right.
   Please take another look when you get a chance.

   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-06-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368353#comment-17368353
 ] 

Matthias J. Sax commented on KAFKA-12984:
-

[~ableegoldman] – is KS affected by this issue? Even if we use our own 
assignor, it seems that the issue with regard to "subscription state" could 
also affect KS?

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)
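
To make (a) concrete, here is a rough stand-alone illustration (not the actual assignor code) of how partitions claimed by more than one member could be detected so their ownership claims can be invalidated instead of throwing:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

final class DoubleOwnershipCheck {
    // Returns the partitions that appear in more than one member's ownedPartitions;
    // the assignor would drop these claims and assign those partitions fresh.
    static Set<TopicPartition> doublyOwned(final Map<String, List<TopicPartition>> ownedPartitionsPerMember) {
        final Set<TopicPartition> seen = new HashSet<>();
        final Set<TopicPartition> conflicts = new HashSet<>();
        for (final List<TopicPartition> owned : ownedPartitionsPerMember.values()) {
            for (final TopicPartition tp : owned) {
                if (!seen.add(tp)) {
                    conflicts.add(tp);
                }
            }
        }
        return conflicts;
    }
}
```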



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-06-23 Thread Michael Bingham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368354#comment-17368354
 ] 

Michael Bingham commented on KAFKA-12984:
-

Does this issue potentially apply to {{StreamsPartitionAssignor}} as well?

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-06-23 Thread Michael Bingham (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Bingham updated KAFKA-12984:

Comment: was deleted

(was: Does this issue potentially apply to {{StreamsPartitionAssignor}} as 
well?)

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #10892: [WIP] New Admin API for deleteTopics

2021-06-23 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r657306351



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TopicCollection.java
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collection;
+
+public class TopicCollection {
+
+private Collection topicNames;
+private Collection topicIds;
+private final TopicAttribute attribute;
+
+public enum TopicAttribute {
+TOPIC_NAME, TOPIC_ID

Review comment:
   I guess this prevents the unsupported operation exception in the methods themselves, though. We would still need to convert the TopicCollection to a TopicNameCollection before callers could access the collection of names, so I suspect we'd still need to handle it there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12986) Throttled Replicas validator should validate that the proposed value is parseable

2021-06-23 Thread David Mao (Jira)
David Mao created KAFKA-12986:
-

 Summary: Throttled Replicas validator should validate that the 
proposed value is parseable
 Key: KAFKA-12986
 URL: https://issues.apache.org/jira/browse/KAFKA-12986
 Project: Kafka
  Issue Type: Bug
  Components: admin, core
Reporter: David Mao


The ThrottledReplicaListValidator currently allows a string like 

leader.replication.throttled.replicas=,0:1 to be set

which is unparseable by the TopicConfig callback handler.

For robustness, the validator should also validate that the property can be 
parsed.
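
As a hypothetical stand-alone illustration (not the actual ThrottledReplicaListValidator) of the kind of check being asked for -- the value must be either "*" or a comma-separated list of partitionId:brokerId pairs, so a leading empty entry as in ",0:1" would be rejected:

```java
import java.util.regex.Pattern;

public class ThrottledReplicasFormatCheck {
    private static final Pattern ENTRY = Pattern.compile("\\d+:\\d+");

    // Accepts "" (no throttling), "*" (all replicas), or "partitionId:brokerId,..." lists.
    public static boolean isParseable(final String value) {
        final String trimmed = value.trim();
        if (trimmed.isEmpty() || trimmed.equals("*")) {
            return true;
        }
        for (final String entry : trimmed.split(",", -1)) { // -1 keeps empty entries
            if (!ENTRY.matcher(entry.trim()).matches()) {
                return false; // e.g. ",0:1" fails because of the leading empty entry
            }
        }
        return true;
    }
}
```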



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nnordrum closed pull request #2664: MINOR: Added additional -start/-stop files for consistency

2021-06-23 Thread GitBox


nnordrum closed pull request #2664:
URL: https://github.com/apache/kafka/pull/2664


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nnordrum closed pull request #3518: WIP: SimpleRegexAclAuthorizer

2021-06-23 Thread GitBox


nnordrum closed pull request #3518:
URL: https://github.com/apache/kafka/pull/3518


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-06-23 Thread GitBox


guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r657231704



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int 
numStreamThreads) {
 if (numStreamThreads == 0) {
 return totalCacheSize;
 }
-return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+return totalCacheSize / (numStreamThreads + 
(topologyMetadata.hasGlobalTopology() ? 1 : 0));

Review comment:
   If we have more than one named topology with global stores, should the global topologies share one cache sub-space?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
 }
 }
 
+public void setTopologyName(final String namedTopology) {

Review comment:
   Could we move the logic of 
`namedTopology.setTopologyName(topologyName);` into the constructor of 
`NamedTopologyStreamsBuilder(final String topologyName)` itself, and then call 
the constructor of `NamedTopology` directly, so that we can still have a final 
field in line 138 above? 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -131,15 +135,9 @@
 private StreamsConfig config = null;
 
 // The name of the topology this builder belongs to, or null if none
-private final String namedTopology;
-
-public InternalTopologyBuilder() {
-this.namedTopology = null;
-}
+private String namedTopology;

Review comment:
   nit: maybe better be `topologyName`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -364,6 +371,10 @@ public synchronized final StreamsConfig getStreamsConfig() 
{
 return config;
 }
 
+public String namedTopology() {

Review comment:
   Ditto, I feel its better to let `topologyName` return a String.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map> pro
 return Collections.unmodifiableMap(globalStateStores);
 }
 
-public Set allStateStoreName() {
+public Set allStateStoreNames() {

Review comment:
   Nice find.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() {
 }
 }
 
-public synchronized Pattern earliestResetTopicsPattern() {
-return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
+public boolean hasOffsetResetOverrides() {
+return !(earliestResetTopics.isEmpty() && 
earliestResetPatterns.isEmpty()
+&& latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
 }
 
-public synchronized Pattern latestResetTopicsPattern() {
-return resetTopicsPattern(latestResetTopics, latestResetPatterns);
-}
-
-private Pattern resetTopicsPattern(final Set resetTopics,
-   final Set resetPatterns) {
-final List topics = 
maybeDecorateInternalSourceTopics(resetTopics);
-
-return buildPattern(topics, resetPatterns);
-}
-
-private static Pattern buildPattern(final Collection sourceTopics,
-final Collection 
sourcePatterns) {
-final StringBuilder builder = new StringBuilder();
-
-for (final String topic : sourceTopics) {
-builder.append(topic).append("|");
-}
-
-for (final Pattern sourcePattern : sourcePatterns) {
-builder.append(sourcePattern.pattern()).append("|");
-}
-
-if (builder.length() > 0) {
-builder.setLength(builder.length() - 1);
-return Pattern.compile(builder.toString());
+public OffsetResetStrategy offsetResetStrategy(final String topic) {
+if 
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+earliestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+return EARLIEST;
+} else if 
(maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
+latestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+return LATEST;
+} else if 
(maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic)

Review comment:
   Could you elaborate a bit on the `NONE` case? Not sure I fully follow 
here.


[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Enhance the test for validation when the state machine creates a snapshot

2021-06-23 Thread GitBox


feyman2016 commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-866954962


   @jsancio Thanks!
   @mumrah Hi, could you kindly help to review and merge? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10900: KAFKA-12967; KRaft broker should forward DescribeQuorum to controller

2021-06-23 Thread GitBox


mumrah commented on a change in pull request #10900:
URL: https://github.com/apache/kafka/pull/10900#discussion_r657234456



##
File path: core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.server.IntegrationTestUtils.connectAndReceive
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
+import org.apache.kafka.common.requests.{DescribeQuorumRequest, 
DescribeQuorumResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+class DescribeQuorumRequestTest(cluster: ClusterInstance) {

Review comment:
   Is it worth adding a test for ZK mode here? Just to see that it fails

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -217,6 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.DESCRIBE_TRANSACTIONS => 
handleDescribeTransactionsRequest(request)
 case ApiKeys.LIST_TRANSACTIONS => 
handleListTransactionsRequest(request)
 case ApiKeys.ALLOCATE_PRODUCER_IDS => 
maybeForwardToController(request, handleAllocateProducerIdsRequest)
+case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)

Review comment:
   Hm, so we are _only_ allowing forwarding with this 
`forwardToControllerOrFail` since this RPC did not exist before? Are there 
other recently added controller-only RPCs that need this treatment?

##
File path: core/src/test/java/kafka/test/annotation/Type.java
##
@@ -28,7 +28,7 @@
  * The type of cluster config being requested. Used by {@link 
kafka.test.ClusterConfig} and the test annotations.
  */
 public enum Type {
-RAFT {
+KRAFT {

Review comment:
   Seems fine to change this here. I've been meaning to do this anyways.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 edited a comment on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-06-23 Thread GitBox


vamossagar12 edited a comment on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-866823035


   @guozhangwang / @cadonna  I made some tweaks to the code and also started testing with 1M keys. Now I see throughput improvements of roughly 0.3 ops/s for range queries and 0.15 ops/s for putAll:
   
   Here is the comparison:
   
   ```
   testPersistentRangeQueryPerformance original
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.131 ± 0.028  ops/s

   testPersistentPutAllPerformance original
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance      thrpt   15  0.919 ± 0.037  ops/s

   testPersistentRangeQueryPerformance bytebuffer
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.442 ± 0.038  ops/s

   testPersistentPutAllPerformance bytebuffer
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance      thrpt   15  1.065 ± 0.041  ops/s
   ```
   
   I needed to set the ByteOrder after creating the DirectByteBuffer, and for putAll I needed to flip the buffer before calling `put` on the batch.
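   
   As a minimal stand-alone sketch of those two tweaks (just the buffer handling; the actual RocksDB write path and benchmark code are left out, and the byte-order choice is an assumption):
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   
   public class DirectBufferSketch {
       // Allocate a direct buffer, set its byte order right away, write the
       // payload, then flip() so position=0 and limit=bytes written before the
       // buffer is handed to a ByteBuffer-based put.
       static ByteBuffer fillDirect(final byte[] payload) {
           final ByteBuffer buffer = ByteBuffer.allocateDirect(payload.length)
               .order(ByteOrder.nativeOrder()); // assumption: native order matches what the store expects
           buffer.put(payload);
           buffer.flip();
           return buffer;
       }
   }
   ```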
   
   Next step for me would be to create a kafka streams app and test throughput. 
I will post them here once i have those as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-06-23 Thread GitBox


wcarlson5 commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r657217991



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {

Review comment:
   We would still need to check, because the thread count could still be dropped to 0 with `removeThread()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)

2021-06-23 Thread GitBox


C0urante commented on a change in pull request #10854:
URL: https://github.com/apache/kafka/pull/10854#discussion_r657202256



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -361,55 +324,23 @@ protected static ConfigDef baseConfigDef() {
 .withClientSslSupport();
 }
 
-private void logInternalConverterDeprecationWarnings(Map 
props) {
-String[] deprecatedConfigs = new String[] {
-INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
-INTERNAL_VALUE_CONVERTER_CLASS_CONFIG
-};
-for (String config : deprecatedConfigs) {
-if (props.containsKey(config)) {
-Class internalConverterClass = getClass(config);
-logDeprecatedProperty(config, 
internalConverterClass.getCanonicalName(), 
INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null);
-if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) 
{
-// log the properties for this converter ...
-for (Map.Entry propEntry : 
originalsWithPrefix(config + ".").entrySet()) {
-String prop = propEntry.getKey();
-String propValue = propEntry.getValue().toString();
-String defaultValue = 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null;
-logDeprecatedProperty(config + "." + prop, propValue, 
defaultValue, config);
-}
-}
+private void logInternalConverterRemovalWarnings(Map 
props) {
+List removedProperties = new ArrayList<>();
+for (String property : Arrays.asList("internal.key.converter", 
"internal.value.converter")) {
+if (props.containsKey(property)) {
+removedProperties.add(property);
 }
+removedProperties.addAll(originalsWithPrefix(property + 
".").keySet());
 }
-}
-
-private void logDeprecatedProperty(String propName, String propValue, 
String defaultValue, String prefix) {
-String prefixNotice = prefix != null
-? " (along with all configuration for '" + prefix + "')"
-: "";
-if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) {
-log.info(
-"Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release. "
-+ "The specified value '{}' matches the default, so this 
property can be safely removed from the worker configuration.",
-propName,
-prefixNotice,
-propValue
-);
-} else if (defaultValue != null) {
+if (!removedProperties.isEmpty()) {
 log.warn(
-"Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release. "
-+ "The specified value '{}' does NOT match the default and 
recommended value '{}'.",
-propName,
-prefixNotice,
-propValue,
-defaultValue
-);
-} else {
-log.warn(
-"Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release.",
-propName,
-prefixNotice
-);
+"The worker has been configured with one or more internal 
converter properties ({}). "
++ "Support for these properties was dropped in 
version 3.0, and specifying them will "

Review comment:
   Sure, done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)

2021-06-23 Thread GitBox


C0urante commented on a change in pull request #10854:
URL: https://github.com/apache/kafka/pull/10854#discussion_r657197034



##
File path: docs/upgrade.html
##
@@ -76,6 +76,12 @@ Notable changes in 3
 understood by brokers or version 2.5 or higher, so you must upgrade 
your kafka cluster to get the stronger semantics. Otherwise, you can just pass
 in new ConsumerGroupMetadata(consumerGroupId) to work 
with older brokers. See https://cwiki.apache.org/confluence/x/zJONCg;>KIP-732 for more 
details.
 
+
+The Connect internal.key.converter and 
internal.value.converter properties have been completely https://cwiki.apache.org/confluence/x/2YDOCg;>removed.
+Workers are now hardcoded to use the JSON converter with 
schemas.enable set to false. If your cluster has been 
using
+a different internal key or value converter, you can follow the 
migration steps outlined in https://cwiki.apache.org/confluence/x/2YDOCg;>KIP-738
+to safely upgrade your Connect cluster to 3.0.

Review comment:
   Good call, thanks  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

2021-06-23 Thread GitBox


bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-866910927


   @g1geordie apologies for letting this go for so long.  I'm looking now and 
let's see if we can't get this in over the next few days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)

2021-06-23 Thread GitBox


rhauch commented on pull request #10854:
URL: https://github.com/apache/kafka/pull/10854#issuecomment-866908588


   For the record, the builds were mostly green, and the failing builds had only failures unrelated to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-23 Thread GitBox


dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657158355



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+val offsets = Map(
+  //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+  testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+  testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+  // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+  testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+  testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+).asJava

Review comment:
   I would remove all the comments and only put one before `offsets` which explains that certain partitions are not present and certain ones have `null`, or something along these lines.

##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+val offsets = Map(
+  //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+  testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+  testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+  // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+  testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+  testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {

Review comment:
   `describeGroupsResult` is used only once in the test. It seems that we 
could simply declare a variable which contains the result that we want to 
return.

##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val 

[GitHub] [kafka] rhauch commented on a change in pull request #10854: KAFKA-12717: Remove internal Connect converter properties (KIP-738)

2021-06-23 Thread GitBox


rhauch commented on a change in pull request #10854:
URL: https://github.com/apache/kafka/pull/10854#discussion_r657156175



##
File path: docs/upgrade.html
##
@@ -76,6 +76,12 @@ Notable changes in 3
 understood by brokers or version 2.5 or higher, so you must upgrade 
your kafka cluster to get the stronger semantics. Otherwise, you can just pass
 in new ConsumerGroupMetadata(consumerGroupId) to work 
with older brokers. See https://cwiki.apache.org/confluence/x/zJONCg;>KIP-732 for more 
details.
 
+
+The Connect internal.key.converter and 
internal.value.converter properties have been completely https://cwiki.apache.org/confluence/x/2YDOCg;>removed.
+Workers are now hardcoded to use the JSON converter with 
schemas.enable set to false. If your cluster has been 
using
+a different internal key or value converter, you can follow the 
migration steps outlined in https://cwiki.apache.org/confluence/x/2YDOCg;>KIP-738
+to safely upgrade your Connect cluster to 3.0.

Review comment:
   Might be good to mention that these have been deprecated since AK 2.0. 
This is similar to other items earlier in this list that mention the earlier 
version when defaults changed. Maybe something like:
   ```suggestion
   The Connect internal.key.converter and 
internal.value.converter properties have been completely https://cwiki.apache.org/confluence/x/2YDOCg;>removed.
   The use of these Connect worker properties has been deprecated since 
version 2.0.0. 
   Workers are now hardcoded to use the JSON converter with 
schemas.enable set to false. If your cluster has been 
using
   a different internal key or value converter, you can follow the 
migration steps outlined in https://cwiki.apache.org/confluence/x/2YDOCg;>KIP-738
   to safely upgrade your Connect cluster to 3.0.
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -361,55 +324,23 @@ protected static ConfigDef baseConfigDef() {
 .withClientSslSupport();
 }
 
-private void logInternalConverterDeprecationWarnings(Map 
props) {
-String[] deprecatedConfigs = new String[] {
-INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
-INTERNAL_VALUE_CONVERTER_CLASS_CONFIG
-};
-for (String config : deprecatedConfigs) {
-if (props.containsKey(config)) {
-Class internalConverterClass = getClass(config);
-logDeprecatedProperty(config, 
internalConverterClass.getCanonicalName(), 
INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null);
-if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) 
{
-// log the properties for this converter ...
-for (Map.Entry propEntry : 
originalsWithPrefix(config + ".").entrySet()) {
-String prop = propEntry.getKey();
-String propValue = propEntry.getValue().toString();
-String defaultValue = 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null;
-logDeprecatedProperty(config + "." + prop, propValue, 
defaultValue, config);
-}
-}
+private void logInternalConverterRemovalWarnings(Map 
props) {
+List removedProperties = new ArrayList<>();
+for (String property : Arrays.asList("internal.key.converter", 
"internal.value.converter")) {
+if (props.containsKey(property)) {
+removedProperties.add(property);
 }
+removedProperties.addAll(originalsWithPrefix(property + 
".").keySet());
 }
-}
-
-private void logDeprecatedProperty(String propName, String propValue, 
String defaultValue, String prefix) {
-String prefixNotice = prefix != null
-? " (along with all configuration for '" + prefix + "')"
-: "";
-if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) {
-log.info(
-"Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release. "
-+ "The specified value '{}' matches the default, so this 
property can be safely removed from the worker configuration.",
-propName,
-prefixNotice,
-propValue
-);
-} else if (defaultValue != null) {
+if (!removedProperties.isEmpty()) {
 log.warn(
-"Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release. "
-+ "The specified value '{}' does NOT match the default and 
recommended value '{}'.",
-propName,
-prefixNotice,
-propValue,
-defaultValue
-);
-} else {
-log.warn(

[jira] [Resolved] (KAFKA-12482) Remove deprecated rest.host.name and rest.port Connect worker configs

2021-06-23 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-12482.
---
  Reviewer: Randall Hauch
Resolution: Fixed

Merged to `trunk`.

> Remove deprecated rest.host.name and rest.port Connect worker configs
> -
>
> Key: KAFKA-12482
> URL: https://issues.apache.org/jira/browse/KAFKA-12482
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Kalpesh Patel
>Priority: Critical
> Fix For: 3.0.0
>
>
> The following Connect worker configuration properties were deprecated and 
> should be removed in 3.0.0:
>  * {{rest.host.name}} (deprecated in 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
>  * {{rest.port}} (deprecated in 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
> See KAFKA-12717 for removing the internal converter configurations:
>  * {{internal.key.converter}} (deprecated in 
> [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
>  * {{internal.value.converter}} (deprecated in 
> [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch merged pull request #10841: KAFKA-12482 Remove deprecated rest.host.name and rest.port configs

2021-06-23 Thread GitBox


rhauch merged pull request #10841:
URL: https://github.com/apache/kafka/pull/10841


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-23 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r657148983



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+/**
+ * An immutable restart plan.
+ */
+public class RestartPlan {
+
+private final RestartRequest request;
+private final ConnectorStateInfo stateInfo;
+private final Collection idsToRestart;
+
+/**
+ * Create a new request to restart a connector and optionally its tasks.
+ *
+ * @param request  the restart request; may not be null
+ * @param restartStateInfo the current state info for the connector; may 
not be null
+ */
+public RestartPlan(RestartRequest request, ConnectorStateInfo 
restartStateInfo) {
+this.request = Objects.requireNonNull(request, "RestartRequest name 
may not be null");
+this.stateInfo = Objects.requireNonNull(restartStateInfo, 
"ConnectorStateInfo name may not be null");
+// Collect the task IDs to stop and restart (may be none)
+idsToRestart = Collections.unmodifiableList(
+stateInfo.tasks()
+.stream()
+.filter(this::isRestarting)
+.map(taskState -> new 
ConnectorTaskId(request.connectorName(), taskState.id()))
+.collect(Collectors.toList())
+);
+}
+
+/**
+ * Get the connector name.
+ *
+ * @return the name of the connector; never null
+ */
+public String connectorName() {
+return request.connectorName();
+}
+
+/**
+ * Get the original {@link RestartRequest}.
+ *
+ * @return the restart request; never null
+ */
+public RestartRequest restartRequest() {
+return request;
+}
+
+/**
+ * Get the {@link ConnectorStateInfo} that reflects the current state of 
the connector except with the {@code status}
+ * set to {@link AbstractStatus.State#RESTARTING} for the {@link 
Connector} instance and any {@link Task} instances that
+ * are to be restarted, based upon the {@link #restartRequest() restart 
request}.
+ *
+ * @return the connector state info that reflects the restart plan; never 
null
+ */
+public ConnectorStateInfo restartConnectorStateInfo() {
+return stateInfo;
+}
+
+/**
+ * Get the immutable collection of {@link ConnectorTaskId} for all tasks 
to be restarted
+ * based upon the {@link #restartRequest() restart request}.
+ *
+ * @return the IDs of the tasks to be restarted; never null but possibly 
empty
+ */
+public Collection taskIdsToRestart() {
+return idsToRestart;
+}
+
+/**
+ * Determine whether the {@link Connector} instance is to be restarted
+ * based upon the {@link #restartRequest() restart request}.
+ *
+ * @return true if the {@link Connector} instance is to be restarted, or 
false otherwise
+ */
+public boolean restartConnector() {
+return isRestarting(stateInfo.connector());
+}
+
+/**
+ * Determine whether at least one {@link Task} instance is to be restarted
+ * based upon the {@link #restartRequest() restart request}.
+ *
+ * @return true if any {@link Task} instances are to be restarted, or 
false if none are to be restarted
+ */
+public boolean restartAnyTasks() {

Review comment:
   Renamed to `shouldRestartTasks`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-866870832


   @ableegoldman and @cadonna thanks a lot for your reviews! I think I 
addressed all of your comments. Let me know what you think about them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657138308



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+public class TaskMetadataImplTest {
+
+public static final TaskId TASK_ID = new TaskId(1, 2);
+public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+public static final Set TOPIC_PARTITIONS = mkSet(TP_0, 
TP_1);
+public static final Map COMMITTED_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L));
+public static final Map END_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L));
+public static final Optional TIME_CURRENT_IDLING_STARTED = 
Optional.of(3L);
+
+private TaskMetadata taskMetadata;
+
+@Before
+public void setUp() {
+taskMetadata = new TaskMetadataImpl(
+TASK_ID,
+TOPIC_PARTITIONS,
+COMMITTED_OFFSETS,
+END_OFFSETS,
+TIME_CURRENT_IDLING_STARTED);
+}
+
+@Test
+public void shouldNotAllowModificationOfInternalStateViaGetters() {
+assertTrue(isUnmodifiable(taskMetadata.topicPartitions()));
+assertTrue(isUnmodifiable(taskMetadata.committedOffsets()));
+assertTrue(isUnmodifiable(taskMetadata.endOffsets()));
+}
+
+@Test
+public void shouldFollowEqualsAndHasCodeContract() {

Review comment:
   @cadonna For the hash code and equals contract validation, I decided to keep 
it under a single test and validate the positive cases as well as each of the 
reasons why it might fall into the negative case.
   Please note that objects differing only in committed offsets, end offsets, 
and/or time current idling started will be considered equal.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
##
@@ -55,6 +62,63 @@ public void 
shouldNotAllowModificationOfInternalStateViaGetters() {
 assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
 }
 
+@Test
+public void shouldFollowHashCodeAndEqualsContract() {

Review comment:
   @cadonna For the hash code and equals contract validation, I decided to keep 
it under a single test and validate the positive case as well as each of the reasons 
why it might fall into the negative case.
   

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
##
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 

[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-866866991


   Needed to rebase as there were some conflicts with trunk, hence the force 
push.
   
   I applied the changes in separate commits:
   - One for method renames, formattings and small refactors (like the 
unmodifiable collections)
   - One for Adding tests
   - One to fix the regression caused by the unmodifiable collections in 
the constructor.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657134887



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##
@@ -57,7 +57,11 @@ public ThreadMetadataImpl(final String threadName,
   final Set standbyTasks) {
 this.mainConsumerClientId = mainConsumerClientId;
 this.restoreConsumerClientId = restoreConsumerClientId;
-this.producerClientIds = 
Collections.unmodifiableSet(producerClientIds);
+if (producerClientIds != null) {
+this.producerClientIds = 
Collections.unmodifiableSet(producerClientIds);
+} else {
+this.producerClientIds = Collections.emptySet();
+}

Review comment:
   @cadonna, I needed to add this guard as ThreadMetadataTest was failing 
with NPE.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657132576



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1478,8 +1499,36 @@ public void cleanUp() {
  * @param storeName the {@code storeName} to find metadata for
  * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances 
with the provide {@code storeName} of
  * this application
+ * @deprecated since 3.0.0 use {@link 
KafkaStreams#allMetadataForGivenStore} instead
  */
-public Collection allMetadataForStore(final String 
storeName) {
+@Deprecated
+public Collection 
allMetadataForStore(final String storeName) {
+validateIsRunningOrRebalancing();
+return 
streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata
 ->
+new 
org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+streamsMetadata.stateStoreNames(),
+streamsMetadata.topicPartitions(),
+streamsMetadata.standbyStateStoreNames(),
+streamsMetadata.standbyTopicPartitions()))
+.collect(Collectors.toSet());
+}
+
+/**
+ * Find all currently running {@code KafkaStreams} instances (potentially 
remotely) that
+ * 
+ *   use the same {@link StreamsConfig#APPLICATION_ID_CONFIG 
application ID} as this instance (i.e., all
+ *   instances that belong to the same Kafka Streams application)
+ *   and that contain a {@link StateStore} with the given {@code 
storeName}
+ * 
+ * and return {@link StreamsMetadata} for each discovered instance.
+ * 
+ * Note: this is a point in time view and it may change due to partition 
reassignment.
+ *
+ * @param storeName the {@code storeName} to find metadata for
+ * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances 
with the provide {@code storeName} of
+ * this application
+ */
+public Collection allMetadataForGivenStore(final String 
storeName) {

Review comment:
   I decided to go for the pattern xxxForxxx to keep consistency among 
different changes.
   
   It is now `streamsMetadataForStore` but happy to change if anyone has 
reasons against it.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657132064



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1558,12 +1607,45 @@ private void processStreamThread(final 
Consumer consumer) {
 for (final StreamThread thread : copy) consumer.accept(thread);
 }
 
+/**
+ * Returns runtime information about the local threads of this {@link 
KafkaStreams} instance.
+ *
+ * @return the set of {@link 
org.apache.kafka.streams.processor.ThreadMetadata}.
+ * @deprecated since 3.0 use {@link #threadsMetadata()}
+ */
+@Deprecated
+@SuppressWarnings("deprecation")
+public Set 
localThreadsMetadata() {
+return threadsMetadata().stream().map(threadMetadata -> new 
org.apache.kafka.streams.processor.ThreadMetadata(
+threadMetadata.threadName(),
+threadMetadata.threadState(),
+threadMetadata.consumerClientId(),
+threadMetadata.restoreConsumerClientId(),
+threadMetadata.producerClientIds(),
+threadMetadata.adminClientId(),
+threadMetadata.activeTasks().stream().map(taskMetadata -> new 
org.apache.kafka.streams.processor.TaskMetadata(
+taskMetadata.taskId().toString(),
+taskMetadata.topicPartitions(),
+taskMetadata.committedOffsets(),
+taskMetadata.endOffsets(),
+taskMetadata.timeCurrentIdlingStarted())
+).collect(Collectors.toSet()),
+threadMetadata.standbyTasks().stream().map(taskMetadata -> new 
org.apache.kafka.streams.processor.TaskMetadata(
+taskMetadata.taskId().toString(),
+taskMetadata.topicPartitions(),
+taskMetadata.committedOffsets(),
+taskMetadata.endOffsets(),
+taskMetadata.timeCurrentIdlingStarted())
+).collect(Collectors.toSet(
+.collect(Collectors.toSet());
+}
+
 /**
  * Returns runtime information about the local threads of this {@link 
KafkaStreams} instance.
  *
  * @return the set of {@link ThreadMetadata}.
  */
-public Set localThreadsMetadata() {
+public Set threadsMetadata() {

Review comment:
   I decided to go for the pattern xxxForxxx to keep consistency among 
different changes.
   
   It is now `metadataForLocalThreads` but happy to change if anyone has 
reasons against it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657131881



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1458,8 +1457,30 @@ public void cleanUp() {
  * Note: this is a point in time view and it may change due to partition 
reassignment.
  *
  * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances 
of this application
+ * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata}
  */
-public Collection allMetadata() {
+@Deprecated
+public Collection 
allMetadata() {
+validateIsRunningOrRebalancing();
+return 
streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
+new 
org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+streamsMetadata.stateStoreNames(),
+streamsMetadata.topicPartitions(),
+streamsMetadata.standbyStateStoreNames(),
+streamsMetadata.standbyTopicPartitions()))
+.collect(Collectors.toSet());
+}
+
+/**
+ * Find all currently running {@code KafkaStreams} instances (potentially 
remotely) that use the same
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this 
instance (i.e., all instances that belong to
+ * the same Kafka Streams application) and return {@link StreamsMetadata} 
for each discovered instance.
+ * 
+ * Note: this is a point in time view and it may change due to partition 
reassignment.
+ *
+ * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances 
of this application
+ */
+public Collection allRunningMetadata() {

Review comment:
   I decided to go for the pattern xxxForxxx to keep consistency among 
different changes.
   
   It is now `metadataForAllStreamsClients` but happy to change if anyone has 
reasons against it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-23 Thread GitBox


rhauch commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r657118416



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+/**
+ * An immutable restart plan.
+ */
+public class RestartPlan {
+
+private final RestartRequest request;
+private final ConnectorStateInfo stateInfo;
+private final Collection idsToRestart;
+
+/**
+ * Create a new request to restart a connector and optionally its tasks.
+ *
+ * @param request  the restart request; may not be null
+ * @param restartStateInfo the current state info for the connector; may 
not be null
+ */
+public RestartPlan(RestartRequest request, ConnectorStateInfo 
restartStateInfo) {
+this.request = Objects.requireNonNull(request, "RestartRequest name 
may not be null");
+this.stateInfo = Objects.requireNonNull(restartStateInfo, 
"ConnectorStateInfo name may not be null");
+// Collect the task IDs to stop and restart (may be none)
+idsToRestart = Collections.unmodifiableList(
+stateInfo.tasks()
+.stream()
+.filter(this::isRestarting)
+.map(taskState -> new 
ConnectorTaskId(request.connectorName(), taskState.id()))
+.collect(Collectors.toList())
+);
+}
+
+/**
+ * Get the connector name.
+ *
+ * @return the name of the connector; never null
+ */
+public String connectorName() {
+return request.connectorName();
+}
+
+/**
+ * Get the original {@link RestartRequest}.
+ *
+ * @return the restart request; never null
+ */
+public RestartRequest restartRequest() {
+return request;
+}
+
+/**
+ * Get the {@link ConnectorStateInfo} that reflects the current state of 
the connector except with the {@code status}
+ * set to {@link AbstractStatus.State#RESTARTING} for the {@link 
Connector} instance and any {@link Task} instances that
+ * are to be restarted, based upon the {@link #restartRequest() restart 
request}.
+ *
+ * @return the connector state info that reflects the restart plan; never 
null
+ */
+public ConnectorStateInfo restartConnectorStateInfo() {

Review comment:
   This is subtly different than a normal `ConnectorStateInfo`, so keeping 
the current name may be a bit more clear in the context where this method is 
used, even if it's less conventional.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-23 Thread GitBox


rhauch commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r655779197



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Objects;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * A request to restart a connector and/or task instances.
+ * The natural order is based upon the connector name, if two requests have 
the same connector name, then the requests are ordered based on the probable 
number of tasks/connector this request is going to restart.

Review comment:
   Nit:
   ```suggestion
* The natural order is based first upon the connector name and then 
requested restart behaviors. 
* If two requests have the same connector name, then the requests are 
ordered based on the 
* probable number of tasks/connector this request is going to restart.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11

2021-06-23 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368114#comment-17368114
 ] 

Ismael Juma commented on KAFKA-12790:
-

[~ueisele] I submitted a fix. Thanks once again for the help!

> Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent 
> versions of 8 and 11
> ---
>
> Key: KAFKA-12790
> URL: https://issues.apache.org/jira/browse/KAFKA-12790
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>
> Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11
> and all versions of 16. Re-enable it in this test so that we can verify
> the server behavior when it establishes connections with such TLS
> versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3

2021-06-23 Thread GitBox


rondagostino commented on a change in pull request #10918:
URL: https://github.com/apache/kafka/pull/10918#discussion_r657107216



##
File path: gradle/dependencies.gradle
##
@@ -61,6 +61,7 @@ versions += [
   bcpkix: "1.66",
   checkstyle: "8.36.2",
   commonsCli: "1.4",
+  dropwizardMetrics: "3.2.5",

Review comment:
   > Does this conflict with the yammer metrics library 
   
   It doesn't, no.  The yammer metrics library is called 
`metrics-core-2.2.0.jar` and has all classes underneath the 
`com.yammer.metrics` package.  The dropwizard library is called 
`metrics-core-3.2.5.jar` and has all classes underneath the 
`com.codahale.metrics` package.  It isn't ideal that the jar names differ only 
by the version, but there is no conflict.
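   
   To make that concrete, here is a minimal sketch (not code from the PR or from Kafka itself), assuming both jars are on the classpath: the Yammer classes live under `com.yammer.metrics` while the Dropwizard classes live under `com.codahale.metrics`, so the fully-qualified names never collide.
   
   ```
   import com.yammer.metrics.core.MetricsRegistry;  // from metrics-core-2.2.0.jar (Yammer)
   import com.codahale.metrics.MetricRegistry;      // from metrics-core-3.2.5.jar (Dropwizard)
   
   public class MetricsCoexistenceSketch {
       public static void main(String[] args) {
           // Both registries can be instantiated side by side because the
           // packages, and therefore the class names, are distinct.
           MetricsRegistry yammer = new MetricsRegistry();
           MetricRegistry dropwizard = new MetricRegistry();
           System.out.println(yammer.getClass().getName());
           System.out.println(dropwizard.getClass().getName());
       }
   }
   ```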




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3

2021-06-23 Thread GitBox


rondagostino commented on a change in pull request #10918:
URL: https://github.com/apache/kafka/pull/10918#discussion_r657104600



##
File path: docs/upgrade.html
##
@@ -21,6 +21,13 @@
 
 Notable changes in 
3.0.0
 
+ZooKeeper has been upgraded to 3.6.3, and that version has a hard 
dependency on the
+io.dropwizard.metrics:metrics-core:3.2.5 library due to 
the new metrics subsystem added in 3.6.0.
+Setting 
metricsProvider.className=org.apache.zookeeper.metrics.impl.NullMetricsProvider
 in your
+zookeeper.properties file does not remove this dependency.  ZooKeeper 
will fail to start with
+java.lang.NoClassDefFoundError: 
com/codahale/metrics/Reservoir if you do not have the above library
+on your CLASSPATH.
+

Review comment:
   > Isn't this handled automatically for users
   
   Yeah, good point, no need to discuss it here.  Will remove.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12786) Getting SslTransportLayerTest error

2021-06-23 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-12786.
-
Resolution: Duplicate

Since it's the same root cause as KAFKA-12790, marking as a duplicate of that.

> Getting SslTransportLayerTest error 
> 
>
> Key: KAFKA-12786
> URL: https://issues.apache.org/jira/browse/KAFKA-12786
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
> Environment: Ububtu 20.04
>Reporter: Sibelle
>Assignee: Ismael Juma
>Priority: Major
>  Labels: beginner
> Attachments: Error.png
>
>
> SaslAuthenticatorTest > testRepeatedValidSaslPlainOverSsl() PASSED
> org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1]
>  failed, log available in 
> /kafka/clients/build/reports/testOutput/org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1].test.stdout
> SslTransportLayerTest > [1] tlsProtocol=TLSv1.2, useInlinePem=false FAILED
> org.opentest4j.AssertionFailedError: Condition not met within timeout 
> 15000. Metric not updated failed-authentication-total expected:<1.0> but 
> was:<0.0> ==> expected:  but was: 
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
> at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:320)
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:317)
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:301)
> at 
> org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:196)
> at 
> org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:155)
> at 
> org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(SslTransportLayerTest.java:644)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11

2021-06-23 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12790:

Description: 
Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11
and all versions of 16. Re-enable it in this test so that we can verify
the server behavior when it establishes connections with such TLS
versions.

  was:[|https://github.com/apache/kafka/pull/10415#issuecomment-808230478]


> Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent 
> versions of 8 and 11
> ---
>
> Key: KAFKA-12790
> URL: https://issues.apache.org/jira/browse/KAFKA-12790
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.0
>
>
> Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11
> and all versions of 16. Re-enable it in this test so that we can verify
> the server behavior when it establishes connections with such TLS
> versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11

2021-06-23 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-12790:
---

Reviewer: Rajini Sivaram
Assignee: Ismael Juma  (was: Rajini Sivaram)

> Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent 
> versions of 8 and 11
> ---
>
> Key: KAFKA-12790
> URL: https://issues.apache.org/jira/browse/KAFKA-12790
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>
> Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11
> and all versions of 16. Re-enable it in this test so that we can verify
> the server behavior when it establishes connections with such TLS
> versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11

2021-06-23 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12790:

Summary: Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 
and recent versions of 8 and 11  (was: Fix 
SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16)

> Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent 
> versions of 8 and 11
> ---
>
> Key: KAFKA-12790
> URL: https://issues.apache.org/jira/browse/KAFKA-12790
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.0
>
>
> Details can be found in the PR:
> https://github.com/apache/kafka/pull/10415#issuecomment-808230478



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12790) Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent versions of 8 and 11

2021-06-23 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12790:

Description: 
[|https://github.com/apache/kafka/pull/10415#issuecomment-808230478]  (was: 
Details can be found in the PR:

https://github.com/apache/kafka/pull/10415#issuecomment-808230478)

> Fix SslTransportLayerTest.testUnsupportedTlsVersion with JDK 16 and recent 
> versions of 8 and 11
> ---
>
> Key: KAFKA-12790
> URL: https://issues.apache.org/jira/browse/KAFKA-12790
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.0
>
>
> [|https://github.com/apache/kafka/pull/10415#issuecomment-808230478]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #10922: KAFKA-12790: Fix SslTransportLayerTest.testUnsupportedTlsVersion with recent JDKs

2021-06-23 Thread GitBox


ijuma opened a new pull request #10922:
URL: https://github.com/apache/kafka/pull/10922


   Support for TLS 1.0 and 1.1 was disabled in recent versions of Java 8/11
   and all versions of 16. Re-enable it in this test so that we can verify
   the server behavior when it establishes connections with such TLS
   versions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-06-23 Thread GitBox


vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-866823035


   @guozhangwang / @cadonna  I made some tweaks to the code and also started 
testing with 1M keys. Now I see differences in terms of throughput for both 
range and putAll queries:
   
   Here is the comparison:
   
   ```
   testPersistentRangeQueryPerformance original
   
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.131 ± 0.028  ops/s
   
   testPersistentPutAllPerformance original
   
   Benchmark                                                         Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance  thrpt   15  0.919 ± 0.037  ops/s
   
   testPersistentRangeQueryPerformance bytebuffer
   
   Benchmark                                                             Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentRangeQueryPerformance  thrpt   15  1.442 ± 0.038  ops/s
   
   testPersistentPutAllPerformance bytebuffer
   
   Benchmark                                                         Mode  Cnt  Score   Error  Units
   StreamsPersistentStoreBenchmark.testPersistentPutAllPerformance  thrpt   15  1.065 ± 0.041  ops/s
   ```
   
   I needed to set the ByteOrder after creating the DirectByteBuffer, and for 
putAll I needed to flip the buffer before calling `put` on the batch.
   
   Next step for me would be to create a Kafka Streams app and test throughput there. 
I will post those numbers here once I have them as well.
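   
   A minimal sketch of those two tweaks (illustrative only, not the PR's code; the batch `put` call at the end is an assumed interface):
   
   ```
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   
   public class DirectBufferSketch {
       // Wrap a serialized payload in a direct buffer as described above:
       // set the byte order explicitly after allocation, then flip() so the
       // written bytes are readable before the buffer is handed to a batch put.
       static ByteBuffer toDirectBuffer(byte[] payload) {
           ByteBuffer buf = ByteBuffer.allocateDirect(payload.length)
                                      .order(ByteOrder.nativeOrder());
           buf.put(payload);
           buf.flip(); // position = 0, limit = number of bytes written
           return buf;
       }
   
       public static void main(String[] args) {
           ByteBuffer key = toDirectBuffer("some-key".getBytes());
           // a real batch.put(keyBuffer, valueBuffer) would then read from position to limit
           System.out.println("readable bytes: " + key.remaining());
       }
   }
   ```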


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657052453



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
##
@@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int 
replicaId) {
 return new Builder((short) 0, allowedVersion, replicaId, 
IsolationLevel.READ_UNCOMMITTED);
 }
 
-public static Builder forConsumer(boolean requireTimestamp, 
IsolationLevel isolationLevel) {
+public static Builder forConsumer(boolean requireTimestamp, 
IsolationLevel isolationLevel, boolean requireMaxTimestamp) {

Review comment:
   yep, I added a small one to verify oldest versions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657052063



##
File path: clients/src/main/resources/common/message/ListOffsetsResponse.json
##
@@ -29,7 +29,9 @@
   // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 is the same as version 6.

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657051441



##
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##
@@ -149,6 +179,21 @@ class LogOffsetTest extends BaseRequestTest {
 assertFalse(offsetChanged)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+val topic = "kafka-"
+val topicPartition = new TopicPartition(topic, 0)
+val log = createTopicAndGetLog(topic, topicPartition)
+
+log.updateHighWatermark(log.logEndOffset)
+
+val maxTimestampOffset = 
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+assertEquals(0L, log.logEndOffset)
+assertEquals(0L, maxTimestampOffset.get.offset)
+assertEquals(-1L, maxTimestampOffset.get.timestamp)
+

Review comment:
   done

##
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##
@@ -266,4 +311,14 @@ class LogOffsetTest extends BaseRequestTest {
   .partitions.asScala.find(_.partitionIndex == tp.partition).get
   }
 
+  private def createTopicAndGetLog(topic: String, topicPartition: 
TopicPartition): Log = {
+

Review comment:
   done

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
 }
 }
 
+@Test
+public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+

Review comment:
   done

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
 }
 }
 
+@Test
+public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+Node node = new Node(0, "localhost", 8120);
+List nodes = Collections.singletonList(node);
+final Cluster cluster = new Cluster(
+"mockClusterId",
+nodes,
+Collections.singleton(new PartitionInfo("foo", 0, node, new 
Node[]{node}, new Node[]{node})),
+Collections.emptySet(),
+Collections.emptySet(),
+node);
+final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, 
AdminClientConfig.RETRIES_CONFIG, "2")) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+// listoffsets response from broker 0
+env.kafkaClient().prepareUnsupportedVersionResponse(
+request -> request instanceof ListOffsetsRequest);
+
+ListOffsetsResult result = 
env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.maxTimestamp()));
+
+TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+}
+}
+
+@Test
+public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() 
throws Exception {
+

Review comment:
   done

##
File path: clients/src/main/resources/common/message/ListOffsetsRequest.json
##
@@ -30,7 +30,9 @@
   // Version 5 is the same as version 4.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 enables listing offsets by max timestamp.

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657051339



##
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##
@@ -93,16 +87,52 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @Test
-  def testGetOffsetsBeforeLatestTime(): Unit = {
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
 val topic = "kafka-"
 val topicPartition = new TopicPartition(topic, 0)
+val log = createTopicAndGetLog(topic, topicPartition)
 
-createTopic(topic, 1, 1)
+for (timestamp <- 0 until 20)
+  log.appendAsLeader(TestUtils.singletonRecords(value = 
Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+log.flush()
 
-val logManager = server.getLogManager
-TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
-  s"Log for partition $topicPartition should be created")
-val log = logManager.getLog(topicPartition).get
+log.updateHighWatermark(log.logEndOffset)
+
+val firstOffset = 
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+assertEquals(19L, firstOffset.get.offset)
+assertEquals(19L, firstOffset.get.timestamp)
+
+log.truncateTo(0)
+
+val secondOffset = 
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+assertEquals(0L, secondOffset.get.offset)
+assertEquals(-1L, secondOffset.get.timestamp)
+

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng edited a comment on pull request #10825: KAFKA-5876: Add `streams()` method to StateStoreProvider

2021-06-23 Thread GitBox


vitojeng edited a comment on pull request #10825:
URL: https://github.com/apache/kafka/pull/10825#issuecomment-866794585


   update PR & rebase trunk.
   Remove:
   ```
   KafkaStreams streams();
   ```
   Add:
   ```
   KafkaStreams.State streamsState();
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-06-23 Thread GitBox


guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r657029488



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -799,41 +795,41 @@ public KafkaStreams(final Topology topology,
 public KafkaStreams(final Topology topology,
 final StreamsConfig config,
 final Time time) {
-this(topology.internalTopologyBuilder, config, new 
DefaultKafkaClientSupplier(), time);
+this(new TopologyMetadata(topology.internalTopologyBuilder, config), 
config, new DefaultKafkaClientSupplier(), time);
 }
 
-private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
- final StreamsConfig config,
- final KafkaClientSupplier clientSupplier) throws 
StreamsException {
-this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
-}
-
-private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+private KafkaStreams(final Topology topology,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier,
  final Time time) throws StreamsException {
+this(new TopologyMetadata(topology.internalTopologyBuilder, config), 
config, clientSupplier, time);
+}
+
+protected KafkaStreams(final TopologyMetadata topologyMetadata,

Review comment:
   Do these two functions need to be `protected` rather than `private`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {

Review comment:
   Maybe we should just require `atLeast(1)` in the StreamsConfig definition? 
Then here we would only need to check the first condition.
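   
   For reference, a hedged sketch of the `atLeast(1)` pattern in a `ConfigDef` definition (assumed config name and default for illustration, not the actual StreamsConfig entry):
   
   ```
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigDef.Importance;
   import org.apache.kafka.common.config.ConfigDef.Range;
   import org.apache.kafka.common.config.ConfigDef.Type;
   
   public class NumThreadsConfigSketch {
       // Range.atLeast(1) rejects values below 1 at definition time, so a
       // separate runtime check for a non-positive thread count is not needed.
       static final ConfigDef CONFIG = new ConfigDef()
           .define("num.stream.threads",   // assumed name, for illustration
                   Type.INT,
                   1,                      // assumed default
                   Range.atLeast(1),
                   Importance.MEDIUM,
                   "The number of threads to execute stream processing.");
   }
   ```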

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import 

[GitHub] [kafka] vitojeng commented on pull request #10825: KAFKA-5876: Add `streams()` method to StateStoreProvider

2021-06-23 Thread GitBox


vitojeng commented on pull request #10825:
URL: https://github.com/apache/kafka/pull/10825#issuecomment-866794585


   update PR & rebase trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657032428



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
##
@@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int 
replicaId) {
 return new Builder((short) 0, allowedVersion, replicaId, 
IsolationLevel.READ_UNCOMMITTED);
 }
 
-public static Builder forConsumer(boolean requireTimestamp, 
IsolationLevel isolationLevel) {
+public static Builder forConsumer(boolean requireTimestamp, 
IsolationLevel isolationLevel, boolean requireMaxTimestamp) {

Review comment:
   added




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657007472



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1444,43 +1450,63 @@ private DeleteGroupsResponse 
createDeleteGroupsResponse() {
 );
 }
 
-private ListOffsetsRequest createListOffsetRequest(int version) {
+private ListOffsetsRequest createListOffsetRequest(int version, long 
timestamp) {
 if (version == 0) {
 ListOffsetsTopic topic = new ListOffsetsTopic()
 .setName("test")
 .setPartitions(Arrays.asList(new ListOffsetsPartition()
 .setPartitionIndex(0)
-.setTimestamp(100L)
+.setTimestamp(timestamp)
 .setMaxNumOffsets(10)
 .setCurrentLeaderEpoch(5)));
 return ListOffsetsRequest.Builder
-.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
 .setTargetTimes(Collections.singletonList(topic))
 .build((short) version);
 } else if (version == 1) {
 ListOffsetsTopic topic = new ListOffsetsTopic()
 .setName("test")
 .setPartitions(Arrays.asList(new ListOffsetsPartition()
 .setPartitionIndex(0)
-.setTimestamp(100L)
+.setTimestamp(timestamp)
 .setCurrentLeaderEpoch(5)));
 return ListOffsetsRequest.Builder
-.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
 .setTargetTimes(Collections.singletonList(topic))
 .build((short) version);
-} else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
+} else if (version >= 2 && version <= 6) {
 ListOffsetsPartition partition = new ListOffsetsPartition()
 .setPartitionIndex(0)
-.setTimestamp(100L)
+.setTimestamp(timestamp)
 .setCurrentLeaderEpoch(5);
 
 ListOffsetsTopic topic = new ListOffsetsTopic()
 .setName("test")
 .setPartitions(Arrays.asList(partition));
 return ListOffsetsRequest.Builder
-.forConsumer(true, IsolationLevel.READ_COMMITTED)
+.forConsumer(true, IsolationLevel.READ_COMMITTED, false)
 .setTargetTimes(Collections.singletonList(topic))
 .build((short) version);
+} else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) {
+ListOffsetsPartition partition = new ListOffsetsPartition()
+.setPartitionIndex(0)
+.setTimestamp(timestamp)
+.setCurrentLeaderEpoch(5);
+
+ListOffsetsTopic topic = new ListOffsetsTopic()
+.setName("test")
+.setPartitions(Arrays.asList(partition));
+if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+return ListOffsetsRequest.Builder
+.forConsumer(true, IsolationLevel.READ_COMMITTED, 
false)
+.setTargetTimes(Collections.singletonList(topic))
+.build((short) version);
+} else {

Review comment:
   removed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657007220



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -302,10 +302,16 @@ public void testSerialization() throws Exception {
 checkErrorResponse(createDeleteGroupsRequest(), 
unknownServerException, true);
 checkResponse(createDeleteGroupsResponse(), 0, true);
 for (short version : LIST_OFFSETS.allVersions()) {
-checkRequest(createListOffsetRequest(version), true);
-checkErrorResponse(createListOffsetRequest(version), 
unknownServerException, true);
+checkRequest(createListOffsetRequest(version, 100L), true);
+checkErrorResponse(createListOffsetRequest(version, 100L), 
unknownServerException, true);
 checkResponse(createListOffsetResponse(version), version, true);
 }
+LIST_OFFSETS.allVersions().stream().filter(version -> version >= 
(short) 7).forEach(
+version -> {
+checkRequest(createListOffsetRequest(version, 
ListOffsetsRequest.MAX_TIMESTAMP), true);
+checkErrorResponse(createListOffsetRequest(version, 
ListOffsetsRequest.MAX_TIMESTAMP), unknownServerException, true);
+}
+);

Review comment:
   coolio, have removed it.

##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -432,8 +438,8 @@ public void testSerialization() throws Exception {
 checkRequest(createUpdateMetadataRequest(5, null), false);
 checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), 
unknownServerException, true);
 checkResponse(createUpdateMetadataResponse(), 0, true);
-checkRequest(createListOffsetRequest(0), true);
-checkErrorResponse(createListOffsetRequest(0), unknownServerException, 
true);
+checkRequest(createListOffsetRequest(0, 100L), true);
+checkErrorResponse(createListOffsetRequest(0, 100L), 
unknownServerException, true);

Review comment:
   yep, removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657007003



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
 }
 }
 }
+
+@Override
+boolean 
handleUnsupportedVersionException(UnsupportedVersionException exception) {
+if (supportsMaxTimestamp) {
+supportsMaxTimestamp = false;
+
+// fail any unsupported futures and remove partitions 
from the downgraded retry
+List topicsToRemove = new 
ArrayList<>();
+partitionsToQuery.stream().forEach(
+t -> {
+List partitionsToRemove 
= new ArrayList<>();
+t.partitions().stream()
+.filter(p -> p.timestamp() == 
ListOffsetsRequest.MAX_TIMESTAMP)
+.forEach(
+p -> {
+futures.get(new 
TopicPartition(t.name(), p.partitionIndex()))
+.completeExceptionally(
+new 
UnsupportedVersionException(
+"Broker " + brokerId
++ " does not 
support MAX_TIMESTAMP offset spec"));
+partitionsToRemove.add(p);
+
+});
+t.partitions().removeAll(partitionsToRemove);
+if (t.partitions().isEmpty()) 
topicsToRemove.add(t);
+}
+);
+partitionsToQuery.removeAll(topicsToRemove);
+
+return !partitionsToQuery.isEmpty();

Review comment:
   good point, I've added a check for this.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
 }
 }
 
+@Test
+public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+Node node = new Node(0, "localhost", 8120);
+List nodes = Collections.singletonList(node);
+final Cluster cluster = new Cluster(
+"mockClusterId",
+nodes,
+Collections.singleton(new PartitionInfo("foo", 0, node, new 
Node[]{node}, new Node[]{node})),
+Collections.emptySet(),
+Collections.emptySet(),
+node);
+final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, 
AdminClientConfig.RETRIES_CONFIG, "2")) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+// listoffsets response from broker 0
+env.kafkaClient().prepareUnsupportedVersionResponse(
+request -> request instanceof ListOffsetsRequest);
+
+ListOffsetsResult result = 
env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.maxTimestamp()));
+
+TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+}
+}
+
+@Test
+public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() 
throws Exception {
+
+Node node = new Node(0, "localhost", 8120);
+List nodes = Collections.singletonList(node);
+List pInfos = new ArrayList<>();
+pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new 
Node[]{node}));
+final Cluster cluster = new Cluster(
+"mockClusterId",
+nodes,
+pInfos,
+Collections.emptySet(),
+Collections.emptySet(),
+node);
+final TopicPartition tp0 = new TopicPartition("foo", 0);
+final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+// listoffsets response from broker 0
+env.kafkaClient().prepareUnsupportedVersionResponse(
+request -> request 

[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657004623



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
 }
 }
 }
+
+@Override
+boolean 
handleUnsupportedVersionException(UnsupportedVersionException exception) {
+if (supportsMaxTimestamp) {
+supportsMaxTimestamp = false;
+
+// fail any unsupported futures and remove partitions 
from the downgraded retry
+List topicsToRemove = new 
ArrayList<>();
+partitionsToQuery.stream().forEach(
+t -> {
+List partitionsToRemove 
= new ArrayList<>();
+t.partitions().stream()
+.filter(p -> p.timestamp() == 
ListOffsetsRequest.MAX_TIMESTAMP)
+.forEach(
+p -> {
+futures.get(new 
TopicPartition(t.name(), p.partitionIndex()))
+.completeExceptionally(
+new 
UnsupportedVersionException(
+"Broker " + brokerId
++ " does not 
support MAX_TIMESTAMP offset spec"));
+partitionsToRemove.add(p);
+
+});
+t.partitions().removeAll(partitionsToRemove);
+if (t.partitions().isEmpty()) 
topicsToRemove.add(t);
+}
+);
+partitionsToQuery.removeAll(topicsToRemove);

Review comment:
   agreed, looks much neater.
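
   For readers following the thread, a minimal self-contained sketch of the shape being discussed (the names, map layout, and the MAX_TIMESTAMP sentinel value are assumptions for illustration, not the PR's actual KafkaAdminClient code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only -- not the PR's KafkaAdminClient code.
// Shape of the downgrade handling discussed above: fail the futures of partitions that
// requested MAX_TIMESTAMP, drop them from the pending query, prune topics left empty,
// and only retry the downgraded request if any partitions remain.
final class MaxTimestampDowngradeSketch {

    static final long MAX_TIMESTAMP = -3L; // assumed sentinel, mirroring ListOffsetsRequest.MAX_TIMESTAMP

    // partitionsToQuery: topic -> (partition -> requested timestamp)
    // futures: topic -> (partition -> result future)
    static boolean handleUnsupported(Map<String, Map<Integer, Long>> partitionsToQuery,
                                     Map<String, Map<Integer, CompletableFuture<Long>>> futures) {
        List<String> emptyTopics = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, Long>> topic : partitionsToQuery.entrySet()) {
            topic.getValue().entrySet().removeIf(partition -> {
                if (partition.getValue() == MAX_TIMESTAMP) {
                    futures.get(topic.getKey()).get(partition.getKey()).completeExceptionally(
                            new UnsupportedOperationException("broker does not support MAX_TIMESTAMP offset spec"));
                    return true; // fail and drop this partition
                }
                return false;    // keep partitions with other offset specs
            });
            if (topic.getValue().isEmpty()) {
                emptyTopics.add(topic.getKey());
            }
        }
        emptyTopics.forEach(partitionsToQuery::remove);
        return !partitionsToQuery.isEmpty(); // the check discussed above: retry only if work remains
    }
}
```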




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-23 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656955014



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
 }
 }
 
+@Test
+public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+Node node = new Node(0, "localhost", 8120);
+List nodes = Collections.singletonList(node);
+final Cluster cluster = new Cluster(
+"mockClusterId",
+nodes,
+Collections.singleton(new PartitionInfo("foo", 0, node, new 
Node[]{node}, new Node[]{node})),
+Collections.emptySet(),
+Collections.emptySet(),
+node);
+final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, 
AdminClientConfig.RETRIES_CONFIG, "2")) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+// listoffsets response from broker 0
+env.kafkaClient().prepareUnsupportedVersionResponse(
+request -> request instanceof ListOffsetsRequest);
+
+ListOffsetsResult result = 
env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.maxTimestamp()));
+
+TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+}
+}
+
+@Test
+public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() 
throws Exception {
+
+Node node = new Node(0, "localhost", 8120);
+List nodes = Collections.singletonList(node);
+List pInfos = new ArrayList<>();
+pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new 
Node[]{node}));
+final Cluster cluster = new Cluster(
+"mockClusterId",
+nodes,
+pInfos,
+Collections.emptySet(),
+Collections.emptySet(),
+node);
+final TopicPartition tp0 = new TopicPartition("foo", 0);
+final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+// listoffsets response from broker 0
+env.kafkaClient().prepareUnsupportedVersionResponse(
+request -> request instanceof ListOffsetsRequest);
+
+ListOffsetsTopicResponse topicResponse = 
ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 
345L, 543);
+ListOffsetsResponseData responseData = new 
ListOffsetsResponseData()
+.setThrottleTimeMs(0)
+.setTopics(Arrays.asList(topicResponse));
+env.kafkaClient().prepareResponseFrom(new 
ListOffsetsResponse(responseData), node);
+
+ListOffsetsResult result = env.adminClient().listOffsets(new 
HashMap() {{
+put(tp0, OffsetSpec.maxTimestamp());
+put(tp1, OffsetSpec.latest());
+}});
+
+TestUtils.assertFutureThrows(result.partitionResult(tp0), 
UnsupportedVersionException.class);
+
+ListOffsetsResultInfo tp1Offset = 
result.partitionResult(tp1).get();
+assertEquals(345L, tp1Offset.offset());
+assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+assertEquals(-1L, tp1Offset.timestamp());
+}
+}
+
+@Test
+public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+Node node = new Node(0, "localhost", 8120);
+List nodes = Collections.singletonList(node);
+List pInfos = new ArrayList<>();
+pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new 
Node[]{node}));
+final Cluster cluster = new Cluster(
+"mockClusterId",
+nodes,
+pInfos,
+Collections.emptySet(),
+Collections.emptySet(),
+node);
+final TopicPartition tp0 = new TopicPartition("foo", 0);
+final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+

[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

2021-06-23 Thread GitBox


cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r656890122



##
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} 
application.
+ */
+public interface TaskMetadata {
+
+/**
+ * @return the basic task metadata such as subtopology and partition id
+ */
+TaskId taskId();
+
+/**
+ * This function will return a set of the current TopicPartitions
+ */
+Set topicPartitions();

Review comment:
   Could you please use javadoc mark-up like `@return` and `@param` for the 
docs? Here and for the other methods.
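
   For illustration, the requested mark-up could look roughly like this (a hedged example with made-up interface and method names; the second method exists only to show `@param` and is not part of the actual API):

```java
import org.apache.kafka.common.TopicPartition;

import java.util.Set;

/**
 * Hedged illustration only -- one possible way the requested {@code @return}/{@code @param}
 * mark-up could read; not the wording that ended up in the PR.
 */
public interface TaskMetadataJavadocExample {

    /**
     * Topic partitions currently processed by this task.
     *
     * @return the set of topic partitions this task is currently processing
     */
    Set<TopicPartition> topicPartitions();

    /**
     * Hypothetical method, included only to demonstrate {@code @param} mark-up.
     *
     * @param topicPartition the partition to look up
     * @return the committed offset for the given partition, or {@code -1} if unknown
     */
    long committedOffsetFor(TopicPartition topicPartition);
}
```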

##
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link 
KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+/**
+ * @return the state of the Thread
+ */
+String threadState();
+
+/**
+ * @return the name of the Thread
+ */
+String threadName();
+
+/**
+ * This function will return the set of the {@link TaskMetadata} for the 
current active tasks
+ */
+Set activeTasks();

Review comment:
   Could you use javadoc mark-up for the docs? Here and for the other 
methods.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link 
KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+private final String threadName;
+
+private final String threadState;
+
+private final Set activeTasks;
+
+private final Set standbyTasks;
+
+private final String mainConsumerClientId;
+
+private final String 

[GitHub] [kafka] PhilHardwick opened a new pull request #10921: MINOR: Ensure queryable store providers is up to date after adding stream thread

2021-06-23 Thread GitBox


PhilHardwick opened a new pull request #10921:
URL: https://github.com/apache/kafka/pull/10921


   When a new thread is added, the queryable store provider continues to use the store providers it was given when KafkaStreams was instantiated.
   
   I wanted to keep QueryableStoreProviders immutable, so this meant making the queryableStoreProvider field in the KafkaStreams class mutable to accommodate this change.
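
   A rough, self-contained sketch of that approach (hypothetical types, not the actual Streams internals): the composite provider stays immutable, and adding a thread swaps in a new instance behind a mutable field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the approach described above (not the actual Streams classes):
// the composite provider is immutable; adding a thread replaces it with a new instance.
final class CompositeStoreProvider {
    private final List<String> perThreadProviders;   // stand-ins for per-thread store providers

    CompositeStoreProvider(List<String> perThreadProviders) {
        this.perThreadProviders = List.copyOf(perThreadProviders);  // immutable snapshot
    }

    CompositeStoreProvider with(String provider) {
        List<String> updated = new ArrayList<>(perThreadProviders);
        updated.add(provider);
        return new CompositeStoreProvider(updated);   // new immutable instance
    }
}

final class StreamsFacade {
    // mutable reference, analogous to the queryableStoreProvider field mentioned above
    private volatile CompositeStoreProvider queryableStoreProvider =
            new CompositeStoreProvider(List.of());

    void addStreamThread(String newThreadProvider) {
        // swapping the reference makes stores owned by the new thread visible to queries
        queryableStoreProvider = queryableStoreProvider.with(newThreadProvider);
    }
}
```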
   
   This is tested via an integration test: after a thread is added, messages with different keys are produced; with the previous code those keys never become visible in the store, whereas after this change they are queryable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-06-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367967#comment-17367967
 ] 

Luke Chen commented on KAFKA-12984:
---

Good root cause analysis! And I agree the solution (a) that the sticky 
assignment algorithm to resolve cases of improper input conditions by 
invalidating the "ownedPartitions" in cases of double ownership.
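
A hedged sketch of what such invalidation could look like (self-contained illustration with plain strings standing in for members and partitions, not the assignor's real code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of idea (a): partitions claimed by more than one member are dropped from
// every member's ownedPartitions before the sticky assignment runs.
final class OwnedPartitionsSanitizer {

    static Map<String, List<String>> invalidateDoubleOwnership(Map<String, List<String>> ownedByMember) {
        // count how many members claim each partition
        Map<String, Integer> ownerCounts = new HashMap<>();
        for (List<String> owned : ownedByMember.values()) {
            for (String partition : owned) {
                ownerCounts.merge(partition, 1, Integer::sum);
            }
        }
        // keep only partitions with exactly one claimed owner
        Map<String, List<String>> sanitized = new HashMap<>();
        for (Map.Entry<String, List<String>> member : ownedByMember.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String partition : member.getValue()) {
                if (ownerCounts.get(partition) == 1) {
                    kept.add(partition);
                }
            }
            sanitized.put(member.getKey(), kept);
        }
        return sanitized;
    }
}
```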

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-06-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367967#comment-17367967
 ] 

Luke Chen edited comment on KAFKA-12984 at 6/23/21, 8:42 AM:
-

Good root cause analysis! And I agree the solution (a) that the sticky 
assignment algorithm should resolve cases of improper input conditions by 
invalidating the "ownedPartitions" in cases of double ownership.


was (Author: showuon):
Good root cause analysis! And I agree the solution (a) that the sticky 
assignment algorithm to resolve cases of improper input conditions by 
invalidating the "ownedPartitions" in cases of double ownership.

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows

2021-06-23 Thread GitBox


cadonna commented on pull request #10897:
URL: https://github.com/apache/kafka/pull/10897#issuecomment-866637956


   FYI: I opened PR #10920 to improve the unit tests for the log messages for 
the other types of dropped records.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10920: MINOR: Improve test of log messages for dropped records

2021-06-23 Thread GitBox


cadonna commented on pull request #10920:
URL: https://github.com/apache/kafka/pull/10920#issuecomment-866636831


   Call for review: @xdgrulez @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #10920: MINOR: Improve test of log messages for dropped records

2021-06-23 Thread GitBox


cadonna opened a new pull request #10920:
URL: https://github.com/apache/kafka/pull/10920


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr opened a new pull request #10919: KAFKA-12985: CVE-2021-28169 - Upgrade jetty to 9.4.41

2021-06-23 Thread GitBox


dongjinleekr opened a new pull request #10919:
URL: https://github.com/apache/kafka/pull/10919


   [Jetty 9.4.41.v20210516](https://github.com/eclipse/jetty.project/releases/tag/jetty-9.4.41.v20210516) resolves the following security vulnerabilities:
   
   - [CVE-2021-28169](https://nvd.nist.gov/vuln/detail/CVE-2021-28169) 
(described in the issue)
   - [CVE-2021-34428](https://nvd.nist.gov/vuln/detail/CVE-2021-34428)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12985) CVE-2021-28169 - Upgrade jetty to 9.4.41

2021-06-23 Thread Dongjin Lee (Jira)
Dongjin Lee created KAFKA-12985:
---

 Summary: CVE-2021-28169 - Upgrade jetty to 9.4.41
 Key: KAFKA-12985
 URL: https://issues.apache.org/jira/browse/KAFKA-12985
 Project: Kafka
  Issue Type: Task
  Components: security
Reporter: Dongjin Lee
Assignee: Dongjin Lee


The CVE-2021-28169 vulnerability affects Jetty versions up to 9.4.40. For more information, see https://nvd.nist.gov/vuln/detail/CVE-2021-28169

Upgrading to Jetty version 9.4.41 should address this issue 
(https://github.com/eclipse/jetty.project/security/advisories/GHSA-gwcr-j4wh-j3cq).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

