[jira] [Assigned] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito

2022-07-26 Thread Dalibor Plavcic (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dalibor Plavcic reassigned KAFKA-7438:
--

Assignee: Dalibor Plavcic

> Replace EasyMock and PowerMock with Mockito
> ---
>
> Key: KAFKA-7438
> URL: https://issues.apache.org/jira/browse/KAFKA-7438
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Dalibor Plavcic
>Priority: Major
>
> Development of EasyMock and PowerMock has stagnated while Mockito continues 
> to be actively developed. With the new Java release cadence, it's a problem 
> to depend on libraries that do bytecode generation and are not actively 
> maintained. In addition, Mockito is easier to use.
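
For illustration only (this snippet is not from the ticket): the same 
stubbing in both frameworks, using Connect's {{Converter}} interface; the 
topic and value arguments are made up.

{code}
import org.apache.kafka.connect.storage.Converter;
import org.easymock.EasyMock;
import org.mockito.Mockito;

// EasyMock style: record expectations, then switch to replay mode.
Converter easyMocked = EasyMock.createMock(Converter.class);
EasyMock.expect(easyMocked.fromConnectData("topic", null, "value"))
        .andReturn(new byte[0]);
EasyMock.replay(easyMocked);

// Mockito style: no replay step; stubbing and (optional) verification
// are independent, which is part of why it is easier to use.
Converter mockitoMocked = Mockito.mock(Converter.class);
Mockito.when(mockitoMocked.fromConnectData("topic", null, "value"))
       .thenReturn(new byte[0]);
{code}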



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vvcephei commented on a diff in pull request #12437: KAFKA-13769 Add tests for ForeignJoinSubscriptionProcessorSupplier

2022-07-26 Thread GitBox


vvcephei commented on code in PR #12437:
URL: https://github.com/apache/kafka/pull/12437#discussion_r930562541


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ForeignJoinSubscriptionProcessorSupplierTest {

Review Comment:
   Thanks for this.
   
   I want to make sure we don't overlook this again if we add another version. 
I think it should be good enough if you add another test that asserts that 
`SubscriptionWrapper.CURRENT_VERSION == VERSION_1`, along with a comment 
explaining that when we bump the version, we should also add a new set of 
tests. WDYT?
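   
   For concreteness, a sketch of what I mean (assuming `VERSION_1` and 
`CURRENT_VERSION` are the version constants on `SubscriptionWrapper`):
   
   ```java
   @Test
   public void shouldRemindUsToAddTestsWhenVersionIsBumped() {
       // When CURRENT_VERSION moves past VERSION_1, this fails as a reminder
       // to add a new set of tests covering the new wire format.
       Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
   }
   ```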



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #12445: MINOR: remove unnecessary test stubbing

2022-07-26 Thread GitBox


vvcephei merged PR #12445:
URL: https://github.com/apache/kafka/pull/12445





[GitHub] [kafka] dstelljes commented on pull request #12295: KAFKA-13586: Prevent exception thrown during connector update from crashing distributed herder

2022-07-26 Thread GitBox


dstelljes commented on PR #12295:
URL: https://github.com/apache/kafka/pull/12295#issuecomment-1196168380

   Hmm, if this is what you see then I have the same thing:
   
   ```
   No tests found for given includes: [**/*Suite.class](exclude rules) 
[org.apache.kafka.connect.runtime.distributed.DistributedHerderTest.testConnectorConfigUpdate](--tests filter)
   
   It seems to be the `@PrepareForTest`s at the method level. I also tried 
removing the `@PrepareForTest` at the class level and copying it onto each test 
method, to rule out issues with the class/method-level combination, and got the 
same result.
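   
   For reference, a hypothetical shape of the combination described above (the 
class names are placeholders, not the real test):
   
   ```java
   @RunWith(PowerMockRunner.class)
   @PrepareForTest(SomeFinalClass.class)  // class level
   public class ExampleTest {
   
       // A method-level @PrepareForTest makes PowerMock load the test in a
       // separate, freshly prepared class loader, which seems to be what
       // confuses Gradle's --tests filter into reporting "No tests found".
       @PrepareForTest({SomeFinalClass.class, AnotherFinalClass.class})
       @Test
       public void testWithExtraPreparedClasses() { /* ... */ }
   }
   ```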





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930512882


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -480,6 +503,21 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
 }
 }
 
+private void convertActiveToStandby(final StreamTask activeTask,
+                                    final Set<TopicPartition> partitions) {
+    activeTask.recycleAndConvert();
+    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
+    final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
+    tasks.replaceActiveWithStandby(standbyTask);
+}
+
+private void convertStandbyToActive(final StandbyTask standbyTask,
+                                    final Set<TopicPartition> partitions) {
+    standbyTask.recycleAndConvert();

Review Comment:
   I will update the `TaskManagerTest` for them, since the creator classes do 
not have their own test suites. 






[GitHub] [kafka] lihaosky opened a new pull request, #12445: remove unnecessary stubbing

2022-07-26 Thread GitBox


lihaosky opened a new pull request, #12445:
URL: https://github.com/apache/kafka/pull/12445

   Unnecessary stubbing caused a job failure.
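   
   (Context: Mockito's strict stubbing fails the run with 
`UnnecessaryStubbingException` when a `when(...)` is never exercised. The fix 
is to delete the stubbing, or, as a sketch with a made-up mock, mark it 
lenient if it is intentionally shared across tests:)
   
   ```java
   Mockito.lenient().when(mockStore.name()).thenReturn("store");
   ```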
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mumrah merged pull request #12403: KAFKA-13166 Fix missing ControllerApis error handling

2022-07-26 Thread GitBox


mumrah merged PR #12403:
URL: https://github.com/apache/kafka/pull/12403





[jira] [Resolved] (KAFKA-13806) Check CRC when reading snapshots

2022-07-26 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel resolved KAFKA-13806.

Fix Version/s: 3.3
   Resolution: Duplicate

> Check CRC when reading snapshots
> 
>
> Key: KAFKA-13806
> URL: https://issues.apache.org/jira/browse/KAFKA-13806
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
> Fix For: 3.3
>
>






[jira] [Updated] (KAFKA-13166) EOFException when Controller handles unknown API

2022-07-26 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13166:
-
Fix Version/s: 3.3.0
   3.4.0

> EOFException when Controller handles unknown API
> 
>
> Key: KAFKA-13166
> URL: https://issues.apache.org/jira/browse/KAFKA-13166
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
> Fix For: 3.3.0, 3.4.0
>
>
> When ControllerApis handles an unsupported RPC, it silently drops the request 
> due to an unhandled exception. 
> The following stack trace was manually printed since this exception was 
> suppressed on the controller. 
> {code}
> java.util.NoSuchElementException: key not found: UpdateFeatures
>   at scala.collection.MapOps.default(Map.scala:274)
>   at scala.collection.MapOps.default$(Map.scala:273)
>   at scala.collection.AbstractMap.default(Map.scala:405)
>   at scala.collection.mutable.HashMap.apply(HashMap.scala:425)
>   at kafka.network.RequestChannel$Metrics.apply(RequestChannel.scala:74)
>   at 
> kafka.network.RequestChannel.$anonfun$updateErrorMetrics$1(RequestChannel.scala:458)
>   at 
> kafka.network.RequestChannel.$anonfun$updateErrorMetrics$1$adapted(RequestChannel.scala:457)
>   at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>   at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
>   at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
>   at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
>   at 
> kafka.network.RequestChannel.updateErrorMetrics(RequestChannel.scala:457)
>   at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:388)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:93)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:121)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:78)
>   at kafka.server.ControllerApis.handle(ControllerApis.scala:116)
>   at 
> kafka.server.ControllerApis.$anonfun$handleEnvelopeRequest$1(ControllerApis.scala:125)
>   at 
> kafka.server.ControllerApis.$anonfun$handleEnvelopeRequest$1$adapted(ControllerApis.scala:125)
>   at 
> kafka.server.EnvelopeUtils$.handleEnvelopeRequest(EnvelopeUtils.scala:65)
>   at 
> kafka.server.ControllerApis.handleEnvelopeRequest(ControllerApis.scala:125)
>   at kafka.server.ControllerApis.handle(ControllerApis.scala:103)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> This is due to a bug in the metrics code in RequestChannel.
> The result is that the request fails, but no indication is given that it was 
> due to an unsupported API on either the broker, controller, or client.
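
Illustrative sketch only (not the actual patch): the per-API error-metrics 
lookup needs to tolerate APIs that were never registered on this request 
channel. In Java-analogue form, with a hypothetical markErrorMeter method:

{code}
void updateErrorMetrics(String apiName, Map<Errors, Integer> errorCounts) {
    // Scala's mutable.HashMap.apply throws NoSuchElementException on a
    // missing key, which is what suppressed the error response above.
    RequestMetrics metrics = metricsByApi.get(apiName);  // null, not a throw
    if (metrics == null) {
        // API not registered here (e.g. a controller-only RPC): skip the
        // metric rather than abort sending the error response.
        return;
    }
    errorCounts.forEach(metrics::markErrorMeter);
}
{code}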





[GitHub] [kafka] C0urante opened a new pull request, #12444: KAFKA-14101: Improve documentation for consuming from embedded Kafka cluster topics in Connect integration testing framework

2022-07-26 Thread GitBox


C0urante opened a new pull request, #12444:
URL: https://github.com/apache/kafka/pull/12444

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14101)
   
   Depends on https://github.com/apache/kafka/pull/12429, which should 
implement a logical fix for KAFKA-14101. This follow-up PR is intended to help 
harden our integration tests against the mistakes that caused KAFKA-14101 by:
   1. Renaming `EmbeddedKafkaCluster::consume` to 
`EmbeddedKafkaCluster::consumeAtLeast` in order to clarify behavior and help 
distinguish between it and `EmbeddedKafkaCluster::consumeAll`.
   2. Adding a note to the Javadocs for `EmbeddedKafkaCluster::consumeAtLeast` 
warning about out-of-order consumption and data missing from topic partitions.
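   
   Roughly the kind of warning intended (wording invented here, not quoted from 
the PR):
   
   ```java
   /**
    * Consume at least {@code numRecords} records from the given topics before
    * the timeout elapses.
    *
    * <p>Note: records may be returned out of order across topic partitions,
    * and a partition that was slow to produce may be missing records entirely;
    * use {@code consumeAll} when every record produced so far is required.
    */
   ```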
   





[jira] [Updated] (KAFKA-14111) Dynamic config update fails for "password" configs in KRaft

2022-07-26 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14111:
-
Component/s: kraft

> Dynamic config update fails for "password" configs in KRaft
> ---
>
> Key: KAFKA-14111
> URL: https://issues.apache.org/jira/browse/KAFKA-14111
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: David Arthur
>Priority: Major
>
> Two related bugs were found when working on updating 
> DynamicBrokerReconfigurationTest for KRaft. 
> Firstly, if we issue an AlterConfig (or IncrementalAlterConfig) for a broker 
> config that is defined as a "password", it will succeed on the controller, 
> but throw an error when the broker handles it. 
> For example, on a vanilla cluster running "config/kraft/server.properties"
> {code}
> /bin/kafka-configs.sh --bootstrap-server localhost:9092  --alter --broker 1 
> --add-config listener.name.external.ssl.key.password=foo 
> {code}
> results in
> {code}
> [2022-07-26 16:24:05,049] ERROR Dynamic password config 
> listener.name.external.ssl.key.password could not be decoded, ignoring. 
> (kafka.server.DynamicBrokerConfig)
> org.apache.kafka.common.config.ConfigException: Password encoder secret not 
> configured
>   at 
> kafka.server.DynamicBrokerConfig.$anonfun$passwordEncoder$1(DynamicBrokerConfig.scala:352)
>   at scala.Option.getOrElse(Option.scala:201)
>   at 
> kafka.server.DynamicBrokerConfig.passwordEncoder(DynamicBrokerConfig.scala:352)
>   at 
> kafka.server.DynamicBrokerConfig.decodePassword$1(DynamicBrokerConfig.scala:393)
>   at 
> kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5(DynamicBrokerConfig.scala:404)
>   at 
> kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5$adapted(DynamicBrokerConfig.scala:402)
>   at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>   at scala.collection.MapOps.foreachEntry(Map.scala:244)
>   at scala.collection.MapOps.foreachEntry$(Map.scala:240)
>   at scala.collection.AbstractMap.foreachEntry(Map.scala:405)
>   at 
> kafka.server.DynamicBrokerConfig.fromPersistentProps(DynamicBrokerConfig.scala:402)
>   at 
> kafka.server.DynamicBrokerConfig.$anonfun$updateBrokerConfig$1(DynamicBrokerConfig.scala:300)
>   at 
> kafka.server.DynamicBrokerConfig.updateBrokerConfig(DynamicBrokerConfig.scala:299)
>   at 
> kafka.server.BrokerConfigHandler.processConfigChanges(ConfigHandler.scala:221)
>   at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$15(BrokerMetadataPublisher.scala:212)
>   at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008)
>   at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14(BrokerMetadataPublisher.scala:190)
>   at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14$adapted(BrokerMetadataPublisher.scala:189)
>   at scala.Option.foreach(Option.scala:437)
>   at 
> kafka.server.metadata.BrokerMetadataPublisher.publish(BrokerMetadataPublisher.scala:189)
>   at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:293)
>   at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2(BrokerMetadataListener.scala:126)
>   at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2$adapted(BrokerMetadataListener.scala:126)
>   at scala.Option.foreach(Option.scala:437)
>   at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.run(BrokerMetadataListener.scala:126)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> If a {{password.encoder.secret}} is supplied, this still fails but with:
> {code}
> [2022-07-26 16:27:23,247] ERROR Dynamic password config 
> listener.name.external.ssl.key.password could not be decoded, ignoring. 
> (kafka.server.DynamicBrokerConfig)
> java.lang.StringIndexOutOfBoundsException: begin 0, end -1, length 3
>   at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4604)
>   at java.base/java.lang.String.substring(String.java:2707)
>   at kafka.utils.CoreUtils$.$anonfun$parseCsvMap$1(CoreUtils.scala:173)
>   at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:929)
>   at kafka.utils.CoreUtils$.parseCsvMap(CoreUtils.scala:171)
>   at kafka.utils.PasswordEncoder.decode(PasswordEncoder.scala:88)
>   at 
> kafka.server

[jira] [Created] (KAFKA-14111) Dynamic config update fails for "password" configs in KRaft

2022-07-26 Thread David Arthur (Jira)
David Arthur created KAFKA-14111:


 Summary: Dynamic config update fails for "password" configs in 
KRaft
 Key: KAFKA-14111
 URL: https://issues.apache.org/jira/browse/KAFKA-14111
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur


Two related bugs were found when working on updating 
DynamicBrokerReconfigurationTest for KRaft. 

Firstly, if we issue an AlterConfig (or IncrementalAlterConfig) for a broker 
config that is defined as a "password", it will succeed on the controller, but 
throw an error when the broker handles it. 

For example, on a vanilla cluster running "config/kraft/server.properties"

{code}
/bin/kafka-configs.sh --bootstrap-server localhost:9092  --alter --broker 1 
--add-config listener.name.external.ssl.key.password=foo 
{code}

results in

{code}
[2022-07-26 16:24:05,049] ERROR Dynamic password config 
listener.name.external.ssl.key.password could not be decoded, ignoring. 
(kafka.server.DynamicBrokerConfig)
org.apache.kafka.common.config.ConfigException: Password encoder secret not 
configured
at 
kafka.server.DynamicBrokerConfig.$anonfun$passwordEncoder$1(DynamicBrokerConfig.scala:352)
at scala.Option.getOrElse(Option.scala:201)
at 
kafka.server.DynamicBrokerConfig.passwordEncoder(DynamicBrokerConfig.scala:352)
at 
kafka.server.DynamicBrokerConfig.decodePassword$1(DynamicBrokerConfig.scala:393)
at 
kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5(DynamicBrokerConfig.scala:404)
at 
kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5$adapted(DynamicBrokerConfig.scala:402)
at 
kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
at scala.collection.MapOps.foreachEntry(Map.scala:244)
at scala.collection.MapOps.foreachEntry$(Map.scala:240)
at scala.collection.AbstractMap.foreachEntry(Map.scala:405)
at 
kafka.server.DynamicBrokerConfig.fromPersistentProps(DynamicBrokerConfig.scala:402)
at 
kafka.server.DynamicBrokerConfig.$anonfun$updateBrokerConfig$1(DynamicBrokerConfig.scala:300)
at 
kafka.server.DynamicBrokerConfig.updateBrokerConfig(DynamicBrokerConfig.scala:299)
at 
kafka.server.BrokerConfigHandler.processConfigChanges(ConfigHandler.scala:221)
at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$15(BrokerMetadataPublisher.scala:212)
at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008)
at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14(BrokerMetadataPublisher.scala:190)
at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14$adapted(BrokerMetadataPublisher.scala:189)
at scala.Option.foreach(Option.scala:437)
at 
kafka.server.metadata.BrokerMetadataPublisher.publish(BrokerMetadataPublisher.scala:189)
at 
kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:293)
at 
kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2(BrokerMetadataListener.scala:126)
at 
kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2$adapted(BrokerMetadataListener.scala:126)
at scala.Option.foreach(Option.scala:437)
at 
kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.run(BrokerMetadataListener.scala:126)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
at java.base/java.lang.Thread.run(Thread.java:833)
{code}
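
(For context: the first failure points at the static encoder secret, which in 
ZK mode is configured as sketched below; whether and how the same knob should 
apply in KRaft is exactly what is in question here.)

{code}
# static server.properties entry expected by the broker-side decode path
password.encoder.secret=someEncoderSecret
{code}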


If a {{password.encoder.secret}} is supplied, this still fails but with:


{code}
[2022-07-26 16:27:23,247] ERROR Dynamic password config 
listener.name.external.ssl.key.password could not be decoded, ignoring. 
(kafka.server.DynamicBrokerConfig)
java.lang.StringIndexOutOfBoundsException: begin 0, end -1, length 3
at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4604)
at java.base/java.lang.String.substring(String.java:2707)
at kafka.utils.CoreUtils$.$anonfun$parseCsvMap$1(CoreUtils.scala:173)
at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:929)
at kafka.utils.CoreUtils$.parseCsvMap(CoreUtils.scala:171)
at kafka.utils.PasswordEncoder.decode(PasswordEncoder.scala:88)
at 
kafka.server.DynamicBrokerConfig.decodePassword$1(DynamicBrokerConfig.scala:393)
at 
kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5(DynamicBrokerConfig.scala:404)
at 
kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5$adapted(DynamicBrokerConfig.scala:402)
at 
kafka.utils.Implicits$MapExtensionMe

[jira] [Commented] (KAFKA-14079) Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"

2022-07-26 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571601#comment-17571601
 ] 

Randall Hauch commented on KAFKA-14079:
---

Merged to the `3.3` branch with permission from the 3.3 release manager.

> Source task will not commit offsets and develops memory leak if 
> "error.tolerance" is set to "all"
> -
>
> Key: KAFKA-14079
> URL: https://issues.apache.org/jira/browse/KAFKA-14079
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Christopher L. Shannon
>Assignee: Christopher L. Shannon
>Priority: Critical
> Fix For: 3.3.0, 3.2.1, 3.4.0
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting 
> {{error.tolerance}} to {{all}}. When this is set to all, a null record 
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}}, and the first 
> time an error happens the code does not ack the failed record and just skips 
> it, so its offsets are never committed and it is never removed from 
> SubmittedRecords before commitRecord() is called. 
> This leads to a bug where future offsets are no longer committed, and also to 
> a memory leak: the algorithm that removes acked records to commit offsets 
> [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
>  at the head of the Deque where the records are tracked, and if it sees an 
> unacked record it processes no further removals. As a result, all new records 
> that go through the task keep being added, never have their offsets 
> committed, and are never removed from tracking, until an OOM error occurs.
> The fix is to ack the failed records so their offsets can be committed and 
> they can be removed from tracking. This is fine to do, as the records are 
> intended to be skipped and not reprocessed. Metrics also need to be updated.
>  
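
A rough sketch of the described fix (names and call shapes assumed, not the 
actual Connect patch):

{code}
// In the producer callback, when error.tolerance=all swallows the exception,
// ack the submitted record anyway so the offset-commit watermark in
// SubmittedRecords can advance past it:
if (sendException != null && retryWithToleranceOperator.withinToleranceLimits()) {
    submittedRecord.ack();                 // unblocks removals at the Deque head
    commitTaskRecord(sourceRecord, null);  // null metadata marks the skip
}
{code}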





[jira] [Updated] (KAFKA-14079) Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"

2022-07-26 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-14079:
--
Fix Version/s: 3.4.0

> Source task will not commit offsets and develops memory leak if 
> "error.tolerance" is set to "all"
> -
>
> Key: KAFKA-14079
> URL: https://issues.apache.org/jira/browse/KAFKA-14079
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Christopher L. Shannon
>Assignee: Christopher L. Shannon
>Priority: Critical
> Fix For: 3.3.0, 3.2.1, 3.4.0
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting 
> {{error.tolerance}} to {{all}}. When this is set to all, a null record 
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}}, and the first 
> time an error happens the code does not ack the failed record and just skips 
> it, so its offsets are never committed and it is never removed from 
> SubmittedRecords before commitRecord() is called. 
> This leads to a bug where future offsets are no longer committed, and also to 
> a memory leak: the algorithm that removes acked records to commit offsets 
> [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
>  at the head of the Deque where the records are tracked, and if it sees an 
> unacked record it processes no further removals. As a result, all new records 
> that go through the task keep being added, never have their offsets 
> committed, and are never removed from tracking, until an OOM error occurs.
> The fix is to ack the failed records so their offsets can be committed and 
> they can be removed from tracking. This is fine to do, as the records are 
> intended to be skipped and not reprocessed. Metrics also need to be updated.
>  





[GitHub] [kafka] cmccabe commented on pull request #12403: KAFKA-13166 Fix missing ControllerApis error handling

2022-07-26 Thread GitBox


cmccabe commented on PR #12403:
URL: https://github.com/apache/kafka/pull/12403#issuecomment-1195865697

   I looked into this more, and I think this approach could be a good start. 
But let's keep an eye out for whether we can simplify this in the future. LGTM





[GitHub] [kafka] cmccabe opened a new pull request, #12443: MINOR: Convert some junit tests to kraft

2022-07-26 Thread GitBox


cmccabe opened a new pull request, #12443:
URL: https://github.com/apache/kafka/pull/12443

   Convert ProducerCompressionTest, MirrorMakerIntegrationTest, and 
EdgeCaseRequestTest to KRaft.
   
   Make it explicit that ServerShutdownTest#testControllerShutdownDuringSend is 
ZK-only.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930278502


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##
@@ -91,7 +92,7 @@ public boolean isActive() {
  * @throws StreamsException fatal error, should close the thread
  */
 @Override
-public void initializeIfNeeded() {
+public boolean initializeIfNeeded() {

Review Comment:
   I incorporated that by adding a `pendingTasksToRestore` set into `Tasks`. 
Please lmk wdyt.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930248847


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -544,7 +548,7 @@ public void updateInputPartitions(final Set<TopicPartition> topicPartitions, fin
 }
 
 @Override
-public void closeCleanAndRecycleState() {
+public void recycleAndConvert() {

Review Comment:
   Ack.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930246317


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -377,7 +390,17 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 throw first.getValue();
 }
 
-tasks.createTasks(activeTasksToCreate, standbyTasksToCreate);
+createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+}
+
+private void createNewTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                            final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+    final Collection<Task> newActiveTasks = activeTasksToCreate.isEmpty() ?
+        Collections.emptySet() : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate);
+    final Collection<Task> newStandbyTask = standbyTasksToCreate.isEmpty() ?
+        Collections.emptySet() : standbyTaskCreator.createTasks(standbyTasksToCreate);

Review Comment:
   Yeah, you got me :) This is mainly unit-test laziness: in many cases we 
would not need to create active/standby tasks, but would still need to mock a 
no-op function.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930245276


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##
@@ -77,6 +93,22 @@ public void registerTaskError(final Task task, final 
Throwable t, final long now
 }
 }
 
+Collection<Task> successfullyProcessed() {
+return successfullyProcessed;
+}
+
+void addToSuccessfullyProcessed(final Task task) {
+successfullyProcessed.add(task);
+}
+
+void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {
+successfullyProcessed.remove(task);
+}
+
+void clearSuccessfullyProcessed() {
+successfullyProcessed.clear();
+}
+

Review Comment:
   I tried to do that, but then I would need to move 
`topologyNameToErrorMetadata` into the TaskExecutor as well, which would make 
this metadata class a bit less useful. So I ended up keeping them in the 
metadata class, leaving the `Executor` with pure processing logic and no 
bookkeeping.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930242497


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##
@@ -77,6 +93,22 @@ public void registerTaskError(final Task task, final 
Throwable t, final long now
 }
 }
 
+Collection<Task> successfullyProcessed() {
+return successfullyProcessed;
+}
+
+void addToSuccessfullyProcessed(final Task task) {
+successfullyProcessed.add(task);
+}
+
+void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {
+successfullyProcessed.remove(task);
+}
+
+void clearSuccessfullyProcessed() {
+successfullyProcessed.clear();
+}
+

Review Comment:
   Sure thing, I can do that.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12442: KAFKA-10199: Bookkeep tasks during assignment for use with state updater

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12442:
URL: https://github.com/apache/kafka/pull/12442#discussion_r930193079


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -396,11 +460,8 @@ private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final Map<Task, Set<TopicPartition>> tasksToRecycle,
   final Set<Task> tasksToCloseClean,
-  final Set<Task> tasksToCloseDirty,

Review Comment:
   Nice catch. Thanks for the cleanup!



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -64,6 +64,11 @@ class Tasks {
 private final Map<TaskId, Set<TopicPartition>> pendingActiveTasks = new HashMap<>();
 private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasks = new HashMap<>();
 
+private final Set<Task> pendingActiveTasksToRecycle = new HashSet<>();

Review Comment:
   Maybe we can then rename the ones in 64/65 above as `pending..TasksToCreate` 
then?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -380,6 +346,104 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 tasks.createTasks(activeTasksToCreate, standbyTasksToCreate);
 }
 
+private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                              final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                              final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                              final Set<Task> tasksToCloseClean) {
+    for (final Task task : tasks.allTasks()) {
+        final TaskId taskId = task.id();
+        if (activeTasksToCreate.containsKey(taskId)) {
+            if (task.isActive()) {
+                final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
+                if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
+                    task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+                }
+                task.resume();
+            } else {
+                tasksToRecycle.put(task, activeTasksToCreate.get(taskId));
+            }
+            activeTasksToCreate.remove(taskId);
+        } else if (standbyTasksToCreate.containsKey(taskId)) {
+            if (!task.isActive()) {
+                final Set<TopicPartition> topicPartitions = standbyTasksToCreate.get(taskId);
+                task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+                task.resume();
+            } else {
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
+            }
+            standbyTasksToCreate.remove(taskId);
+        } else {
+            tasksToCloseClean.add(task);
+        }
+    }
+}
+
+private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                  final Set<Task> tasksToCloseClean) {
+    for (final Task task : tasks.allTasks()) {
+        final TaskId taskId = task.id();
+        if (activeTasksToCreate.containsKey(taskId)) {
+            if (task.isActive()) {
+                final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
+                if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
+                    task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+                }
+                task.resume();
+            } else {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
+            activeTasksToCreate.remove(taskId);
+        } else if (standbyTasksToCreate.containsKey(taskId)) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            } else {
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
+            }
+            standbyTasksToCreate.remove(taskId);
+        } else {
+            tasksToCloseClean.add(task);
+        }
+    }
+}
+
+private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                           final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                           final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                           final Set<Task> tasksToCloseClean) {
+    classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    for (final Task task : stateUpdater.getTasks()) {
+        final TaskId taskId = task.id();
+        if (activeTas

[GitHub] [kafka] mumrah commented on pull request #12403: KAFKA-13166 Fix missing ControllerApis error handling

2022-07-26 Thread GitBox


mumrah commented on PR #12403:
URL: https://github.com/apache/kafka/pull/12403#issuecomment-1195746529

   Hey @cmccabe, thanks for the review! I agree it's a bit weird to return the 
completed future here from the authorizer calls. I tried a few different 
approaches, and having all the handlers return the same type seemed to be a 
simple way to fix the error handling. The main issue I'm solving here is adding 
the final `whenComplete` to handle errors in completion handlers (which used to 
be `whenComplete` calls, but are now `handle` calls). 
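   
   Roughly this shape, as a generic sketch (names invented, not the PR's code):
   
   ```java
   CompletableFuture<Void> future = handleRequest(request)
       // handle maps both success and failure into a response...
       .handle((response, t) -> t == null ? response : errorResponseFor(t))
       .thenAccept(response -> sendResponse(request, response))
       // ...and the final whenComplete catches anything thrown inside the
       // completion stages themselves, which previously escaped silently.
       .whenComplete((ignored, t) -> {
           if (t != null) {
               logAndCloseConnection(request, t);
           }
       });
   ```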
   





[jira] [Updated] (KAFKA-14039) Fix KRaft AlterConfigPolicy usage

2022-07-26 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14039:
-
Fix Version/s: 3.3.0
   3.4.0

> Fix KRaft AlterConfigPolicy usage
> -
>
> Key: KAFKA-14039
> URL: https://issues.apache.org/jira/browse/KAFKA-14039
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.3.0, 3.4.0
>
>
> In ConfigurationControlManager, we are currently passing all the 
> configuration values known to the controller down into the AlterConfigPolicy. 
> This does not match the behavior in ZK mode where we only pass configs which 
> were included in the alter configs request.
> This can lead to unexpected behavior differences in custom AlterConfigPolicy 
> implementations.
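
For reference, a sketch of the ZK-matching behavior using the public policy 
API (the config map shown is illustrative):

{code}
// The AlterConfigPolicy should see only the keys from the request,
// not the full effective config known to the controller:
Map<String, Object> requestedOnly = Collections.singletonMap("min.insync.replicas", "2");
policy.validate(new AlterConfigPolicy.RequestMetadata(
    new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"),
    requestedOnly));
{code}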





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930178451


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -480,6 +503,21 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
 }
 }
 
+private void convertActiveToStandby(final StreamTask activeTask,
+                                    final Set<TopicPartition> partitions) {
+    activeTask.recycleAndConvert();

Review Comment:
   For standby tasks this is good; for active tasks my intention is to call 
`closeAndRemoveTaskProducerIfNeeded` right after the task is already in the 
`closed` state, before the new standby task is created.
   
   Thinking about it a bit more, maybe I can defer that until after the new 
standby task is created. That may feel a bit awkward, but it is still 
practically correct.






[GitHub] [kafka] C0urante commented on pull request #12295: KAFKA-13586: Prevent exception thrown during connector update from crashing distributed herder

2022-07-26 Thread GitBox


C0urante commented on PR #12295:
URL: https://github.com/apache/kafka/pull/12295#issuecomment-1195702185

   There's https://github.com/apache/kafka/pull/11792, but it's gone a bit 
stale (last activity was March 24th) and is marked as a draft. I've pinged the 
author to check on the status.





[GitHub] [kafka] C0urante commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest

2022-07-26 Thread GitBox


C0urante commented on PR #11792:
URL: https://github.com/apache/kafka/pull/11792#issuecomment-1195701534

   @jeff303 do you still plan to work on this? Happy to provide a round of 
review for what's here already if you'd like some feedback.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930175351


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -493,7 +531,9 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
 final List<Task> activeTasks = new LinkedList<>();
 for (final Task task : tasks.allTasks()) {
 try {
-task.initializeIfNeeded();
+if (task.initializeIfNeeded() && stateUpdater != null) {
+stateUpdater.add(task);
+}

Review Comment:
   The motivation is that, if we add the newly created tasks to the state 
updater immediately, they will fail, since the state updater expects their 
states to be `restoring / running`, i.e. states they reach only after they 
have been initialized.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930170868


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##
@@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final TaskId taskId,
 );
 }
 
+/*
+ * TODO: we pass in the new input partitions to validate that they still match;
+ *   in the future, when we have a fixed partitions -> tasks mapping,
+ *   we should always reuse the input partitions and hence need no validation
+ */
 StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
                                        final Set<TopicPartition> inputPartitions,
                                        final Consumer<byte[], byte[]> consumer) {
+    if (!inputPartitions.equals(standbyTask.inputPartitions)) {
+        log.warn("Detected unmatched input partitions for task {} when recycling it from standby to active", standbyTask.id);
+    }
+
     final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology);
-    final StreamTask task = standbyTask.recycle(time, cache, recordCollector, inputPartitions, consumer);
+    final StreamTask task = new StreamTask(
+        standbyTask.id,
+        inputPartitions,
+        standbyTask.topology,
+        consumer,
+        standbyTask.config,
+        streamsMetrics,
+        stateDirectory,
+        cache,
+        time,
+        standbyTask.stateMgr,
+        recordCollector,
+        standbyTask.processorContext,
+        standbyTask.logContext
+    );
 
-    log.trace("Created active task {} with assigned partitions {}", task.id, inputPartitions);
+    log.trace("Recycled active task {} from recycled standby with assigned partitions {}", task.id, inputPartitions);

Review Comment:
   ack.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r929297148


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1213,7 +1270,6 @@ void maybeCloseTasksFromRemovedTopologies(final Set<String> currentNamedTopologi
 
 final Set<Task> allTasksToRemove = union(HashSet::new, activeTasksToRemove, standbyTasksToRemove);
 closeAndCleanUpTasks(activeTasksToRemove, standbyTasksToRemove, true);
-allTasksToRemove.forEach(tasks::removeTask);

Review Comment:
   I found this is redundant code, since in the above `closeAndCleanUpTasks` 
we've already triggered `tasks#removeTask`.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##
@@ -77,6 +93,22 @@ public void registerTaskError(final Task task, final 
Throwable t, final long now
 }
 }
 
+Collection<Task> successfullyProcessed() {

Review Comment:
   This bookkeeping logic is moved from `Tasks`, as part of 2).



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -544,7 +544,7 @@ public void updateInputPartitions(final Set<TopicPartition> topicPartitions, fin
 }
 
 @Override
-public void closeCleanAndRecycleState() {
+public void recycleAndConvert() {

Review Comment:
   We consolidate the `closeCleanAndRecycleState` and the `recycle` below into 
this single function now, and extract the creation of the newly recycled task 
into the creators, as part of 1). Ditto in StandbyTask.






[GitHub] [kafka] ijuma commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-26 Thread GitBox


ijuma commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1195681900

   Option 3 says not to continue the JUnit 5 migrations, though - it says to do 
the Mockito migration first.





[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-26 Thread GitBox


clolov commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1195674893

   Divij and I also prefer option 3, for the same reasons posted by Bruno. So 
the next steps on our side are:
   1) Address the comments on https://github.com/apache/kafka/pull/12441, which 
should bring us back to a state where tests are run on every pull request
   2) Continue with the smaller pull requests migrating unit tests from JUnit 4 
to JUnit 5 where they do not depend on PowerMock/EasyMock
   3) Raise (multiple) pull request(s) to move the PowerMock/EasyMock tests in 
Streams to Mockito
   4) Once 3 is done, prepare pull request(s) to upgrade those tests to JUnit 5
   5) Clean up the setup that allows JUnit 4 tests to run in Streams, plus any 
new tests that might have appeared in the interim
   
   I will be away until the 2nd of August, so Divij will provide updates on 
https://github.com/apache/kafka/pull/12441. Once I am back I will continue with 
the plan as detailed above. Of course, if there are any suggestions to do it 
otherwise, I am more than happy to discuss them.





[GitHub] [kafka] cadonna commented on a diff in pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-26 Thread GitBox


cadonna commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930069951


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##
@@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final TaskId taskId,
 );
 }
 
+/*
+ * TODO: we pass in the new input partitions to validate that they still match;
+ *   in the future, when we have a fixed partitions -> tasks mapping,
+ *   we should always reuse the input partitions and hence need no validation
+ */
 StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
                                        final Set<TopicPartition> inputPartitions,
                                        final Consumer<byte[], byte[]> consumer) {
+    if (!inputPartitions.equals(standbyTask.inputPartitions)) {
+        log.warn("Detected unmatched input partitions for task {} when recycling it from standby to active", standbyTask.id);
+    }
+
     final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology);
-    final StreamTask task = standbyTask.recycle(time, cache, recordCollector, inputPartitions, consumer);
+    final StreamTask task = new StreamTask(
+        standbyTask.id,
+        inputPartitions,
+        standbyTask.topology,
+        consumer,
+        standbyTask.config,
+        streamsMetrics,
+        stateDirectory,
+        cache,
+        time,
+        standbyTask.stateMgr,
+        recordCollector,
+        standbyTask.processorContext,
+        standbyTask.logContext
+    );
 
-    log.trace("Created active task {} with assigned partitions {}", task.id, inputPartitions);
+    log.trace("Recycled active task {} from recycled standby with assigned partitions {}", task.id, inputPartitions);

Review Comment:
   ```suggestion
   log.trace("Create active task {} from recycled standby task with 
assigned partitions {}", task.id, inputPartitions);
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -493,7 +531,9 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
 final List<Task> activeTasks = new LinkedList<>();
 for (final Task task : tasks.allTasks()) {
 try {
-task.initializeIfNeeded();
+if (task.initializeIfNeeded() && stateUpdater != null) {
+stateUpdater.add(task);
+}

Review Comment:
   I am not sure this is the right place to add the task to the state updater. 
I would rather add brand new tasks in `createNewTasks()`. We can add recycled 
tasks once we read the tasks to recycle from the removed tasks queue of the 
state updater and we recycled them. 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -219,7 +219,7 @@ public boolean isActive() {
  * @throws StreamsException fatal error, should close the thread
  */
 @Override
-public void initializeIfNeeded() {
+public boolean initializeIfNeeded() {

Review Comment:
   See my comment in `StandbyTask` regarding this return value.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java:
##
@@ -109,11 +107,30 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
 return createdTasks;
 }
 
+/*
+ * TODO: we pass in the new input partitions to validate that they still match;
+ *   in the future, when we have a fixed partitions -> tasks mapping,
+ *   we should always reuse the input partitions and hence need no validation
+ */
 StandbyTask createStandbyTaskFromActive(final StreamTask streamTask,

Review Comment:
   Could you please add a unit test for this method?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##
@@ -91,7 +92,7 @@ public boolean isActive() {
  * @throws StreamsException fatal error, should close the thread
  */
 @Override
-public void initializeIfNeeded() {
+public boolean initializeIfNeeded() {

Review Comment:
   I think you do not need to add the boolean return value here. See my other 
comment in `tryToCompleteRestoration()`. I would rather extract the if-branch 
of `if (state() == State.CREATED)` into a separate method and call that method 
in `TaskManager#createNewTasks()`. WDYT?
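   
   i.e., something like this sketch (extracted method name invented):
   
   ```java
   @Override
   public void initializeIfNeeded() {
       if (state() == State.CREATED) {
           initializeCreatedTask();
       }
   }
   
   // Body of the former if-branch; callable directly by the task manager
   // for brand-new tasks, keeping initializeIfNeeded() void.
   void initializeCreatedTask() {
       // register state stores, transition out of CREATED, etc.
   }
   ```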



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##
@@ -77,6 +93,22 @@ public void registerTaskError(final Task task, final 
Throwable t, final long now
 }
 }
 
+Collection<Task> successfullyProcessed() {
+return successfullyProcessed;
+}
+
+void addToSuccessfullyProcessed(

[GitHub] [kafka] ijuma commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-26 Thread GitBox


ijuma commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1195649633

   I am fine with 3 as long as we don't make changes that make it worse for 
modules that have already converted to JUnit 5. 





[jira] [Updated] (KAFKA-14110) Streaming recursion in Kafka Streams

2022-07-26 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-14110:
-
Description: 
[KIP-857 Streaming recursion in Kafka 
Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams]

Some use-cases require {_}streaming recursion{_}, which involves piping the 
output of a pipeline back to the input of the same pipeline. An example of this 
is graph/tree-traversal, where it can be useful to recursively traverse up a 
tree as new leaf nodes arrive.

See KIP for more details.

  was:Some use-cases require {_}streaming recursion{_}, which involves piping 
the output of a pipeline back to the input of the same pipeline. An example of 
this is graph/tree-traversal, where it can be useful to recursively traverse up 
a tree as new leaf nodes arrive.
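
Until such support exists, the pattern can be approximated by writing a pipeline's output back to its own input topic. A minimal Kafka Streams sketch of that feedback-topic workaround (the topic name and traversal step are illustrative, not taken from the KIP):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class RecursionSketch {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // The "nodes" topic both feeds the pipeline and receives its output.
        final KStream<String, String> nodes = builder.stream("nodes");

        nodes
            // Hypothetical traversal step: map each node to its parent.
            .mapValues(RecursionSketch::lookUpParent)
            // Stop recursing once the root (no parent) is reached.
            .filter((key, parent) -> parent != null)
            // Writing back to the input topic closes the recursive loop.
            .to("nodes");

        // builder.build() would then be passed to new KafkaStreams(...).
    }

    private static String lookUpParent(final String node) {
        return null; // placeholder for a real parent lookup
    }
}
```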


> Streaming recursion in Kafka Streams
> 
>
> Key: KAFKA-14110
> URL: https://issues.apache.org/jira/browse/KAFKA-14110
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Priority: Major
>
> [KIP-857 Streaming recursion in Kafka 
> Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams]
> Some use-cases require {_}streaming recursion{_}, which involves piping the 
> output of a pipeline back to the input of the same pipeline. An example of 
> this is graph/tree-traversal, where it can be useful to recursively traverse 
> up a tree as new leaf nodes arrive.
> See KIP for more details.





[jira] [Updated] (KAFKA-14110) Streaming recursion in Kafka Streams

2022-07-26 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-14110:
-
Description: Some use-cases require {_}streaming recursion{_}, which 
involves piping the output of a pipeline back to the input of the same 
pipeline. An example of this is graph/tree-traversal, where it can be useful to 
recursively traverse up a tree as new leaf nodes arrive.

> Streaming recursion in Kafka Streams
> 
>
> Key: KAFKA-14110
> URL: https://issues.apache.org/jira/browse/KAFKA-14110
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Priority: Major
>
> Some use-cases require {_}streaming recursion{_}, which involves piping the 
> output of a pipeline back to the input of the same pipeline. An example of 
> this is graph/tree-traversal, where it can be useful to recursively traverse 
> up a tree as new leaf nodes arrive.





[jira] [Created] (KAFKA-14110) Streaming recursion in Kafka Streams

2022-07-26 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-14110:


 Summary: Streaming recursion in Kafka Streams
 Key: KAFKA-14110
 URL: https://issues.apache.org/jira/browse/KAFKA-14110
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford








[GitHub] [kafka] ijuma commented on pull request #12295: KAFKA-13586: Prevent exception thrown during connector update from crashing distributed herder

2022-07-26 Thread GitBox


ijuma commented on PR #12295:
URL: https://github.com/apache/kafka/pull/12295#issuecomment-1195629498

   Do we have a PR for converting this particular test to use mockito or not 
yet?





[GitHub] [kafka] mimaison commented on a diff in pull request #12438: KAFKA-13868: Replace YouTube embedded video with links on streams page

2022-07-26 Thread GitBox


mimaison commented on code in PR #12438:
URL: https://github.com/apache/kafka/pull/12438#discussion_r930099870


##
docs/streams/index.html:
##
@@ -35,32 +35,25 @@ Kafka Streams
 
 The easiest way to write mission-critical 
real-time applications and microservices
Kafka Streams is a client library for 
building applications and microservices, where the input and output data are 
stored in Kafka clusters. It combines the simplicity of writing and deploying 
standard Java and Scala applications on the client side with the benefits of 
Kafka's server-side cluster technology.
+   
+   VIDEO TOUR OF THE STREAMS API

-  
-
-https://www.youtube.com/embed/Z3JKCLG3VP4?rel=0&showinfo=0&end=602"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/LxxeXI1mPKo?rel=0&showinfo=0&end=622"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/7JYEEx7SBuE?rel=0&showinfo=0end=557"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/3kJgYIkAeHs?rel=0&showinfo=0&end=564"; 
frameborder="0" allowfullscreen>
- 
-
-
-TOUR OF THE STREAMS API
-
-   
-  1Intro to Streams
-   
-   
-  2Creating a Streams Application
-   
-   
-  3Transforming Data Pt. 1
-   
-   
-  4Transforming Data Pt. 2
-   
-
-
+ 
+   
+   
+   1https://www.youtube.com/embed/Z3JKCLG3VP4";>Intro to Streams
+   
+   
+   2https://www.youtube.com/embed/LxxeXI1mPKo";>Creating a Streams Application
+   
+   
+   3https://www.youtube.com/embed/7JYEEx7SBuE";>Transforming Data Pt. 1
+   
+   
+   4https://www.youtube.com/embed/3kJgYIkAeHs";>Transforming Data Pt. 11

Review Comment:
   As we want the website fixed asap, I think the best option is to open a 
similar PR against kafka-site. 






[GitHub] [kafka] mimaison merged pull request #12438: KAFKA-13868: Replace YouTube embedded video with links on streams page

2022-07-26 Thread GitBox


mimaison merged PR #12438:
URL: https://github.com/apache/kafka/pull/12438





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12438: KAFKA-13868: Replace YouTube embedded video with links on streams page

2022-07-26 Thread GitBox


divijvaidya commented on code in PR #12438:
URL: https://github.com/apache/kafka/pull/12438#discussion_r930066946


##
docs/streams/index.html:
##
@@ -35,32 +35,25 @@ Kafka Streams
 
 The easiest way to write mission-critical 
real-time applications and microservices
Kafka Streams is a client library for 
building applications and microservices, where the input and output data are 
stored in Kafka clusters. It combines the simplicity of writing and deploying 
standard Java and Scala applications on the client side with the benefits of 
Kafka's server-side cluster technology.
+   
+   VIDEO TOUR OF THE STREAMS API

-  
-
-https://www.youtube.com/embed/Z3JKCLG3VP4?rel=0&showinfo=0&end=602"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/LxxeXI1mPKo?rel=0&showinfo=0&end=622"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/7JYEEx7SBuE?rel=0&showinfo=0end=557"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/3kJgYIkAeHs?rel=0&showinfo=0&end=564"; 
frameborder="0" allowfullscreen>
- 
-
-
-TOUR OF THE STREAMS API
-
-   
-  1Intro to Streams
-   
-   
-  2Creating a Streams Application
-   
-   
-  3Transforming Data Pt. 1
-   
-   
-  4Transforming Data Pt. 2
-   
-
-
+ 
+   
+   
+   1https://www.youtube.com/embed/Z3JKCLG3VP4";>Intro to Streams
+   
+   
+   2https://www.youtube.com/embed/LxxeXI1mPKo";>Creating a Streams Application
+   
+   
+   3https://www.youtube.com/embed/7JYEEx7SBuE";>Transforming Data Pt. 1
+   
+   
+   4https://www.youtube.com/embed/3kJgYIkAeHs";>Transforming Data Pt. 11

Review Comment:
   Of course. The current website [still mentions "Pt. 
11"](https://kafka.apache.org/documentation/streams/), but I guess that's 
because we haven't deployed the latest code for this page over there.
   
   I have changed it to 2.
   
   Also, please let me know what the next step is to "deploy" the change into 
kafka-site.






[GitHub] [kafka] cadonna opened a new pull request, #12442: KAFKA-10199: Bookkeep tasks during assignment for use with state updater

2022-07-26 Thread GitBox


cadonna opened a new pull request, #12442:
URL: https://github.com/apache/kafka/pull/12442

   Bookkeeps tasks to be recycled, closed, and updated during handling of
   the assignment. The bookkeeping is needed for integrating the
   state updater.
   
   This change is hidden behind the internal config STATE_UPDATER_ENABLED.
   If the config is false, Streams does not use the state updater
   and behaves as usual.
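
   A rough sketch of that gating pattern (the config key string and the
   helper class below are assumptions for illustration, not the actual
   internals):

   ```java
   import java.util.Map;

   // Hypothetical helper showing how an internal boolean flag can gate the
   // new code path while defaulting to the existing behavior.
   final class StateUpdaterGate {
       static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";

       static boolean stateUpdaterEnabled(final Map<String, Object> configs) {
           return Boolean.parseBoolean(
               String.valueOf(configs.getOrDefault(STATE_UPDATER_ENABLED, false)));
       }
   }
   ```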
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-26 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1195567473

   I agree that the migration should be done quickly.
   As far as I understand, @clolov plans to do the whole migration quickly. Is 
this correct? If so, I do not see an issue with making the migration 
incremental for the sake of smaller PRs that are easier to review.
   If you create one big PR for the migration you might run into the issue that 
during the review other PRs add tests in JUnit 4. You would need to rebase and 
extend the PR to get green builds.
   I agree that we should get the migration from EasyMock/PowerMock to Mockito 
done before we do the migration from JUnit 4 to JUnit 5 to make the reviews 
simpler.
   So our options now are:
   1. Revert this PR, do the migration to Mockito incrementally, and do the 
migration to JUnit 5 in one shot.
   2. Do the change in the builds to allow JUnit 5 along with JUnit 4 and do 
both migrations incrementally.
   3. Do the change in the builds to allow JUnit 5 along with JUnit 4, migrate 
to Mockito, and then migrate all tests to JUnit 5.
   
   For options 2 and 3, committers should be made aware to only accept JUnit 5 
tests in new PRs. That should be easy for Streams.
   
   I am in favor of 3 because it has happened to me that I added some tests in 
JUnit 5 in some of my PRs because I was not aware that JUnit 4 and JUnit 5 do 
not run alongside each other in our builds. Migrating the JUnit 5 tests to 
JUnit 4 in my PRs does not seem to be a big deal, though.
   
   I am also fine with option 1. Option 2 seems the oddest to me.
   
   I would like to know the preferences of @clolov and @divijvaidya, since they 
are driving this migration.





[jira] [Commented] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-26 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571464#comment-17571464
 ] 

Tom Bentley commented on KAFKA-14101:
-

Both those seem reasonable to me [~ChrisEgerton]. Thanks.

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)





[jira] [Commented] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-26 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571454#comment-17571454
 ] 

Chris Egerton commented on KAFKA-14101:
---

[~tombentley] Yeah, a renaming would help here. Given that the contract of the 
method is that it consumes from the topic until it has read at least the given 
number of records in the given duration, and throws an exception if it's unable 
to, what do you think about {{{}consumeAtLeast{}}}?

A note on the Javadocs about out-of-order consumption of records probably 
wouldn't hurt either, right?
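
   For illustration, a sketch of such a contract against the plain consumer
   API (only the name `consumeAtLeast` comes from this discussion; the rest is
   an assumption):

   ```java
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecord;

   final class ConsumeUtil {
       // Polls until at least minRecords have been read or the timeout
       // elapses; throws if the minimum was not reached. More than minRecords
       // may be returned, and cross-partition ordering is not guaranteed --
       // hence the suggested Javadoc note.
       static <K, V> List<ConsumerRecord<K, V>> consumeAtLeast(
               final Consumer<K, V> consumer,
               final int minRecords,
               final Duration timeout) {
           final List<ConsumerRecord<K, V>> records = new ArrayList<>();
           final long deadline = System.currentTimeMillis() + timeout.toMillis();
           while (records.size() < minRecords && System.currentTimeMillis() < deadline) {
               consumer.poll(Duration.ofMillis(100)).forEach(records::add);
           }
           if (records.size() < minRecords) {
               throw new AssertionError("Consumed only " + records.size()
                   + " of at least " + minRecords + " expected records");
           }
           return records;
       }
   }
   ```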

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)





[GitHub] [kafka] mimaison commented on a diff in pull request #12438: KAFKA-13868: Replace YouTube embedded video with links on streams page

2022-07-26 Thread GitBox


mimaison commented on code in PR #12438:
URL: https://github.com/apache/kafka/pull/12438#discussion_r930015414


##
docs/streams/index.html:
##
@@ -35,32 +35,25 @@ Kafka Streams
 
 The easiest way to write mission-critical 
real-time applications and microservices
Kafka Streams is a client library for 
building applications and microservices, where the input and output data are 
stored in Kafka clusters. It combines the simplicity of writing and deploying 
standard Java and Scala applications on the client side with the benefits of 
Kafka's server-side cluster technology.
+   
+   VIDEO TOUR OF THE STREAMS API

-  
-
-https://www.youtube.com/embed/Z3JKCLG3VP4?rel=0&showinfo=0&end=602"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/LxxeXI1mPKo?rel=0&showinfo=0&end=622"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/7JYEEx7SBuE?rel=0&showinfo=0end=557"; 
frameborder="0" allowfullscreen>
-https://www.youtube.com/embed/3kJgYIkAeHs?rel=0&showinfo=0&end=564"; 
frameborder="0" allowfullscreen>
- 
-
-
-TOUR OF THE STREAMS API
-
-   
-  1Intro to Streams
-   
-   
-  2Creating a Streams Application
-   
-   
-  3Transforming Data Pt. 1
-   
-   
-  4Transforming Data Pt. 2
-   
-
-
+ 
+   
+   
+   1https://www.youtube.com/embed/Z3JKCLG3VP4";>Intro to Streams
+   
+   
+   2https://www.youtube.com/embed/LxxeXI1mPKo";>Creating a Streams Application
+   
+   
+   3https://www.youtube.com/embed/7JYEEx7SBuE";>Transforming Data Pt. 1
+   
+   
+   4https://www.youtube.com/embed/3kJgYIkAeHs";>Transforming Data Pt. 11

Review Comment:
   Can we keep `Transforming Data Pt. 2` instead of `Transforming Data Pt. 11`?






[jira] [Updated] (KAFKA-13982) Replace EasyMock and PowerMock with Mockito for WorkerConfigTransformerTest

2022-07-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13982:
--
Fix Version/s: 3.4.0

> Replace EasyMock and PowerMock with Mockito for WorkerConfigTransformerTest
> ---
>
> Key: KAFKA-13982
> URL: https://issues.apache.org/jira/browse/KAFKA-13982
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: connect, runtime
> Fix For: 3.4.0
>
>
> I believe the file 
> connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
>  still needs to be migrated from using PowerMock to Mockito.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


divijvaidya commented on code in PR #12441:
URL: https://github.com/apache/kafka/pull/12441#discussion_r930013162


##
build.gradle:
##
@@ -466,6 +466,10 @@ subprojects {
 if (shouldUseJUnit5) {
   useJUnitPlatform {
 includeTags "integration"
+// KAFKA-14109
+// Both engines are needed to run JUnit 4 tests alongside JUnit 5 
tests.
+// junit-vintage (JUnit 4) can be removed once the JUnit 4 migration 
is complete.
+includeEngines "junit-vintage", "junit-jupiter"

Review Comment:
   Is there a disadvantage/side-effect if we do it for all the modules? 
   
   From what I understand (please correct me if I am wrong), loading the 
vintage engine for JUnit5 tests won't have any side effects since they would be 
run with jupiter engine anyways. Vintage engine only runs Junit4 tests in a 
Junit5 platform. It does not impact already converted Junit5 tests running on 
Junit5 platform.
   
   I am advocating for this because it keeps the changes minimal and simplified 
here. Given that it is a temporary transient stage (we already have PRs out for 
most of the test conversion to JUnit5), I would preferr minimal changes so that 
reverting them is easier.
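
   For context, a minimal pair of tests illustrating the engine split (the
   class names are hypothetical): with both engines on the platform the two
   tests below run; with only `junit-jupiter`, the JUnit 4 test is silently
   skipped.

   ```java
   // Discovered and run by the vintage engine (JUnit 4 annotation).
   public class VintageStyleTest {
       @org.junit.Test
       public void runsOnVintage() {
           org.junit.Assert.assertEquals(4, 2 + 2);
       }
   }

   // Discovered and run by the jupiter engine (JUnit 5 annotation).
   class JupiterStyleTest {
       @org.junit.jupiter.api.Test
       void runsOnJupiter() {
           org.junit.jupiter.api.Assertions.assertEquals(4, 2 + 2);
       }
   }
   ```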






[jira] [Assigned] (KAFKA-13586) ConfigExceptions thrown by FileConfigProvider during connector/task startup crash worker

2022-07-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-13586:
-

Assignee: (was: Greg Harris)

> ConfigExceptions thrown by FileConfigProvider during connector/task startup 
> crash worker
> 
>
> Key: KAFKA-13586
> URL: https://issues.apache.org/jira/browse/KAFKA-13586
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Greg Harris
>Priority: Minor
>
> If the filesystems of a multi-worker connect cluster are inconsistent, the 
> FileConfigProvider may be able to find a configuration on worker A but not 
> worker B.
> This may lead to worker B experiencing a crash when given a connector/task 
> assignment that was previously validated by worker A.
> Steps to reproduce:
> 1. Configure a two-worker Connect cluster to use the FileConfigProvider
> 2. Place a secret file on worker A (leader) but not worker B (member).
> 3. Create a connector via REST which references the secret file on-disk.
> 4. Observe that the connector creation succeeds
> 5. Wait for a rebalance which assigns either the connector or task to worker 
> B.
> Expected behavior:
> The connector/task is marked FAILED, and the exception is attributed to the 
> FileConfigProvider not able to find the file.
> Actual behavior:
> Worker B prints this log message and shuts down:
> {noformat}
> [Worker clientId=connect-1, groupId=my-connect-cluster] Uncaught exception in 
> herder work thread, exiting: 
> org.apache.kafka.common.config.ConfigException: Invalid value 
> java.nio.file.NoSuchFileException: /path/to/secrets/file.properties for 
> configuration Could not read properties from file 
> /path/to/secrets/file.properties
>   at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:92)
>   at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>   at 
> org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:135)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1464)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:638)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:457)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:326)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>   at java.base/java.lang.Thread.run(Thread.java:831){noformat}
> Having an inconsistent filesystem is not a recommended configuration, but it 
> is preferable in such situations to prevent such a connector configuration 
> error from crashing the worker irrecoverably.
>  





[GitHub] [kafka] C0urante commented on pull request #12295: KAFKA-13586: Prevent exception thrown during connector update from crashing distributed herder

2022-07-26 Thread GitBox


C0urante commented on PR #12295:
URL: https://github.com/apache/kafka/pull/12295#issuecomment-1195531314

   @dstelljes I just noticed that, with the changes in this PR, it's no longer 
possible to run individual test cases in this suite. For example, this command 
now fails:
   
   ```bash
   ./gradlew :connect:runtime:test --tests 
org.apache.kafka.connect.runtime.distributed.DistributedHerderTest.testConnectorConfigUpdate
   ```
   
   I'm also unable to run individual tests in my IDE (IntelliJ).
   
   This isn't necessarily a blocker but if we could fix it that'd be nice. 
Otherwise, since we're planning on migrating tests from EasyMock/PowerMock to 
Mockito, I can make a note on Jira to revisit this after we've done that for 
this test suite. cc @ijuma in case you have context here or would recommend a 
different approach.





[GitHub] [kafka] ijuma commented on a diff in pull request #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


ijuma commented on code in PR #12441:
URL: https://github.com/apache/kafka/pull/12441#discussion_r929998599


##
gradle/dependencies.gradle:
##
@@ -159,6 +159,10 @@ libs += [
   jmhGeneratorAnnProcess: 
"org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   jose4j: "org.bitbucket.b_c:jose4j:$versions.jose4j",
+  // KAFKA-14109
+  // The below dependency is needed for compiling JUnit 4 tests.
+  // It can be safely removed once all of streams has moved to JUnit 5.
+  junit4: "junit:junit:$versions.junit4",

Review Comment:
   This explanation is odd - compilation works currently and we don't have this 
dependency.






[GitHub] [kafka] ijuma commented on a diff in pull request #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


ijuma commented on code in PR #12441:
URL: https://github.com/apache/kafka/pull/12441#discussion_r929996624


##
build.gradle:
##
@@ -466,6 +466,10 @@ subprojects {
 if (shouldUseJUnit5) {
   useJUnitPlatform {
 includeTags "integration"
+// KAFKA-14109

Review Comment:
   We don't typically include jira references in the code like this (it 
pollutes the code and goes stale typically).






[GitHub] [kafka] cadonna commented on pull request #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


cadonna commented on PR #12441:
URL: https://github.com/apache/kafka/pull/12441#issuecomment-119551

   @clolov I also found a mistake in #12285:
   In `KTableSourceTopicRestartIntegrationTest` line 103 should be 
   ```
   sourceTopic = SOURCE_TOPIC + "-" + 
testInfo.getTestMethod().map(Method::getName).orElse("");
   ```
   instead of 
   ```
   sourceTopic = SOURCE_TOPIC + "-" + 
testInfo.getTestMethod().map(Method::getName);
   ```
   otherwise the test does not run since the topic name contains illegal 
characters.
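
   For anyone wondering why: without `orElse("")` the string concatenation
   calls `Optional.toString()`, which yields something like
   `Optional[methodName]`, and the brackets are not legal in a topic name.
   A small standalone demonstration:

   ```java
   import java.lang.reflect.Method;
   import java.util.Optional;

   public class TopicNameSketch {
       public static void main(String[] args) throws Exception {
           final Optional<Method> method = Optional.of(
               TopicNameSketch.class.getMethod("main", String[].class));

           // Buggy variant: prints "source-topic-Optional[main]" -- the
           // brackets make the topic name invalid.
           System.out.println("source-topic-" + method.map(Method::getName));

           // Fixed variant: prints "source-topic-main".
           System.out.println("source-topic-" + method.map(Method::getName).orElse(""));
       }
   }
   ```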





[GitHub] [kafka] ijuma commented on a diff in pull request #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


ijuma commented on code in PR #12441:
URL: https://github.com/apache/kafka/pull/12441#discussion_r929996127


##
build.gradle:
##
@@ -466,6 +466,10 @@ subprojects {
 if (shouldUseJUnit5) {
   useJUnitPlatform {
 includeTags "integration"
+// KAFKA-14109
+// Both engines are needed to run JUnit 4 tests alongside JUnit 5 
tests.
+// junit-vintage (JUnit 4) can be removed once the JUnit 4 migration 
is complete.
+includeEngines "junit-vintage", "junit-jupiter"

Review Comment:
   We don't want to do this for all modules since most of them have already 
been converted.






[GitHub] [kafka] ijuma commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-26 Thread GitBox


ijuma commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1195515462

   Doing things incrementally over a long period of time is a bit confusing 
since you cannot enforce that JUnit 4 or JUnit 5 is used at any point in time. 
My take is that it's simplest if the conversion is done quickly. However, it's 
best to convert the tests to Mockito first - that part is more complicated and 
can be done incrementally.





[GitHub] [kafka] C0urante merged pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-26 Thread GitBox


C0urante merged PR #12422:
URL: https://github.com/apache/kafka/pull/12422





[GitHub] [kafka] C0urante commented on pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-26 Thread GitBox


C0urante commented on PR #12422:
URL: https://github.com/apache/kafka/pull/12422#issuecomment-1195498271

   Thanks @ijuma. No test failures in Connect and all others appear unrelated; 
this looks good to go.
   
   Thanks for the PR @clolov!





[GitHub] [kafka] cadonna commented on pull request #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


cadonna commented on PR #12441:
URL: https://github.com/apache/kafka/pull/12441#issuecomment-1195488452

   Thanks for the PR, @clolov !
   
   The builds have errors:
   ```
   * What went wrong:
   [2022-07-26T12:41:05.041Z] A problem occurred evaluating root project 
'Kafka_kafka-pr_PR-12441'.
   [2022-07-26T12:41:05.041Z] > Cannot convert a null value to an object of 
type Dependency.
   [2022-07-26T12:41:05.041Z]   The following types/formats are supported:
   [2022-07-26T12:41:05.041Z] - Instances of Dependency.
   [2022-07-26T12:41:05.041Z] - String or CharSequence values, for example 
'org.gradle:gradle-core:1.0'.
   [2022-07-26T12:41:05.041Z] - Maps, for example [group: 'org.gradle', 
name: 'gradle-core', version: '1.0'].
   [2022-07-26T12:41:05.041Z] - FileCollections, for example 
files('some.jar', 'someOther.jar').
   [2022-07-26T12:41:05.041Z] - Projects, for example 
project(':some:project:path').
   [2022-07-26T12:41:05.041Z] - ClassPathNotation, for example gradleApi().
   
   ```





[GitHub] [kafka] soarez commented on pull request #12314: KAFKA-13903: Queue size metric in QuorumController

2022-07-26 Thread GitBox


soarez commented on PR #12314:
URL: https://github.com/apache/kafka/pull/12314#issuecomment-1195483345

   @mumrah could you take another look? Thanks





[GitHub] [kafka] showuon commented on pull request #12413: MINOR: Upgrade to Gradle 7.5

2022-07-26 Thread GitBox


showuon commented on PR #12413:
URL: https://github.com/apache/kafka/pull/12413#issuecomment-1195464946

   > I'll do some testing regarding that, but will merge this PR in the 
meantime.
   
   Sure. Thank you.





[GitHub] [kafka] ijuma merged pull request #12413: MINOR: Upgrade to Gradle 7.5

2022-07-26 Thread GitBox


ijuma merged PR #12413:
URL: https://github.com/apache/kafka/pull/12413





[GitHub] [kafka] ijuma commented on pull request #12413: MINOR: Upgrade to Gradle 7.5

2022-07-26 Thread GitBox


ijuma commented on PR #12413:
URL: https://github.com/apache/kafka/pull/12413#issuecomment-1195448786

   > Just curious, the latest zinc is 1.7.1 (ref: 
https://mvnrepository.com/artifact/org.scala-sbt/zinc), why did we only upgrade 
to 1.6.1? I'm fine to stick to 1.6.1 since it fixes the log4j2 security issue.
   
   Gradle 7.5 was tested with zinc 1.6.1 and I'm not sure if zinc 1.7.1 has 
incompatible changes with regards to Gradle. I'll do some testing regarding 
that, but will merge this PR in the meantime.





[GitHub] [kafka] mimaison commented on pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

2022-07-26 Thread GitBox


mimaison commented on PR #12429:
URL: https://github.com/apache/kafka/pull/12429#issuecomment-1195440012

   @tombentley Do you want to take another look?





[GitHub] [kafka] divijvaidya commented on pull request #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


divijvaidya commented on PR #12441:
URL: https://github.com/apache/kafka/pull/12441#issuecomment-1195438950

   @clolov 
   1. don't we need to remove "streams" from 
   ```
 def shouldUseJUnit5 = !(["runtime", "streams"].contains(it.project.name))
   ```
   as well?
   
   2. please add the test report from "before" the change and from "after" the 
change. This would help us validate that the number of tests run "after" this 
change is greater than or equal to the number run "before" the change.





[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-26 Thread GitBox


clolov commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1195428834

   Hello @cadonna! I spoke with Divij offline and we found a way to run both 
JUnit 4 and JUnit 5 tests while the migration is going on. The pull request 
which should enable this is https://github.com/apache/kafka/pull/12441.





[GitHub] [kafka] clolov opened a new pull request, #12441: KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run

2022-07-26 Thread GitBox


clolov opened a new pull request, #12441:
URL: https://github.com/apache/kafka/pull/12441

   This pull request addresses the problem reported in 
https://github.com/apache/kafka/pull/12285 and tracked in 
https://issues.apache.org/jira/browse/KAFKA-14108





[jira] [Commented] (KAFKA-14107) Upgrade Jetty for CVE fixes

2022-07-26 Thread Andrew Borley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571392#comment-17571392
 ] 

Andrew Borley commented on KAFKA-14107:
---

I think this is a dup of https://issues.apache.org/jira/browse/KAFKA-14100

> Upgrade Jetty for CVE fixes
> ---
>
> Key: KAFKA-14107
> URL: https://issues.apache.org/jira/browse/KAFKA-14107
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.2.0
>Reporter: Andrew Borley
>Priority: Minor
>  Labels: security
> Fix For: 3.3.0
>
>
> There are a couple of CVEs for Jetty:
> - [CVE-2022-2048](https://nvd.nist.gov/vuln/detail/CVE-2022-2048)
> - [CVE-2022-2047](https://nvd.nist.gov/vuln/detail/CVE-2022-2047)
> Fixed by upgrading to 9.4.48.v20220622+





[jira] [Created] (KAFKA-14109) Clean up JUnit 4 test infrastructure

2022-07-26 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-14109:
-

 Summary: Clean up JUnit 4 test infrastructure
 Key: KAFKA-14109
 URL: https://issues.apache.org/jira/browse/KAFKA-14109
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Christo Lolov
Assignee: Christo Lolov


We need to cleanup the setup in ... once the JUnit 4 to JUnit 5 migration is 
complete.





[jira] [Updated] (KAFKA-14109) Clean up JUnit 4 test infrastructure

2022-07-26 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14109:
--
Description: We need to cleanup the setup in 
https://issues.apache.org/jira/browse/KAFKA-14108 once the JUnit 4 to JUnit 5 
migration is complete.  (was: We need to cleanup the setup in ... once the 
JUnit 4 to JUnit 5 migration is complete.)

> Clean up JUnit 4 test infrastructure
> 
>
> Key: KAFKA-14109
> URL: https://issues.apache.org/jira/browse/KAFKA-14109
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> We need to cleanup the setup in 
> https://issues.apache.org/jira/browse/KAFKA-14108 once the JUnit 4 to JUnit 5 
> migration is complete.





[GitHub] [kafka] ajborley opened a new pull request, #12440: KAFKA-14107: Upgrade Jetty version for CVE fixes

2022-07-26 Thread GitBox


ajborley opened a new pull request, #12440:
URL: https://github.com/apache/kafka/pull/12440

   KAFKA-14107 Upgrade Jetty for CVE fixes.
   
   Jetty: [CVE-2022-2048](https://nvd.nist.gov/vuln/detail/CVE-2022-2048)
   and [CVE-2022-2047](https://nvd.nist.gov/vuln/detail/CVE-2022-2047)
   - Fixed by upgrading to 9.4.48.v20220622
   
   Signed-off-by: Andrew Borley 
   





[jira] [Created] (KAFKA-14108) Allow JUnit 4 and JUnit 5 tests to run while the migration is going on

2022-07-26 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-14108:
-

 Summary: Allow JUnit 4 and JUnit 5 tests to run while the 
migration is going on
 Key: KAFKA-14108
 URL: https://issues.apache.org/jira/browse/KAFKA-14108
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Christo Lolov
Assignee: Christo Lolov


We found out in [https://github.com/apache/kafka/pull/12285] that not all tests 
are running. We need to remedy this before we proceed with the JUnit 4 to JUnit 
5 migration.





[jira] [Created] (KAFKA-14107) Upgrade Jetty for CVE fixes

2022-07-26 Thread Andrew Borley (Jira)
Andrew Borley created KAFKA-14107:
-

 Summary: Upgrade Jetty for CVE fixes
 Key: KAFKA-14107
 URL: https://issues.apache.org/jira/browse/KAFKA-14107
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.2.0
Reporter: Andrew Borley
 Fix For: 3.3.0


There are a couple of CVEs for Jetty:

- [CVE-2022-2048](https://nvd.nist.gov/vuln/detail/CVE-2022-2048)

- [CVE-2022-2047](https://nvd.nist.gov/vuln/detail/CVE-2022-2047)

Fixed by upgrading to 9.4.48.v20220622+





[jira] [Updated] (KAFKA-14001) Part 1

2022-07-26 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14001:
--
Labels: streams  (was: )

> Part 1
> --
>
> Key: KAFKA-14001
> URL: https://issues.apache.org/jira/browse/KAFKA-14001
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: streams
>
> This sub-task is used to track the first pull request in a series of pull 
> requests which will be modifying test files in the Streams module in moving 
> it from JUnit 4 to JUnit 5.





[jira] [Resolved] (KAFKA-14001) Part 1

2022-07-26 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov resolved KAFKA-14001.
---
  Reviewer: Bruno Cadonna
Resolution: Fixed

> Part 1
> --
>
> Key: KAFKA-14001
> URL: https://issues.apache.org/jira/browse/KAFKA-14001
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> This sub-task is used to track the first pull request in a series of pull 
> requests which will be modifying test files in the Streams module in moving 
> it from JUnit 4 to JUnit 5.





[GitHub] [kafka] ijuma commented on pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-26 Thread GitBox


ijuma commented on PR #12422:
URL: https://github.com/apache/kafka/pull/12422#issuecomment-1195370508

   @C0urante you can use your committer privileges to merge this. :)





[GitHub] [kafka] ijuma commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-26 Thread GitBox


ijuma commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r929857189


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);

Review Comment:
   Yeah, that's fine.






[GitHub] [kafka] zigarn commented on pull request #12434: KAFKA-14099 - Fix request logs in connect

2022-07-26 Thread GitBox


zigarn commented on PR #12434:
URL: https://github.com/apache/kafka/pull/12434#issuecomment-1195358695

   @C0urante In the meantime I added a test by copying the `LogCaptureAppender` 
from #10528.
   
   It's not ideal to use a while loop to wait for the log to be created in the 
completion phase. A more robust solution would be to create a handler that does 
the check in the completion phase, but this would need to expose 
`RestServer.handlers`. I'll see if something can work.
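
   A sketch of a bounded version of that wait loop (the names are made up; the
   point is a deadline so the test cannot hang if the completion-phase log
   line never appears):

   ```java
   import java.util.function.BooleanSupplier;

   final class LogWait {
       static void waitForLog(final BooleanSupplier logLineCaptured,
                              final long timeoutMs) throws InterruptedException {
           final long deadline = System.currentTimeMillis() + timeoutMs;
           while (!logLineCaptured.getAsBoolean()) {
               if (System.currentTimeMillis() > deadline) {
                   throw new AssertionError(
                       "Request log was not written within " + timeoutMs + " ms");
               }
               Thread.sleep(50); // the completion phase runs asynchronously
           }
       }
   }
   ```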





[GitHub] [kafka] mimaison commented on pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

2022-07-26 Thread GitBox


mimaison commented on PR #12429:
URL: https://github.com/apache/kafka/pull/12429#issuecomment-1195330919

   I've run the tests a dozen times and I've not seen any failures.





[GitHub] [kafka] showuon commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-26 Thread GitBox


showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929766809


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, _) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   Sure. Thanks.






[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-26 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929748156


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, _) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   So this is exactly the version that I have now
   
   ```scala
   // Iterate through every grouping of duplicates by port to see if they are 
valid
   duplicatePortsPartitionedByValidIps.foreach {
 case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
   val errorMessage = "If you have two listeners on " +
 s"the same port then one needs to be IPv4 and the other IPv6, 
listeners: $listeners, port: $port"
   if (duplicatesWithoutIpHosts.nonEmpty)
 throw new IllegalArgumentException(errorMessage)
   
   duplicatesWithIpHosts match {
 case eps if eps.isEmpty =>
 case Seq(ep1, ep2) =>
   if (requireDistinctPorts)
 require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), 
errorMessage)
 case _ =>
   // Having more than 2 duplicate endpoints doesn't make sense since 
we only have 2 IP stacks (one is IPv4
   // and the other is IPv6)
   if (requireDistinctPorts)
 throw new IllegalArgumentException("Each listener must have a 
different port unless exactly one listener has " +
   s"an IPv4 address and the other IPv6 address, listeners: 
$listeners, port: $port")
   }
   }
```

And it's still failing on the test. Let's revisit this later; I believe 
something is being missed.






[GitHub] [kafka] mimaison commented on pull request #10528: KAFKA-12497: Skip periodic offset commits for failed source tasks

2022-07-26 Thread GitBox


mimaison commented on PR #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-1195254372

   @tombentley @vvcephei Can you take another look?





[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-26 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929748156


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   So this is exactly the version that I have now:
   
   ```scala
   // Iterate through every grouping of duplicates by port to see if they are valid
   duplicatePortsPartitionedByValidIps.foreach {
     case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
       val errorMessage = "If you have two listeners on " +
         s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port"
       if (duplicatesWithoutIpHosts.nonEmpty)
         throw new IllegalArgumentException(errorMessage)

       duplicatesWithIpHosts match {
         case eps if eps.isEmpty =>
         case Seq(ep1, ep2) =>
           if (requireDistinctPorts)
             require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), errorMessage)
         case _ =>
           // Having more than 2 duplicate endpoints doesn't make sense since we only have 2 IP stacks (one is IPv4
           // and the other is IPv6)
           if (requireDistinctPorts)
             throw new IllegalArgumentException("Each listener must have a different port unless exactly one listener has " +
               s"an IPv4 address and the other IPv6 address, listeners: $listeners, port: $port")
       }
   }
   ```

   And it's still failing on the test. Let's revisit this later; I believe something is being missed.
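
   For readers following the thread, here is a minimal self-contained sketch of the rule under discussion (an illustration under assumptions, not the PR's code: it assumes commons-validator is on the classpath and uses a made-up `Endpoint` case class in place of Kafka's `EndPoint`):

   ```scala
   import org.apache.commons.validator.routines.InetAddressValidator

   object DuplicatePortRuleSketch extends App {
     // Made-up stand-in for kafka.cluster.EndPoint, only for this sketch
     case class Endpoint(host: String, port: Int)

     private val validator = InetAddressValidator.getInstance()

     def oneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
       (validator.isValidInet4Address(first) && validator.isValidInet6Address(second)) ||
         (validator.isValidInet6Address(first) && validator.isValidInet4Address(second))

     // Port 0 is skipped (used by unit tests); any other duplicated port is
     // allowed only for exactly one IPv4 listener plus one IPv6 listener.
     def validate(endpoints: Seq[Endpoint]): Unit =
       endpoints.filter(_.port != 0).groupBy(_.port).foreach {
         case (port, eps) if eps.size > 1 =>
           eps match {
             case Seq(ep1, ep2) if ep1.host != null && ep2.host != null &&
                 oneIsIpv4AndOtherIpv6(ep1.host, ep2.host) => // allowed: dual-stack pair
             case _ =>
               throw new IllegalArgumentException(
                 s"Each listener must have a different port unless exactly one is IPv4 and the other IPv6, port: $port")
           }
         case _ => // unique port, nothing to check
       }

     validate(Seq(Endpoint("127.0.0.1", 9092), Endpoint("::1", 9092))) // passes: one IPv4, one IPv6
     // validate(Seq(Endpoint("localhost", 9092), Endpoint("::1", 9092))) // would throw: hostname is not an IP literal
   }
   ```

   Under these assumptions, the dual-stack pair on 9092 passes, while two IPv4 listeners or a hostname-based listener sharing a port would throw.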



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-26 Thread GitBox


rajinisivaram commented on PR #12416:
URL: https://github.com/apache/kafka/pull/12416#issuecomment-1195238184

   @badaiaqrandista Thanks for the update, this looks good. However, there appears to be a timing issue in the test, since it failed in the JDK 8 PR build; can you take a look? I remember seeing it fail yesterday, before these changes, so it may be a timing issue in the test itself.
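
   As a general note on assertions like this, Kafka tests usually poll a condition instead of racing a fixed sleep against the server. A minimal sketch, where `responseReceived` is a made-up probe for whatever state the test awaits and `TestUtils.waitUntilTrue` is the polling helper from Kafka's core test utilities:

   ```scala
   import java.util.concurrent.atomic.AtomicBoolean
   import kafka.utils.TestUtils

   object PollingWaitSketch {
     // Made-up probe, flipped by the response handler under test
     val responseReceived = new AtomicBoolean(false)

     def awaitResponse(): Unit =
       // Succeeds as soon as the condition holds and fails with a clear
       // message if it never does, instead of depending on a fixed sleep.
       TestUtils.waitUntilTrue(
         () => responseReceived.get(),
         "Expected a response while the broker was idle, but none arrived"
       )
   }
   ```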


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-26 Thread GitBox


showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929731476


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   Agreed, we can address this comment in another PR.
   
   I've checked the patch file and found that the following lines were not removed:
   ```
   if (requireDistinctPorts)
     checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
   ```
   Because we don't expect `duplicatesWithoutIpHosts` to be non-empty, we don't need to validate it.
   
   So, it will look like this:
   ```scala
   case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
     val errorMessage = "If you have two listeners on " +
       s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port"
     if (duplicatesWithoutIpHosts.nonEmpty)
       throw new IllegalArgumentException(errorMessage)

     duplicatesWithIpHosts match {
       case eps if eps.isEmpty =>
       case Seq(ep1, ep2) =>
         if (requireDistinctPorts)
           require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), errorMessage)
       case _ =>
         // ... (rest as in the current implementation)
     }
   ```
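
   To make the expectation about `duplicatesWithoutIpHosts` concrete, here is a small sketch (an illustration only, not part of the PR) of how commons-validator's `InetAddressValidator` (the same predicate used above) partitions hosts; the host values are made up:

   ```scala
   import org.apache.commons.validator.routines.InetAddressValidator

   object HostPartitionSketch extends App {
     val validator = InetAddressValidator.getInstance()

     // Example hosts: two IP literals, one DNS name, and a null (unset) host
     val hosts = Seq("192.168.0.1", "::1", "broker.example.com", null)

     // Same predicate as the PR: non-null and a valid IPv4/IPv6 literal
     val (ipHosts, nonIpHosts) = hosts.partition(h => h != null && validator.isValid(h))

     println(s"Would land in duplicatesWithIpHosts: $ipHosts")       // the IP literals
     println(s"Would land in duplicatesWithoutIpHosts: $nonIpHosts") // DNS name and null
   }
   ```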



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-26 Thread GitBox


clolov commented on PR #12422:
URL: https://github.com/apache/kafka/pull/12422#issuecomment-1195214901

   Heya @C0urante and @ijuma, I just want to confirm: are we waiting on a change that I need to make?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12436: MINOR: Use builder for mock task in DefaultStateUpdaterTest

2022-07-26 Thread GitBox


cadonna merged PR #12436:
URL: https://github.com/apache/kafka/pull/12436


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12436: MINOR: Use builder for mock task in DefaultStateUpdaterTest

2022-07-26 Thread GitBox


cadonna commented on PR #12436:
URL: https://github.com/apache/kafka/pull/12436#issuecomment-1195152136

   Build failures are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication()
   Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testCommitTransactionTimeout(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testSnapshotOnlyAfterConfiguredMinBytes()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-26 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929615594


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,62 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =

Review Comment:
   Commit pushed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mnegodaev commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect

2022-07-26 Thread GitBox


mnegodaev commented on PR #11442:
URL: https://github.com/apache/kafka/pull/11442#issuecomment-1195099586

   @rhauch @mimaison @kkonstantine @pjmagee 
   If you guys don't mind, I'm going to wait until Wednesday and then call a [VOTE] to have the proposal adopted. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-26 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929602456


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   > But, if you think my above suggestion doesn't look better, you can keep current implementation. Just want to let you know I think there's a better and more clear way to implement the logic. Thanks.
   
   I don't disagree with the changes; it's just that they appear to introduce a regression (i.e. they break an existing test). To make sure nothing really weird is going on, I've attached a `.patch` file (inside a zip) so we are talking about the same change.
   
   [reorder-exception.patch.zip](https://github.com/apache/kafka/files/9187405/reorder-exception.patch.zip)
   
   In any case, it's not critical, so we can always do a minor change later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


