[GitHub] [kafka] zigarn commented on pull request #12434: KAFKA-14099 - Fix request logs in connect

2022-07-25 Thread GitBox


zigarn commented on PR #12434:
URL: https://github.com/apache/kafka/pull/12434#issuecomment-1195083745

   Thanks @C0urante for the feedback.
   No problem for the issue in the first place, I guess not many people were 
impacted by this as nobody raised this issue.
   
   I'm more concerned about why it's not working in the current way to do it. I 
wonder if the `DefaultHandler` is working either? From my understanding, looks 
like `context` and `adminContext` are working because they are explicitly 
started, but I tried to do the same with `requestLogHandler` with no luck.
   
   I was wondering if there was a way to test this, so if you say there is with 
`LogCaptureAppender`, let's go for it!
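
   For reference, here is a rough sketch of the shape such a test could take, assuming log4j 1.x (which Kafka used at the time); `RequestLogCaptureSketch` and `assertRequestWasLogged` are illustrative names, and Kafka's actual `LogCaptureAppender` test utility may differ:
   ```java
   import org.apache.log4j.AppenderSkeleton;
   import org.apache.log4j.Logger;
   import org.apache.log4j.spi.LoggingEvent;

   import java.util.ArrayList;
   import java.util.List;

   // Illustrative capturing appender; collects every message logged while attached.
   public class RequestLogCaptureSketch extends AppenderSkeleton {
       private final List<String> messages = new ArrayList<>();

       @Override
       protected void append(LoggingEvent event) {
           messages.add(event.getRenderedMessage());
       }

       @Override
       public void close() { }

       @Override
       public boolean requiresLayout() { return false; }

       // Attach to the request-log logger, perform a REST call, then assert that
       // an NCSA-style request line (e.g. containing "HTTP/1.1") was logged.
       public static void assertRequestWasLogged(String loggerName, Runnable restCall) {
           RequestLogCaptureSketch appender = new RequestLogCaptureSketch();
           Logger logger = Logger.getLogger(loggerName);
           logger.addAppender(appender);
           try {
               restCall.run();
               if (appender.messages.stream().noneMatch(m -> m.contains("HTTP/1.1"))) {
                   throw new AssertionError("No request log line was captured");
               }
           } finally {
               logger.removeAppender(appender);
           }
       }
   }
   ```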





[jira] [Commented] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-25 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571214#comment-17571214
 ] 

Tom Bentley commented on KAFKA-14101:
-

[~ChrisEgerton] I wonder whether we should rename {{consume}} to 
{{consumeSome}} to make the semantics a little more obvious.
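
For anyone following along, the semantic difference is roughly the following (an illustrative sketch against the plain consumer API; the helper names mirror this discussion, not the actual {{EmbeddedKafkaCluster}} methods, and a real helper would also enforce a timeout):
{code:java}
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class ConsumeSemanticsSketch {

    // "consume(n)" semantics: stop as soon as at least n records have been seen,
    // no matter which partitions they came from; records still sitting in other
    // partitions are never read. (A real helper would time out eventually.)
    static List<ConsumerRecord<byte[], byte[]>> consumeSome(Consumer<byte[], byte[]> consumer, int n) {
        List<ConsumerRecord<byte[], byte[]>> out = new ArrayList<>();
        while (out.size() < n) {
            consumer.poll(Duration.ofSeconds(1)).forEach(out::add);
        }
        return out;
    }

    // "consumeAll" semantics: keep polling until every assigned partition has
    // been read up to the end offset captured when the call started.
    static List<ConsumerRecord<byte[], byte[]>> consumeAll(Consumer<byte[], byte[]> consumer) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
        List<ConsumerRecord<byte[], byte[]>> out = new ArrayList<>();
        while (endOffsets.entrySet().stream()
                .anyMatch(e -> consumer.position(e.getKey()) < e.getValue())) {
            consumer.poll(Duration.ofSeconds(1)).forEach(out::add);
        }
        return out;
    }
}
{code}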

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)





[jira] [Created] (KAFKA-14106) Fetcher thread was shutdown and all fetched partitions are lost.

2022-07-25 Thread Yang Ling (Jira)
Yang Ling created KAFKA-14106:
-

 Summary: Fetcher thread was shutdown and all fetched partitions 
are lost.
 Key: KAFKA-14106
 URL: https://issues.apache.org/jira/browse/KAFKA-14106
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 3.0.0, 2.2.2
Reporter: Yang Ling


Dynamically changing listeners can cause partitions to fall out of sync. Our sequence of operations was as follows:
 # The broker is started, listening on an IP address.
 # Create some topics.
 # Change the listener to a domain name via dynamic configuration, for some reason (see the sketch after this list).
 # Create some new topics.
 # Produce messages into any of the older topics.
 # All of the older topics produced to in step 5 are out of sync.
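
For illustration, step 3 can be performed with the Java Admin client along these lines (a sketch; the config key, listener value, and broker id here are assumptions based on the logs below, and older client versions use {{AdminClient.create}} instead of {{Admin.create}}):
{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicListenerChangeSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "168.1.3.88:9092");
        try (Admin admin = Admin.create(props)) {
            // Per-broker dynamic config update: switch the advertised address of
            // broker 2 from an IP address to a hostname, as in step 3.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "2");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("advertised.listeners", "PLAINTEXT://kafka-server-1:9092"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(broker, Collections.singletonList(op));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}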

The major log entries are:
{panel}
[2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=168.1.3.88:9092) for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
[2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
{panel}

After reading the source code, we found the following in AbstractFetcherManager:
{code:scala}
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
  ...
  for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
    val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
    val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
      case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
        currentFetcherThread
      case Some(f) =>
        f.shutdown() // <-- marked
        addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      case None =>
        addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
    }
  }
  ...
}
{code}

As the marked line shows, if the sourceBroker has changed (as it did in our case), the old fetcher thread is shut down and a new fetcher thread is created using the new sourceBroker. In the process, all of the partitions being fetched by the old fetcher thread are lost.





[GitHub] [kafka] C0urante commented on pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-07-25 Thread GitBox


C0urante commented on PR #12019:
URL: https://github.com/apache/kafka/pull/12019#issuecomment-1194881570

   Converting this to a draft since I haven't had time to prioritize it (sorry @YeonCheolGit!) and the changes here are not safe to merge as-is.





[GitHub] [kafka] showuon commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


showuon commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1194872263

   @mimaison @tombentley , do you want to have another look? 





[GitHub] [kafka] showuon commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929446869


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,62 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =

Review Comment:
   nit: methods exposed in `CoreUtils` are expected to be used by other classes, and I don't think we expect `validateOneIsIpv4AndOtherIpv6` or `checkDuplicateListenerPorts` to be used anywhere else. Maybe we can move them into `listenerListToEndPoints`, like what the `validate` method does.



##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   I checked: `apache.org` will be treated as one of the `duplicatesWithoutIpHosts`, and it'll enter here:
   ```
   if (duplicatesWithoutIpHosts.nonEmpty)
     throw new IllegalArgumentException("...")
   ```
   which should also throw an exception and pass the test, unless the error message is not what the test expects. But if you don't think my suggestion above looks better, you can keep the current implementation. I just want to let you know that I think there's a better and clearer way to implement the logic. Thanks.
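
   For reference, the IPv4-vs-IPv6 exception under discussion boils down to the check below (a standalone Java sketch assuming Apache commons-validator, which the PR's `inetAddressValidator` comes from):
   ```java
   import org.apache.commons.validator.routines.InetAddressValidator;

   public class ListenerPortCheckSketch {
       private static final InetAddressValidator VALIDATOR = InetAddressValidator.getInstance();

       // True only when one host is a literal IPv4 address and the other a literal
       // IPv6 address: the single case where two listeners may share a port.
       static boolean oneIsIpv4AndOtherIpv6(String first, String second) {
           return (VALIDATOR.isValidInet4Address(first) && VALIDATOR.isValidInet6Address(second))
               || (VALIDATOR.isValidInet6Address(first) && VALIDATOR.isValidInet4Address(second));
       }

       public static void main(String[] args) {
           System.out.println(oneIsIpv4AndOtherIpv6("127.0.0.1", "::1"));      // true
           System.out.println(oneIsIpv4AndOtherIpv6("127.0.0.1", "10.0.0.1")); // false: both IPv4
           System.out.println(oneIsIpv4AndOtherIpv6("apache.org", "::1"));     // false: hostname, not an IP literal
       }
   }
   ```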






[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Priority: Blocker  (was: Major)

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> 
>
> Key: KAFKA-14102
> URL: https://issues.apache.org/jira/browse/KAFKA-14102
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Shuo Chen
>Priority: Blocker
>
> We have 2 web applications (A and B) that consume messages from the same 
> Kafka server, so they have the same configuration:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required; 
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>  
> A and B are deployed together in one Tomcat server (meaning they are in the 
> same JVM process). The startup sequence is A -> B, and then we find that B 
> cannot consume messages, failing with the following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] o.a.k.c.n.SaslChannelBuilder             -  - [Consumer clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182) ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219) ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordi

[jira] [Updated] (KAFKA-14090) Allow Kafka Streams to be configured to not create internal topics

2022-07-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14090:

Component/s: streams
 (was: clients)

> Allow Kafka Streams to be configured to not create internal topics
> --
>
> Key: KAFKA-14090
> URL: https://issues.apache.org/jira/browse/KAFKA-14090
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Abraham Leal
>Priority: Minor
>
> There should be a way to instruct Kafka Streams, through configuration, not to 
> create internal topics on start-up, and to fail if the needed internal topics 
> aren't there.
> The reasoning for this option is governance of the application: an 
> organization may wish to disallow the creation of topics by clients and opt 
> for all topic creation to be done by administrators or a defined process. 
> Injecting this property into all clients would ensure good central governance 
> of the backing Kafka cluster.





[jira] [Comment Edited] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-25 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571130#comment-17571130
 ] 

Chris Egerton edited comment on KAFKA-14101 at 7/25/22 11:43 PM:
-

Thanks, [~mimaison]. I believe this is due to a bug that I've now fixed in that 
PR; we should be using {{consumeAll}} instead of {{consume}} in the test 
[here|https://github.com/C0urante/kafka/blob/466a59df2d02de16d285325be4d44c99b24eee4d/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L408-L410]
 in order to guarantee that we've collected records from every partition of the 
topic that the connector produces to instead of just consuming until we get at 
least 100 from one or more topic partitions.

Please let me know if you see this issue come up again on the branch for 
[https://github.com/apache/kafka/pull/12429] now that I've pushed a change that 
should fix this.

Apologies for all the flakiness, and thank you for quickly finding and 
reporting all of it. Really hoping we can iron out all the kinks before we 
start producing 3.3.0 RCs 🤞


was (Author: chrisegerton):
Thanks, [~mimaison]. I believe this is due to a bug in that PR; should be using 
{{consumeAll}} instead of {{consume}} in the test 
[here|https://github.com/C0urante/kafka/blob/466a59df2d02de16d285325be4d44c99b24eee4d/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L408-L410]
 in order to guarantee that we've collected records from every partition of the 
topic that the connector produces to, instead of just consuming until we get at 
least 100 from one or more topic partitions.

Please let me know if there are any issues on trunk, or if you see this issue 
come up again on the branch for [https://github.com/apache/kafka/pull/12429] 
now that I've pushed a change that _should_ fix this.

Apologies for all the flakiness, and thank you for quickly finding and 
reporting all of it. Really hoping we can iron out all the kinks before we 
start producing 3.3.0 RCs 🤞

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)





[jira] [Comment Edited] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-25 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571130#comment-17571130
 ] 

Chris Egerton edited comment on KAFKA-14101 at 7/25/22 11:39 PM:
-

Thanks, [~mimaison]. I believe this is due to a bug in that PR; should be using 
{{consumeAll}} instead of {{consume}} in the test 
[here|https://github.com/C0urante/kafka/blob/466a59df2d02de16d285325be4d44c99b24eee4d/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L408-L410]
 in order to guarantee that we've collected records from every partition of the 
topic that the connector produces to, instead of just consuming until we get at 
least 100 from one or more topic partitions.

Please let me know if there are any issues on trunk, or if you see this issue 
come up again on the branch for [https://github.com/apache/kafka/pull/12429] 
now that I've pushed a change that _should_ fix this.

Apologies for all the flakiness, and thank you for quickly finding and 
reporting all of it. Really hoping we can iron out all the kinks before we 
start producing 3.3.0 RCs 🤞


was (Author: chrisegerton):
Thanks, [~mimaison]. I believe this is due to a bug in that PR; should be using 
{{consumeAll}} instead of {{consume}} in the test 
[here|https://github.com/C0urante/kafka/blob/466a59df2d02de16d285325be4d44c99b24eee4d/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L408-L410].

Please let me know if there are any issues on trunk, or if you see this issue 
come up again on the branch for [https://github.com/apache/kafka/pull/12429] 
now that I've pushed a change that _should_ fix this.

Apologies for all the flakiness, and thank you for quickly finding and 
reporting all of it. Really hoping we can iron out all the kinks before we 
start producing 3.3.0 RCs 🤞

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)





[jira] [Commented] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-25 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571130#comment-17571130
 ] 

Chris Egerton commented on KAFKA-14101:
---

Thanks, [~mimaison]. I believe this is due to a bug in that PR; should be using 
{{consumeAll}} instead of {{consume}} in the test 
[here|https://github.com/C0urante/kafka/blob/466a59df2d02de16d285325be4d44c99b24eee4d/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L408-L410].

Please let me know if there are any issues on trunk, or if you see this issue 
come up again on the branch for [https://github.com/apache/kafka/pull/12429] 
now that I've pushed a change that _should_ fix this.

Apologies for all the flakiness, and thank you for quickly finding and 
reporting all of it. Really hoping we can iron out all the kinks before we 
start producing 3.3.0 RCs 🤞

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)





[GitHub] [kafka] C0urante commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

2022-07-25 Thread GitBox


C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381705


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -826,15 +830,22 @@ public void testSeparateOffsetsTopic() throws Exception {
         assertConnectorStopped(connectorStop);
 
         // consume all records from the source topic or fail, to ensure that they were correctly produced
-        ConsumerRecords records = connectorTargetedCluster.consumeAll(
+        ConsumerRecords sourceRecords = connectorTargetedCluster.consumeAll(
            CONSUME_RECORDS_TIMEOUT_MS,
            Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
            null,
            topic
        );
-        assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
-            records.count() >= recordsProduced);
-        assertExactlyOnceSeqnos(records, numTasks);
+        assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
+            sourceRecords.count() >= recordsProduced);
+        // also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees

Review Comment:
   Good point; added a Javadoc.
   
   I'll note that, while writing it, it's occurred to me that we could 
strengthen some of the guarantees in the test. For example, we don't assert 
that the connector has actually committed any new offsets after bringing up the 
cluster again with exactly-once support disabled. But I'm a little reluctant to 
address these immediately since it may introduce additional flakiness if we get 
things wrong, and this PR is likely going to be backported to 3.3 in order to 
improve stability while generating release candidates. If you believe these 
additional guarantees are worth pursuing, though, let me know and I can file a 
separate follow-up knowing that it's targeted exclusively at trunk.






[GitHub] [kafka] C0urante commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

2022-07-25 Thread GitBox


C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381599


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
             recordNum >= recordsProduced);

Review Comment:
   Ahh, good catch--the "all" is incorrect in that comment. I've updated it 
from "consume all records" to "consume at least the expected number of records".
   
   On a separate note, I believe a similar mixup is the cause of 
https://issues.apache.org/jira/browse/KAFKA-14101, where we _should_ be using 
`consumeAll` but aren't at the moment. I've pushed a commit that should fix 
that.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
             recordNum >= recordsProduced);
 
         // also consume from the connector's dedicated offsets topic; just need to read one offset record
-        ConsumerRecord offsetRecord = connectorTargetedCluster
-            .consume(
-                1,
+        ConsumerRecords offsetRecords = connectorTargetedCluster
+            .consumeAll(
                 TimeUnit.MINUTES.toMillis(1),
                 Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                null,
                 offsetsTopic
-            ).iterator().next();
-        long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-        assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-            0, seqno % recordsProduced);
+            );
+        List seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+        seqnos.forEach(seqno ->
+            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                0, seqno % recordsProduced)
+        );
 
         // also consume from the cluster's global offsets topic; again, just need to read one offset record

Review Comment:
   Right again, thanks for the catch!






[GitHub] [kafka] C0urante commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

2022-07-25 Thread GitBox


C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381491


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -217,6 +217,9 @@ protected void finalOffsetCommit(boolean failed) {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
+        } else if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled and its producer has already been closed");

Review Comment:
   That's fair; technically, it may not be closed if we hit this part right 
after `cancelled` is flipped to `true` but not before the producer is actually 
closed. I'd personally err on the side of skipping the offset commit if the 
task is cancelled since it's scary to see offset commit failure messages and 
the odds of an offset commit being viable once we know the task has been 
cancelled are low, but I'm not too attached to this (especially on a flaky test 
PR). What are your thoughts?
   
   FWIW, I've also updated the log message to remove the explanation of why 
we're skipping the commit; the detail about the producer being closed doesn't 
seem strictly necessary.
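
   (As an editor's illustration of the window described above: cancellation typically flips a flag before the producer is closed, so a concurrent check can observe the flag while the producer is still open. The names below are paraphrased, not the actual `ExactlyOnceWorkerSourceTask` fields.)
   ```java
   // Sketch of the check-then-act window: another thread calling isCancelled()
   // between the two statements in cancel() sees true while the producer is open.
   class TaskCancellationSketch {
       private volatile boolean cancelled = false;

       void cancel() {
           cancelled = true;
           // <-- a concurrent isCancelled() check lands here: cancelled == true,
           //     but closeProducer() below has not run yet
           closeProducer();
       }

       boolean isCancelled() {
           return cancelled;
       }

       private void closeProducer() {
           // producer.close() in the real task
       }
   }
   ```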



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
             recordNum >= recordsProduced);
 
         // also consume from the connector's dedicated offsets topic; just need to read one offset record

Review Comment:
   Nope, thanks for the catch!






[GitHub] [kafka] guozhangwang commented on pull request #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-25 Thread GitBox


guozhangwang commented on PR #12439:
URL: https://github.com/apache/kafka/pull/12439#issuecomment-1194710146

   Ping @cadonna for reviews, especially about 4) above.





[jira] [Commented] (KAFKA-14099) No REST API request logs in Kafka connect

2022-07-25 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571115#comment-17571115
 ] 

Chris Egerton commented on KAFKA-14099:
---

[~zigarn] Since you filed a PR to address this (thanks again for the fix!), 
I've assigned the ticket to you. Feel free to unassign at any point if you want 
to signal to others that nobody is actively working on this.

> No REST API request logs in Kafka connect
> -
>
> Key: KAFKA-14099
> URL: https://issues.apache.org/jira/browse/KAFKA-14099
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Alexandre Garnier
>Assignee: Alexandre Garnier
>Priority: Minor
>  Labels: pull-request-available
>
> Prior to 2.2.1, when a REST API request was performed, a request log line 
> was written to the log file:
> {code:java}
> [2022-07-23 07:18:16,128] INFO 172.18.0.1 - - [23/Jul/2022:07:18:16 +] 
> "GET /connectors HTTP/1.1" 200 2 "-" "curl/7.81.0" 66 
> (org.apache.kafka.connect.runtime.rest.RestServer:62)
> {code}
> Since 2.2.1, there are no more request logs.
>  
> With a bisect, I found the problem comes from [PR 
> 6651|https://github.com/apache/kafka/pull/6651], which fixed KAFKA-8304.
> From what I understand of the problem, the ContextHandlerCollection is added 
> in the Server 
> ([https://github.com/dongjinleekr/kafka/blob/63a6130af30536d67fca5802005695a84c875b5e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L195])
>  before handlers are really added in the ContextHandlerCollection 
> ([https://github.com/dongjinleekr/kafka/blob/63a6130af30536d67fca5802005695a84c875b5e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L296]).
> I don't know the impact on other handlers, but clearly it doesn't work for 
> the RequestLogHandler.
>  
> A solution I found for the logging issue is to set the RequestLog directly on 
> the server, without using a handler:
> {code:java}
> diff --git i/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java w/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> index ab18419efc..4d09cc0e6c 100644
> --- i/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> +++ w/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> @@ -187,6 +187,11 @@ public class RestServer {
>      public void initializeServer() {
>          log.info("Initializing REST server");
>  
> +        Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter();
> +        slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
> +        CustomRequestLog requestLog = new CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T");
> +        jettyServer.setRequestLog(requestLog);
> +
>          /* Needed for graceful shutdown as per `setStopTimeout` documentation */
>          StatisticsHandler statsHandler = new StatisticsHandler();
>          statsHandler.setHandler(handlers);
> @@ -275,14 +280,7 @@ public class RestServer {
>          configureHttpResponsHeaderFilter(context);
>      }
>  
> -        RequestLogHandler requestLogHandler = new RequestLogHandler();
> -        Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter();
> -        slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
> -        CustomRequestLog requestLog = new CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T");
> -        requestLogHandler.setRequestLog(requestLog);
> -
>          contextHandlers.add(new DefaultHandler());
> -        contextHandlers.add(requestLogHandler);
>  
>          handlers.setHandlers(contextHandlers.toArray(new Handler[0]));
>          try {
> {code}
> Same issue raised on StackOverflow: 
> [https://stackoverflow.com/questions/67699702/no-rest-api-logs-in-kafka-connect]
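
As a minimal standalone illustration of the server-level approach (a sketch outside of Kafka, assuming Jetty 9.4, which provides {{Slf4jRequestLogWriter}} and {{CustomRequestLog}}):
{code:java}
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.server.handler.DefaultHandler;

public class RequestLogSketch {
    public static void main(String[] args) throws Exception {
        Server server = new Server(8080);
        // Attaching the request log to the Server itself applies it to every
        // request, independent of when handlers are added to the collection.
        Slf4jRequestLogWriter writer = new Slf4jRequestLogWriter();
        writer.setLoggerName(RequestLogSketch.class.getCanonicalName());
        server.setRequestLog(new CustomRequestLog(writer, CustomRequestLog.EXTENDED_NCSA_FORMAT));
        server.setHandler(new DefaultHandler());
        server.start();
        server.join();
    }
}
{code}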





[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip periodic offset commits for failed source tasks

2022-07-25 Thread GitBox


C0urante commented on PR #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-1194686456

   @mimaison I've finally gotten around to rebasing this one, mind taking 
another look?





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-25 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929322796


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:

Review Comment:
   done






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-25 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929319425


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   *          SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   *          should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      val processTimeEnd = System.nanoTime()
+
+      // check the duration of processing the second request
+ 

[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-25 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929317899


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   *          SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   *          should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener

Review Comment:
   done



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   *          SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   *          should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms

Review Comment:
   done






[jira] [Comment Edited] (KAFKA-13559) The broker's ProduceResponse may be delayed for 300ms

2022-07-25 Thread Badai Aqrandista (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571100#comment-17571100
 ] 

Badai Aqrandista edited comment on KAFKA-13559 at 7/25/22 9:28 PM:
---

The sequence of events, from the server's point of view:

Step 1 - SslTransportLayer receives more than one request on the socket and puts them in the buffer (SslTransportLayer.netReadBuffer).
Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
Step 3 - The first request is processed, leaving the second request in SslTransportLayer.appReadBuffer.
Step 4 - THIS IS WHERE THE DELAY IS. The second request is processed. This request is read from SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()" should not block for the "poll timeout" (hardcoded to 300 in SocketServer.scala).


was (Author: badai):
The sequence of events, from the server's point of view:

Step 1 - SslTransportLayer receives more than one request on the socket and puts them in the buffer (SslTransportLayer.netReadBuffer).
Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
Step 3 - The first request is processed, leaving the second request in SslTransportLayer.appReadBuffer.
Step 4 - THIS IS WHERE THE DELAY IS. The second request is processed. This request is read from SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()" should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).

> The broker's  ProduceResponse may be delayed for 300ms
> --
>
> Key: KAFKA-13559
> URL: https://issues.apache.org/jira/browse/KAFKA-13559
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.7.0
>Reporter: frankshi
>Assignee: Badai Aqrandista
>Priority: Major
> Attachments: image-1.png, image-2.png, 
> image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png, 
> image-3.png, image-5.png, image-6.png, image-7.png, image.png
>
>
> Hi team:
> We have found that the value in the source code 
> [here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922]
>  may lead the broker's ProduceResponse to be delayed by 300 ms.
>  * Server-version: 2.13-2.7.0.
>  * Client-version: confluent-kafka-python-1.5.0.
> We have set the client's configuration as follows:
> {code:java}
> linger.ms = 0
> acks = 1
> delivery.timeout.ms = 100
> request.timeout.ms = 80
> sasl.mechanism = "PLAIN"
> security.protocol = "SASL_SSL"
> ..
> {code}
> Because we set acks = 1, the client sends ProduceRequests and receives 
> ProduceResponses from brokers. The leader broker doesn't need to wait for the 
> ISR to write data to disk successfully; it can reply to the client by 
> sending ProduceResponses directly. In our situation, the ping time between 
> the client and the Kafka brokers is about ~10 ms, and most of the time the 
> responses are received about 10 ms after the ProduceRequests are sent. But 
> sometimes the responses are received ~300 ms later.
> The following shows the log from the client.
> {code:java}
> 2021-11-26 02:31:30,567  Sent partial ProduceRequest (v7, 0+16527/37366 bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent partial ProduceRequest (v7, 16527+16384/37366 bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId 2753)
> 2021-11-26 02:31:30,570  Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
> 2021-11-26 02:31:30,571  Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
> 2021-11-26 02:31:30,572  Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2751, rtt 9.79ms)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2752, rtt 10.34ms)
> 2021-11-26 02:31:30,573  Received ProduceResponse (v7, 69 bytes, CorrId 2753, rtt 10.11ms)
> 2021-11-26 02:31:30,872  Received ProduceResponse (v7, 69 bytes, CorrId 2754, rtt 309.69ms)
> 2021-11-26 02:31:30,883  Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
> 2021-11-26 02:31:30,887  Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
> 2021-11-26 02:31:30,888  Received ProduceResponse (v7, 69 bytes, CorrId 2755, rtt 318.85ms)
> 2021-11-26 02:31:30,893  Sent partial ProduceRequest (v7, 0+16527/37562 bytes, CorrId 2759)
> 2021-11-26 02:31:30,894  Sent partial ProduceRequest (v7, 16527+16384/37562 bytes, CorrId 2759)
> 2021-11-26 02:31:30,895  Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId 2759)
> 2021-11-26 02:31:30,896  Sent ProduceRequest (v7, 4700

[jira] [Commented] (KAFKA-13559) The broker's ProduceResponse may be delayed for 300ms

2022-07-25 Thread Badai Aqrandista (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571100#comment-17571100
 ] 

Badai Aqrandista commented on KAFKA-13559:
--

The sequence of events from the server point of view:

Step 1 - SslTransportLayer receives more than one request from the socket and 
puts the encrypted bytes in the buffer (SslTransportLayer.netReadBuffer).
Step 2 - SslTransportLayer decrypts all of the bytes and stores them in 
SslTransportLayer.appReadBuffer.
Step 3 - The first request is processed, leaving the second request in 
SslTransportLayer.appReadBuffer.
Step 4 - THIS IS WHERE THE DELAY IS. The second request is read from 
SslTransportLayer.appReadBuffer instead of the socket, so the socket reports no 
new data and "select(timeout)" in "Selector.poll()" blocks for the full "poll 
timeout" (hardcoded to 300 in Selector.java, though set to 5000 in this test), even 
though a complete request is already buffered and ready to be processed.
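
To make Step 4 concrete, here is a rough, self-contained sketch of the timeout 
decision; the method and flag names below are assumptions for illustration, not 
the actual Selector code. The real fix simply marks read progress whenever a 
channel still has buffered bytes so that poll() does not block.

{code:java}
public class PollTimeoutSketch {

    static long effectivePollTimeout(long requestedTimeoutMs,
                                     boolean channelHasBytesBuffered,
                                     boolean madeReadProgressLastPoll) {
        // If a decrypted request is already sitting in appReadBuffer, selecting
        // on the socket would block for the full timeout even though work is
        // ready; treating buffered bytes as read progress makes poll() return
        // immediately.
        if (channelHasBytesBuffered || madeReadProgressLastPoll)
            return 0L;
        return requestedTimeoutMs; // otherwise block up to the timeout (300 ms here)
    }

    public static void main(String[] args) {
        System.out.println(effectivePollTimeout(300L, true, false));  // prints 0
        System.out.println(effectivePollTimeout(300L, false, false)); // prints 300
    }
}
{code}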

> The broker's  ProduceResponse may be delayed for 300ms
> --
>
> Key: KAFKA-13559
> URL: https://issues.apache.org/jira/browse/KAFKA-13559
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.7.0
>Reporter: frankshi
>Assignee: Badai Aqrandista
>Priority: Major
> Attachments: image-1.png, image-2.png, 
> image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png, 
> image-3.png, image-5.png, image-6.png, image-7.png, image.png
>
>
> Hi team:
> We have found the value in the source code 
> [here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922]
>  may cause the broker's ProduceResponse to be delayed for 300ms.
>  * Server-version: 2.13-2.7.0.
>  * Client-version: confluent-kafka-python-1.5.0.
> We have set the client's configuration as follows:
> {code:java}
> linger.ms = 0
> acks = 1
> delivery.timeout.ms = 100
> request.timeout.ms = 80
> sasl.mechanism = “PLAIN”
> security.protocol = “SASL_SSL”
> ..
> {code}
> Because we set acks = 1, the client sends ProduceRequests and receives 
> ProduceResponses from the brokers. The leader broker doesn't need to wait for the 
> other ISR replicas to write the data successfully; it can reply to the client by 
> sending the ProduceResponse directly. In our situation, the ping time between 
> the client and the Kafka brokers is about 10ms, and most of the time the 
> responses are received about 10ms after the ProduceRequests are sent. But 
> sometimes the responses are received about ~300ms later.
> The following shows the log from the client.
> {code:java}
> 2021-11-26 02:31:30,567  Sent partial ProduceRequest (v7, 0+16527/37366 
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent partial ProduceRequest (v7, 16527+16384/37366 
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId 
> 2753)
> 2021-11-26 02:31:30,570  Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
> 2021-11-26 02:31:30,571  Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
> 2021-11-26 02:31:30,572  Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2751, 
> rtt 9.79ms)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2752, 
> rtt 10.34ms)
> 2021-11-26 02:31:30,573  Received ProduceResponse (v7, 69 bytes, CorrId 2753, 
> rtt 10.11ms)
> 2021-11-26 02:31:30,872  Received ProduceResponse (v7, 69 bytes, CorrId 2754, 
> rtt 309.69ms)
> 2021-11-26 02:31:30,883  Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
> 2021-11-26 02:31:30,887  Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
> 2021-11-26 02:31:30,888  Received ProduceResponse (v7, 69 bytes, CorrId 2755, 
> rtt 318.85ms)
> 2021-11-26 02:31:30,893  Sent partial ProduceRequest (v7, 0+16527/37562 
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,894  Sent partial ProduceRequest (v7, 16527+16384/37562 
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,895  Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId 
> 2759)
> 2021-11-26 02:31:30,896  Sent ProduceRequest (v7, 4700 bytes @ 0, CorrId 2760)
> 2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2756, 
> rtt 317.74ms)
> 2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2757, 
> rtt 4.22ms)
> 2021-11-26 02:31:30,899  Received ProduceResponse (v7, 69 bytes, CorrId 2758, 
> rtt 2.61ms){code}
>  
> The requests with CorrId 2753 and 2754 are sent almost at the same time, but 
> the response for 2754 is delayed for ~300ms. 
> We checked the logs on the broker.
>  
> {code:java}
> [2021-11-26 02:31:30,873] DEBUG Completed 
> request:RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=rdkafka, 
> correlationId=2754) – {acks=1,timeout=80,numPartitions=1},response: 
> {responses=[\{topic=***,partition_responses=[{partition=3

[jira] [Created] (KAFKA-14105) Remove quorum.all_non_upgrade for system tests

2022-07-25 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14105:
-

 Summary: Remove quorum.all_non_upgrade for system tests
 Key: KAFKA-14105
 URL: https://issues.apache.org/jira/browse/KAFKA-14105
 Project: Kafka
  Issue Type: Task
  Components: kraft, system tests
Reporter: Ron Dagostino


We defined `all_non_upgrade = [zk, remote_kraft]` in `quorum.py` to encapsulate 
the quorum(s) that we want system tests to generally run with when they are 
unrelated to upgrading.  The idea was that we would just annotate tests with 
that and then we would be able to change the definition of it as we move 
through and beyond the KRaft bridge release.  But it is confusing, and 
search-and-replace is cheap -- especially if we are only doing it once or twice 
over the course of the project.  So we should eliminate the definition of 
`quorum.all_non_upgrade` (which was intended to be mutable over the course of 
the project) in favor of something like `zk_and_remote_kraft`, which will 
forever list ZK and REMOTE_KRAFT.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-25 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929297029


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set selectionKeys,
 long nowNanos = channelStartTimeNanos != 0 ? 
channelStartTimeNanos : currentTimeNanos;
 try {
 attemptWrite(key, channel, nowNanos);
+
+// Following is to fix KAFKA-13559. This will prevent 
poll() from blocking for 300 ms when the
+// socket has no data but the buffer has data. Only 
happens when using SSL.
+if (channel.hasBytesBuffered())
+madeReadProgressLastPoll = true;

Review Comment:
   Thank you. You are right. And the test passed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang opened a new pull request, #12439: KAFKA-10199: Further refactor task lifecycle management

2022-07-25 Thread GitBox


guozhangwang opened a new pull request, #12439:
URL: https://github.com/apache/kafka/pull/12439

   1. Consolidate the task recycle procedure into a single function within the 
task. The procedure now becomes: 1) task.recycleStateAndConvert, at the end of 
which the task is in `closed` state while its stateManager is retained and the 
manager type has been converted; 2) create the new task from the old task's fields 
and the stateManager inside the creators (a minimal sketch follows this list).
   2. Move the task execution related metadata into the corresponding 
`TaskExecutionMetadata` class, including the task idle related metadata (e.g. 
successfully processed tasks); this reduces the number of params needed for 
`TaskExecutor` as well as `Tasks`.
   3. Move the task execution related fields (embedded producer and consumer) 
and the task creators out of `Tasks` and into `TaskManager`. Now the 
`Tasks` is only a bookkeeping place without any task mutation logic.
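   
   A hypothetical sketch of the flow in point 1; every type below is a stub 
standing in for Streams internals, so the names are assumptions rather than the 
real API:
   
   ```java
   public class RecycleSketch {
       // Stand-ins for Streams internals; names are illustrative only.
       static class StateManager { }
       static class ActiveTask {
           // After this call the task is closed but its converted state manager survives.
           StateManager recycleStateAndConvert() { return new StateManager(); }
       }
       static class StandbyTask {
           StandbyTask(ActiveTask oldTask, StateManager stateManager) { }
       }
   
       // Step 1) close the task and convert its manager, step 2) the creator
       // builds the new task from the old task's fields plus the retained manager.
       static StandbyTask recycleToStandby(ActiveTask active) {
           StateManager stateManager = active.recycleStateAndConvert();
           return new StandbyTask(active, stateManager);
       }
   
       public static void main(String[] args) {
           System.out.println(recycleToStandby(new ActiveTask()) != null); // prints true
       }
   }
   ```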
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14104) Perform CRC validation on KRaft Batch Records and Snapshots

2022-07-25 Thread Niket Goel (Jira)
Niket Goel created KAFKA-14104:
--

 Summary: Perform CRC validation on KRaft Batch Records and 
Snapshots
 Key: KAFKA-14104
 URL: https://issues.apache.org/jira/browse/KAFKA-14104
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.2.0
Reporter: Niket Goel
 Fix For: 3.3


Today we stamp the RecordBatch header with a CRC [1] and verify that CRC before 
the log is written to disk [2]. The CRC should also be verified when the 
records are read back from disk. The same procedure should be followed for 
KRaft snapshots as well.

[1] 
[https://github.com/apache/kafka/blob/6b76c01cf895db0651e2cdcc07c2c392f00a8ceb/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L501=]
 

[2] 
[https://github.com/apache/kafka/blob/679e9e0cee67e7d3d2ece204a421ea7da31d73e9/core/src/main/scala/kafka/log/UnifiedLog.scala#L1143]
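
As a rough illustration of the proposed read-side check, the sketch below 
recomputes a CRC-32C (the checksum used for record batches) over the payload 
and compares it with the stored value; the flat byte-array layout here is an 
assumption for illustration, not the real DefaultRecordBatch wire format.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

public class CrcReadCheckSketch {

    // Recompute the checksum over the batch payload and compare it with the
    // CRC stored in the header; the offset handling here is illustrative.
    static boolean batchLooksValid(byte[] batch, int payloadOffset, long storedCrc) {
        CRC32C crc = new CRC32C();
        crc.update(batch, payloadOffset, batch.length - payloadOffset);
        return crc.getValue() == storedCrc;
    }

    public static void main(String[] args) {
        byte[] payload = "record-batch-bytes".getBytes(StandardCharsets.UTF_8);
        CRC32C crc = new CRC32C();
        crc.update(payload, 0, payload.length);
        long storedCrc = crc.getValue(); // what the writer would stamp in the header

        System.out.println(batchLooksValid(payload, 0, storedCrc)); // prints true
        payload[0] ^= 1; // simulate on-disk corruption
        System.out.println(batchLooksValid(payload, 0, storedCrc)); // prints false
    }
}
{code}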



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14104) Perform CRC validation on KRaft Batch Records and Snapshots

2022-07-25 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel reassigned KAFKA-14104:
--

Assignee: Niket Goel

> Perform CRC validation on KRaft Batch Records and Snapshots
> ---
>
> Key: KAFKA-14104
> URL: https://issues.apache.org/jira/browse/KAFKA-14104
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.2.0
>Reporter: Niket Goel
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.3
>
>
> Today we stamp the RecordBatch header with a CRC [1] and verify that CRC 
> before the log is written to disk [2]. The CRC should also be verified 
> when the records are read back from disk. The same procedure should be 
> followed for KRaft snapshots as well.
> [1] 
> [https://github.com/apache/kafka/blob/6b76c01cf895db0651e2cdcc07c2c392f00a8ceb/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L501=]
>  
> [2] 
> [https://github.com/apache/kafka/blob/679e9e0cee67e7d3d2ece204a421ea7da31d73e9/core/src/main/scala/kafka/log/UnifiedLog.scala#L1143]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14078) Replica fetches to follower should return NOT_LEADER error

2022-07-25 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14078.
-
Resolution: Fixed

> Replica fetches to follower should return NOT_LEADER error
> --
>
> Key: KAFKA-14078
> URL: https://issues.apache.org/jira/browse/KAFKA-14078
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.3.0
>
>
> After the fix for KAFKA-13837, if a follower receives a fetch request from another 
> replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch 
> matches. We need to do leader/epoch validation first, before we check 
> whether we have a valid replica.
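
A minimal sketch of the intended ordering; the method shape is an assumption 
for illustration, not the actual ReplicaManager code:

{code:java}
import org.apache.kafka.common.protocol.Errors;

public class FetchValidationSketch {

    // Validate the leader epoch before checking whether this broker is the leader.
    static Errors validateFetch(int requestEpoch, int currentEpoch, boolean isLeader) {
        if (requestEpoch > currentEpoch)
            return Errors.UNKNOWN_LEADER_EPOCH;
        if (requestEpoch < currentEpoch)
            return Errors.FENCED_LEADER_EPOCH;
        // Epoch matches: a replica fetch sent to a follower now fails with a
        // NOT_LEADER error rather than UNKNOWN_LEADER_EPOCH.
        if (!isLeader)
            return Errors.NOT_LEADER_OR_FOLLOWER;
        return Errors.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validateFetch(5, 5, false)); // NOT_LEADER_OR_FOLLOWER
        System.out.println(validateFetch(6, 5, true));  // UNKNOWN_LEADER_EPOCH
    }
}
{code}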



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji merged pull request #12411: KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica

2022-07-25 Thread GitBox


hachikuji merged PR #12411:
URL: https://github.com/apache/kafka/pull/12411


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12438: KAFKA-13868: Replace YouTube embedded video with links on streams page

2022-07-25 Thread GitBox


divijvaidya commented on PR #12438:
URL: https://github.com/apache/kafka/pull/12438#issuecomment-1194568297

   @bbejeck @mimaison This is the second to last PR before we can close 
https://issues.apache.org/jira/browse/KAFKA-13868. The last one will migrate 
this code to the website for the 3.3 version.
   
   Kindly review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14099) No REST API request logs in Kafka connect

2022-07-25 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-14099:
-

Assignee: Alexandre Garnier

> No REST API request logs in Kafka connect
> -
>
> Key: KAFKA-14099
> URL: https://issues.apache.org/jira/browse/KAFKA-14099
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Alexandre Garnier
>Assignee: Alexandre Garnier
>Priority: Minor
>  Labels: pull-request-available
>
> Prior to 2.2.1, when a REST API request was performed, there was a request 
> log line in the log file:
> {code:java}
> [2022-07-23 07:18:16,128] INFO 172.18.0.1 - - [23/Jul/2022:07:18:16 +] 
> "GET /connectors HTTP/1.1" 200 2 "-" "curl/7.81.0" 66 
> (org.apache.kafka.connect.runtime.rest.RestServer:62)
> {code}
> Since 2.2.1, these request logs are no longer written.
>  
> With a bisect, I found the problem comes from [PR 
> 6651|https://github.com/apache/kafka/pull/6651] to fix KAFKA-8304
> From what I understand of the problem, the ContextHandlerCollection is added 
> in the Server 
> ([https://github.com/dongjinleekr/kafka/blob/63a6130af30536d67fca5802005695a84c875b5e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L195])
>  before handlers are really added in the ContextHandlerCollection 
> ([https://github.com/dongjinleekr/kafka/blob/63a6130af30536d67fca5802005695a84c875b5e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L296]).
> I don't know the impact on other handlers, but clearly it doesn't work for 
> the RequestLogHandler.
>  
> A solution I found for the logging issue is to set the RequestLog directly on 
> the server without using a handler:
> {code:java}
> diff --git 
> i/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
>  
> w/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> index ab18419efc..4d09cc0e6c 100644
> --- 
> i/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> +++ 
> w/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> @@ -187,6 +187,11 @@ public class RestServer {
>  public void initializeServer() {
>  log.info("Initializing REST server");
>  
> +Slf4jRequestLogWriter slf4jRequestLogWriter = new 
> Slf4jRequestLogWriter();
> +
> slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
> +CustomRequestLog requestLog = new 
> CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT 
> + " %{ms}T");
> +jettyServer.setRequestLog(requestLog);
> +
>  /* Needed for graceful shutdown as per `setStopTimeout` 
> documentation */
>  StatisticsHandler statsHandler = new StatisticsHandler();
>  statsHandler.setHandler(handlers);
> @@ -275,14 +280,7 @@ public class RestServer {
>  configureHttpResponsHeaderFilter(context);
>  }
>  
> -RequestLogHandler requestLogHandler = new RequestLogHandler();
> -Slf4jRequestLogWriter slf4jRequestLogWriter = new 
> Slf4jRequestLogWriter();
> -
> slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
> -CustomRequestLog requestLog = new 
> CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT 
> + " %{ms}T");
> -requestLogHandler.setRequestLog(requestLog);
> -
>  contextHandlers.add(new DefaultHandler());
> -contextHandlers.add(requestLogHandler);
>  
>  handlers.setHandlers(contextHandlers.toArray(new Handler[0]));
>  try {
> {code}
> Same issue raised on StackOverflow: 
> [https://stackoverflow.com/questions/67699702/no-rest-api-logs-in-kafka-connect]
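
For reference, a standalone sketch of that approach (setting the request log on 
the Jetty Server directly); the port and logger name below are placeholder 
assumptions:

{code:java}
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;

public class RequestLogSketch {
    public static void main(String[] args) throws Exception {
        Server server = new Server(8083); // placeholder port

        Slf4jRequestLogWriter writer = new Slf4jRequestLogWriter();
        writer.setLoggerName("rest.request.logger"); // placeholder logger name
        // %{ms}T appends the request latency in milliseconds to each NCSA line.
        server.setRequestLog(new CustomRequestLog(writer,
            CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T"));

        // The request log now applies to every request the server handles,
        // regardless of when handlers are added to the handler collection.
        server.start();
        server.join();
    }
}
{code}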



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #12433: KAFKA-14093: Use single-worker Connect cluster when testing fenced leader recovery

2022-07-25 Thread GitBox


C0urante commented on PR #12433:
URL: https://github.com/apache/kafka/pull/12433#issuecomment-1194542890

   Thanks guys!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571074#comment-17571074
 ] 

ASF GitHub Bot commented on KAFKA-13868:


bbejeck merged PR #427:
URL: https://github.com/apache/kafka-site/pull/427




> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571075#comment-17571075
 ] 

ASF GitHub Bot commented on KAFKA-13868:


bbejeck commented on PR #427:
URL: https://github.com/apache/kafka-site/pull/427#issuecomment-1194536588

   Thanks for this fix @divijvaidya 




> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2022-07-25 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571042#comment-17571042
 ] 

Divij Vaidya commented on KAFKA-12679:
--

Hey [~pnahas] 

I understand that it has been a while since this Jira was opened. Would you like 
to submit the patch as a pull request against the trunk branch? 
One thing that would be useful for merging this change is a reproducer 
test that succeeds after the fix. I would be happy to help you merge this in 
any way I can. 

Please let us know whether you are still interested.

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Priority: Major
> Fix For: 3.4.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  
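
A minimal sketch of the backoff idea from the attached patch; the helper name 
and the 100 ms value are illustrative assumptions, not the patch itself:

{code:java}
public class LockBackoffSketch {

    // Hypothetical stand-in for the state-directory lock attempt that keeps
    // failing while the old thread still owns the directory.
    static boolean tryLockStateDirectory(String taskId) {
        return Math.random() < 0.2;
    }

    public static void main(String[] args) throws InterruptedException {
        final long backoffMs = 100L; // illustrative value
        while (!tryLockStateDirectory("0_34")) {
            // Sleeping between attempts keeps this retry loop from dominating
            // CPU and starving the thread that still holds the lock.
            Thread.sleep(backoffMs);
        }
        System.out.println("state directory lock acquired");
    }
}
{code}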



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2022-07-25 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-12679:
-
Fix Version/s: 3.4.0

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Priority: Major
> Fix For: 3.4.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya opened a new pull request, #12438: KAFKA-13868: Replace YouTube embedded video with links on streams page

2022-07-25 Thread GitBox


divijvaidya opened a new pull request, #12438:
URL: https://github.com/apache/kafka/pull/12438

   As per the [discussion in the 
community](https://lists.apache.org/thread/p24xvbf8nkvxpbj668vc0g3x3lojsnk4), 
we want to replace the embedded YouTube videos with hyperlinks to satisfy the 
[ASF privacy policy](https://privacy.apache.org/faq/committers.html).
   
   This code change replaces the embedded videos from the streams web page on 
the website with hyperlinks.
   
   **Before**
   ![Screenshot 2022-07-25 at 20 05 
02](https://user-images.githubusercontent.com/71267/180844431-82f159de-51c2-4d84-895b-2b58efab3a85.png)
   
   **After**
   ![Screenshot 2022-07-25 at 19 58 
01](https://user-images.githubusercontent.com/71267/180844335-7d4cf22b-543f-4800-9eff-18389cf4318b.png)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #12423: KAFKA-13158: Move ConnectClusterStateImpl to use Mockito

2022-07-25 Thread GitBox


mimaison merged PR #12423:
URL: https://github.com/apache/kafka/pull/12423


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571021#comment-17571021
 ] 

ASF GitHub Bot commented on KAFKA-13868:


divijvaidya commented on PR #427:
URL: https://github.com/apache/kafka-site/pull/427#issuecomment-1194393568

   @bbejeck perhaps you would like to review this?




> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571020#comment-17571020
 ] 

ASF GitHub Bot commented on KAFKA-13868:


divijvaidya opened a new pull request, #427:
URL: https://github.com/apache/kafka-site/pull/427

   As per the [discussion in the 
community](https://lists.apache.org/thread/p24xvbf8nkvxpbj668vc0g3x3lojsnk4), 
we want to replace the embedded YouTube videos with hyperlinks to satisfy the 
[ASF privacy policy](https://privacy.apache.org/faq/committers.html).
   
   This code change replaces the embedded videos from two of the pages on the 
website with hyperlinks.
   
   **Before**
   ![Screenshot 2022-07-25 at 19 31 
50](https://user-images.githubusercontent.com/71267/180839003-da0f967c-019b-449e-a7c3-3bbac37611dd.png)
   ![Screenshot 2022-07-25 at 19 32 
13](https://user-images.githubusercontent.com/71267/180839020-b37a118d-b8ca-480e-8832-bd19b29cfbdd.png)
   
   **After**
   ![Screenshot 2022-07-25 at 19 32 
01](https://user-images.githubusercontent.com/71267/180839040-591e67df-8053-4633-9c35-52d7fd32fd0c.png)
   ![Screenshot 2022-07-25 at 19 32 
23](https://user-images.githubusercontent.com/71267/180839061-355b3e06-1e9c-40da-89b0-aefd95ee5be5.png)
   
   




> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-07-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571014#comment-17571014
 ] 

Guozhang Wang commented on KAFKA-13877:
---

Hello [~lkokhreidze], are you still actively working on this flaky test?

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

2022-07-25 Thread GitBox


guozhangwang commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r929113602


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -136,10 +139,14 @@ private void createActiveTasks(final Map> activeTask
 
 if (!activeTasksToCreate.isEmpty()) {
 for (final Task activeTask : 
activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
-activeTasksPerId.put(activeTask.id(), activeTask);
-pendingActiveTasks.remove(activeTask.id());
-for (final TopicPartition topicPartition : 
activeTask.inputPartitions()) {
-activeTasksPerPartition.put(topicPartition, activeTask);
+if (stateUpdater != null) {

Review Comment:
   Sounds good. I will prepare a PR for this :) 



##
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
 return Arrays.stream(Thread.currentThread().getStackTrace())
 .anyMatch(caller -> 
"org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && 
"checkSupplier".equals(caller.getMethodName()));
 }
+
+public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   Ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] peternied commented on pull request #12407: [BUG] Remove duplicate common.message.* from clients:test jar file

2022-07-25 Thread GitBox


peternied commented on PR #12407:
URL: https://github.com/apache/kafka/pull/12407#issuecomment-1194293359

   @tombentley Can you help me get this PR reviewed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-25 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570964#comment-17570964
 ] 

Jun Rao commented on KAFKA-13868:
-

Thanks, [~mimaison]. I see it now.

> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Gerrrr opened a new pull request, #12437: KAFKA-13769 Add tests for ForeignJoinSubscriptionProcessorSupplier

2022-07-25 Thread GitBox


Gerrrr opened a new pull request, #12437:
URL: https://github.com/apache/kafka/pull/12437

   This PR introduces a test suite that could've caught the issue hotfixed in 
https://github.com/apache/kafka/pull/12420.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14103) Check for hostnames in CoreUtils.scala

2022-07-25 Thread Matthew de Detrich (Jira)
Matthew de Detrich created KAFKA-14103:
--

 Summary: Check for hostnames in CoreUtils.scala
 Key: KAFKA-14103
 URL: https://issues.apache.org/jira/browse/KAFKA-14103
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Matthew de Detrich


In the process of working on [https://github.com/apache/kafka/pull/11478] we 
realized that Kafka does not do any hostname validation when parsing listener 
configurations. It would be ideal to investigate hostname validation so that we 
can eagerly short-circuit on invalid hostnames rather than keeping the current 
behaviour (which still needs to be verified). 
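
As a rough sketch of what eager validation could look like, reusing the Apache 
Commons InetAddressValidator already referenced in CoreUtils plus an illustrative 
RFC 1123 hostname pattern (the helper name and the regex are assumptions):

{code:java}
import org.apache.commons.validator.routines.InetAddressValidator;

public class HostValidationSketch {

    // Illustrative RFC 1123 hostname shape, not what Kafka ships today.
    private static final String HOSTNAME_RE =
        "(?=.{1,253}$)([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\\.)*" +
        "[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?";

    // Hypothetical eager check for a listener host: accept IPv4/IPv6 literals
    // or plausible hostnames, and fail fast on anything else.
    static boolean isValidListenerHost(String host) {
        if (host == null || host.isEmpty())
            return true; // bind-all listener
        return InetAddressValidator.getInstance().isValid(host) || host.matches(HOSTNAME_RE);
    }

    public static void main(String[] args) {
        System.out.println(isValidListenerHost("apache.org")); // true
        System.out.println(isValidListenerHost("::1"));        // true
        System.out.println(isValidListenerHost("bad_host!"));  // false
    }
}
{code}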



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mdedetrich commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1194125450

   JIRA issue created at https://issues.apache.org/jira/browse/KAFKA-14103


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928933278


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, _) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   Doing so causes this test to fail
   
   ```
   props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://apache.org:9092,SSL://[::1]:9092")
   caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
   assertTrue(caught.getMessage.contains("Each listener must have a different 
port"))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928918529


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, _) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+  duplicatesWithIpHosts match {
+case eps if eps.isEmpty =>
+case Seq(ep1, ep2) =>
+  if (requireDistinctPorts) {
+val errorMessage = "If you have two listeners on " +
+  s"the same port then one needs to be IPv4 and the other 
IPv6, listeners: $listeners, port: $port"
+require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), 
errorMessage)
+
+// If we reach this point it means that even though 
duplicatesWithIpHosts in isolation can be valid, if
+// there happens to be ANOTHER listener on this port without 
an IP host (such as a null host) then its
+// not valid.
+if (duplicatesWithoutIpHosts.nonEmpty)
+  throw new IllegalArgumentException(errorMessage)
+  }
+case _ =>
+  if (requireDistinctPorts)

Review Comment:
   Committed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928893597


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, _) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+  duplicatesWithIpHosts match {
+case eps if eps.isEmpty =>
+case Seq(ep1, ep2) =>
+  if (requireDistinctPorts) {
+val errorMessage = "If you have two listeners on " +
+  s"the same port then one needs to be IPv4 and the other 
IPv6, listeners: $listeners, port: $port"
+require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), 
errorMessage)
+
+// If we reach this point it means that even though 
duplicatesWithIpHosts in isolation can be valid, if
+// there happens to be ANOTHER listener on this port without 
an IP host (such as a null host) then its
+// not valid.
+if (duplicatesWithoutIpHosts.nonEmpty)
+  throw new IllegalArgumentException(errorMessage)
+  }
+case _ =>
+  if (requireDistinctPorts)

Review Comment:
   nit: we can add a comment here to make it clear, ex:
   // more than 2 duplicate endpoints is not allowed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928891618


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, _) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   I think we should throw an exception if `duplicatesWithoutIpHosts` is not empty 
here, because we actually only expect 
`duplicatesWithIpHosts`/`duplicatesWithoutIpHosts` to be: `"2 endpoints"`/empty 
list, right? After all, we only allow duplicate ports under **valid** IPv4 and 
IPv6, so we should be able to throw the exception directly here, and remove the 
check below:
   ```
   if (duplicatesWithoutIpHosts.nonEmpty)
 throw new IllegalArgumentException(errorMessage)
   ```
   WDYT?
   
   Something like this:
   ```
   case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
 // It's invalid if a listener on this port has no IP host (such as a 
null host) 
 if (duplicatesWithoutIpHosts.nonEmpty)
 throw new IllegalArgumentException("...")
   
 duplicatesWithIpHosts match {
   case eps if eps.isEmpty =>
   case Seq(ep1, ep2) =>
   if (requireDistinctPorts)
   require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), "If 
you have two listeners on " +
   s"the same port then one needs to be IPv4 and the other IPv6, 
listeners: $listeners, port: $port")
   case _ => 
  
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






[jira] [Resolved] (KAFKA-14093) Flaky ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery

2022-07-25 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14093.

Fix Version/s: 3.3.0
 Assignee: Chris Egerton
   Resolution: Fixed

> Flaky ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery
> ---
>
> Key: KAFKA-14093
> URL: https://issues.apache.org/jira/browse/KAFKA-14093
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.3.0
>
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery.test.stdout
>
>
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testFencedLeaderRecovery FAILED
> java.lang.AssertionError: expected 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException to be 
> thrown, but nothing was thrown



--
This message was sent by Atlassian Jira
(v8.20.10#820010)





[GitHub] [kafka] showuon commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


showuon commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1194061491

   > As mentioned above if we are on the same page then creating a JIRA issue 
to do additional checking to see if hosts are valid seems appropriate.
   
   Yes, please create a JIRA issue. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928892036


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -206,6 +206,51 @@ class KafkaConfigTest {
 assertBadConfigContainingMessage(props, "Each listener must have a 
different name")
   }
 
+  @Test
+  def testIPv4AndIPv6SamePortListeners(): Unit = {
+val props = new Properties()
+props.put(KafkaConfig.BrokerIdProp, "1")
+props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
+var caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
+assertTrue(caught.getMessage.contains("If you have two listeners on the 
same port then one needs to be IPv4 and the other IPv6"))
+
+props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092")
+caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
+assertTrue(caught.getMessage.contains("If you have two listeners on the 
same port then one needs to be IPv4 and the other IPv6"))
+
+props.put(KafkaConfig.ListenersProp, 
"SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SASL_SSL://:9096")

Review Comment:
   Thanks for adding this test case.



##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,60 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, _) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   I think we should throw an exception if `duplicatesWithoutIpHosts` is not empty here, because we actually only expect `duplicatesWithIpHosts`/`duplicatesWithoutIpHosts` to be `"2 IPs"/empty list`, right? After all, we only allow duplicate ports under **valid** IPv4 and IPv6 addresses, so we should be able to throw the exception directly here and remove the check below:
   ```
   if (duplicatesWithoutIpHosts.nonEmpty)
     throw new IllegalArgumentException(errorMessage)
   ```
   WDYT?
   
   Something like this:
   ```
   case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
     // It's invalid if a listener on this port has no IP host (such as a null host)
     if (duplicatesWithoutIpHosts.nonEmpty)
       throw new IllegalArgumentException("...")
   
     duplicatesWithIpHosts match {
       case Seq(ep1, ep2) =>
         if (requireDistinctPorts)
           require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), "If you have two listeners on " +
             s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port")
       case _ =>
     }
   ```



##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,

[GitHub] [kafka] mimaison commented on pull request #12433: KAFKA-14093: Use single-worker Connect cluster when testing fenced leader recovery

2022-07-25 Thread GitBox


mimaison commented on PR #12433:
URL: https://github.com/apache/kafka/pull/12433#issuecomment-1194059039

   @jsancio I'm backporting this to 3.3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #12433: KAFKA-14093: Use single-worker Connect cluster when testing fenced leader recovery

2022-07-25 Thread GitBox


mimaison merged PR #12433:
URL: https://github.com/apache/kafka/pull/12433


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #12436: MINOR: Use builder for mock task in DefaultStateUpdaterTest

2022-07-25 Thread GitBox


cadonna opened a new pull request, #12436:
URL: https://github.com/apache/kafka/pull/12436

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

2022-07-25 Thread GitBox


cadonna commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r928855148


##
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
 return Arrays.stream(Thread.currentThread().getStackTrace())
 .anyMatch(caller -> 
"org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && 
"checkSupplier".equals(caller.getMethodName()));
 }
+
+public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   BTW, you do not want to use a mock task in `StreamTaskTest` since you are 
testing a task there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-25 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-14101:
-

Assignee: Chris Egerton

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Description: 
We have 2 web applications (A and B) that will consume messages from the same Kafka server, so they have the same configurations:
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
jaas.enabled=true{code}
 

A and B are deployed together in one Tomcat server (meaning they are in one JVM process); the startup sequence is A -> B. We then find that B cannot consume messages, with the following exception:
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 
or

[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Description: 
We have 2 web applications (A and B) that will consume messages from the same Kafka server, so they have the same configurations:
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}
 

 

A and B are deployed together in one Tomcat server (meaning they are in one JVM process); the startup sequence is A -> B. We then find that B cannot consume messages, with the following exception:
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.8.6]
a

[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928815976


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,63 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctListenerNames = endpoints.map(_.listenerName).distinct
+require(distinctListenerNames.size == endpoints.size, s"Each listener must 
have a different name unless you have exactly " +
+  s"one listener on IPv4 and the other IPv6 on the same port, listeners: 
$listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-  val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-  require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+  val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+  checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+  // Exception case, lets allow duplicate ports if the host is on IPv4 and 
the other is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)

Review Comment:
   So I think there is a bit of confusion here.
   
   > Should we throw exception when the host is invalid?
   
   Initially I thought you were asking whether the host actually exists (i.e. making a connection to the host to see if the machine is alive), rather than whether the host is well-formed.
   
   > In that case, should we do this validation? inetAddressValidator.isValid(ep.host). It looks like we don't care about if the host is valid or not, all we want to filter out, is the null host case (i.e. PLAINTEXT://:9096), is my understanding correct?
   
   The main reason behind `inetAddressValidator.isValid(ep.host)` is to separate valid IP addresses from everything else: the duplicate-port exception for IPv4 vs IPv6 only makes sense for IP addresses. The host itself is irrelevant; the intention of the code is to care only about the valid-IP part (hence the partitioning on valid IPs). Whether the remaining hosts are null or valid does not matter here, also because the previous code never checked hosts either; it only checked whether ports were duplicated.
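   
   To make that distinction concrete, a small sketch (assuming the same commons-validator `InetAddressValidator`): `isValid` only checks whether the string is a well-formed IP literal, it never attempts a connection, and hostnames are simply not IP literals:
   ```
   import org.apache.commons.validator.routines.InetAddressValidator
   
   object HostFormatCheckSketch {
     def main(args: Array[String]): Unit = {
       val validator = InetAddressValidator.getInstance()
       println(validator.isValid("10.255.255.1"))     // true: well-formed IPv4, no connection made
       println(validator.isValid("::1"))              // true: well-formed IPv6
       println(validator.isValid("kafka.apache.org")) // false: a hostname, not an IP literal
       println(validator.isValid("999.1.1.1"))        // false: malformed IPv4
     }
   }
   ```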



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1193974612

   > In that case, should we do this validation? 
inetAddressValidator.isValid(ep.host). It looks like we don't care about if the 
host is valid or not, all we want to filter out, is the null host case (i.e. 
PLAINTEXT://:9096), is my understanding correct?
   
   So we still need it: not to check whether hosts are invalid (we don't care about hosts here), but to filter out all of the IP addresses. This is answered in more detail at 
https://github.com/apache/kafka/pull/11478#discussion_r928815976


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928815976


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,63 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctListenerNames = endpoints.map(_.listenerName).distinct
+require(distinctListenerNames.size == endpoints.size, s"Each listener must 
have a different name unless you have exactly " +
+  s"one listener on IPv4 and the other IPv6 on the same port, listeners: 
$listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-  val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-  require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+  val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+  checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+  // Exception case, lets allow duplicate ports if the host is on IPv4 and 
the other is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)

Review Comment:
   So I think there is a bit of confusion here.
   
   > Should we throw exception when the host is invalid?
   
   Initially I thought you were asking whether the host actually exists (i.e. making a connection to the host to see if the machine is alive), rather than whether the host is well-formed.
   
   > In that case, should we do this validation? inetAddressValidator.isValid(ep.host). It looks like we don't care about if the host is valid or not, all we want to filter out, is the null host case (i.e. PLAINTEXT://:9096), is my understanding correct?
   
   The main reason behind `inetAddressValidator.isValid(ep.host)` is to separate valid IP addresses from everything else: the duplicate-port exception for IPv4 vs IPv6 only makes sense for IP addresses. The host itself is irrelevant; the intention of the code is to care only about the valid-IP part (hence the partitioning on valid IPs). Whether the remaining hosts are null or valid does not matter here, also because the previous code never checked hosts either; it only checked whether ports were duplicated.
   
   If we are on the same page, then creating a JIRA issue to do additional checking of whether hosts are valid is appropriate.
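   
   As a rough sketch of that partitioning (using a stand-in `EndPoint(host, port)` case class rather than Kafka's real `kafka.cluster.EndPoint`, so it compiles on its own):
   ```
   import org.apache.commons.validator.routines.InetAddressValidator
   
   // Stand-in for kafka.cluster.EndPoint, just for this sketch.
   final case class EndPoint(host: String, port: Int)
   
   object DuplicatePortPartitionSketch {
     private val validator = InetAddressValidator.getInstance()
   
     def main(args: Array[String]): Unit = {
       // Three listeners sharing port 9096: an IPv6 host, an IPv4 host, and a null host.
       val eps = Seq(EndPoint("::1", 9096), EndPoint("127.0.0.1", 9096), EndPoint(null, 9096))
       val (withIpHosts, withoutIpHosts) =
         eps.partition(ep => ep.host != null && validator.isValid(ep.host))
       println(withIpHosts)    // List(EndPoint(::1,9096), EndPoint(127.0.0.1,9096))
       println(withoutIpHosts) // List(EndPoint(null,9096))
     }
   }
   ```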



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Description: 
We have 2 web applications (A and B) that will consume messages from the same Kafka server, so they have the same configurations:
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}
 

A and B are deployed together in one Tomcat server (meaning they are in one JVM process); the startup sequence is A -> B. We then find that B cannot consume messages, with the following exception:
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.8.6]

[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Description: 
We have 2 web applications (A and B) that will consume messages from the same Kafka server, so they have the same configurations:
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}
 

A and B are deployed together in one Tomcat server (meaning they are in one JVM process); the startup sequence is A -> B. We then find that B cannot consume messages, with the following exception:
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 

[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Description: 
We have 2 web applications (A and B) that will consume messages from the same Kafka server, so they have the same configurations:
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}
 

 

A and B are deployed together in one Tomcat server (meaning they are in one JVM process); the startup sequence is A -> B. We then find that B cannot consume messages, with the following exception:
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.8.6]
a

[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Description: 
We have 2 web applications (A and B) that will consume messages from the same Kafka server, so they have the same configurations:

 
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}
 

 

A and B are deployed together in one Tomcat server (meaning they are in one JVM process); the startup sequence is A -> B. We then find that B cannot consume messages, with the following exception:

 
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.
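
For reference, the cast that fails above requires the class named by sasl.login.callback.handler.class to implement org.apache.kafka.common.security.auth.AuthenticateCallbackHandler. A minimal sketch of such a handler follows; the class and helper names are hypothetical, not the reporter's actual implementation, and this alone does not address the multi-application issue:

{code:java}
// Hypothetical sketch: the configured login callback handler must implement
// AuthenticateCallbackHandler, or SaslClientAuthenticator rejects it with the
// IllegalArgumentException shown above.
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

public class MyOauth2AuthenticateCallbackHandler implements AuthenticateCallbackHandler {

    @Override
    public void configure(Map<String, ?> configs, String saslMechanism,
                          List<AppConfigurationEntry> jaasConfigEntries) {
        // Read the token endpoint, client id/secret, etc. from configs (omitted).
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback) {
                // fetchToken() is a hypothetical helper that contacts the OAuth2 server.
                ((OAuthBearerTokenCallback) callback).token(fetchToken());
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    @Override
    public void close() {
    }

    private OAuthBearerToken fetchToken() {
        return null; // A real implementation would return a live token.
    }
}
{code}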

[jira] [Created] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-07-25 Thread Shuo Chen (Jira)
Shuo Chen created KAFKA-14102:
-

 Summary: (SASL/OAUTHBEARER) multiple applications in one JVM 
process, only the first started app can consume messages
 Key: KAFKA-14102
 URL: https://issues.apache.org/jira/browse/KAFKA-14102
 Project: Kafka
  Issue Type: Bug
  Components: clients, KafkaConnect
Affects Versions: 3.0.1
Reporter: Shuo Chen


We have 2 web applications (A and B) that consume messages from the same Kafka server, so they have the same configuration:

{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}

A and B are deployed together in one Tomcat server (meaning they run in the same JVM process). The startup sequence is A -> B; we then find that B cannot consume messages, with the following exception:

{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clien

[GitHub] [kafka] tombentley commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

2022-07-25 Thread GitBox


tombentley commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r928773599


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -217,6 +217,9 @@ protected void finalOffsetCommit(boolean failed) {
 if (failed) {
 log.debug("Skipping final offset commit as task has failed");
 return;
+} else if (isCancelled()) {
+log.debug("Skipping final offset commit as task has been cancelled 
and its producer has already been closed");

Review Comment:
   Is it necessarily true that the producer is already closed, given that the 
closure on cancellation is actually delegated to an executor?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
 recordNum >= recordsProduced);
 
 // also consume from the connector's dedicated offsets topic; just 
need to read one offset record

Review Comment:
   Is the "just need to read one offset record" part of the comment still 
correct?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
 recordNum >= recordsProduced);
 
 // also consume from the connector's dedicated offsets topic; just 
need to read one offset record
-ConsumerRecord offsetRecord = 
connectorTargetedCluster
-.consume(
-1,
+ConsumerRecords offsetRecords = 
connectorTargetedCluster
+.consumeAll(
 TimeUnit.MINUTES.toMillis(1),
 
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
"read_committed"),
+null,
 offsetsTopic
-).iterator().next();
-long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-assertEquals("Offset commits should occur on connector-defined 
poll boundaries, which happen every " + recordsProduced + " records",
-0, seqno % recordsProduced);
+);
+List seqnos = 
parseAndAssertOffsetsForSingleTask(offsetRecords);
+seqnos.forEach(seqno ->
+assertEquals("Offset commits should occur on connector-defined 
poll boundaries, which happen every " + recordsProduced + " records",
+0, seqno % recordsProduced)
+);
 
 // also consume from the cluster's global offsets topic; again, 
just need to read one offset record

Review Comment:
   "just need to read one offset record" again seems suspect, given the change 
from `consume` to `consumeAll` below.
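
   Side note: these tests read with `isolation.level=read_committed`, which is what hides records from aborted or still-open transactions. A minimal sketch of the consumer properties involved, with placeholder bootstrap address and group id:

```java
// Sketch of a read_committed consumer config; only records from committed
// transactions become visible, which is what the assertions above rely on.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ReadCommittedProps {
    public static Properties props() {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-test-reader");         // placeholder
        p.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return p;
    }
}
```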



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -826,15 +830,22 @@ public void testSeparateOffsetsTopic() throws Exception {
 assertConnectorStopped(connectorStop);
 
 // consume all records from the source topic or fail, to ensure 
that they were correctly produced
-ConsumerRecords records = 
connectorTargetedCluster.consumeAll(
+ConsumerRecords sourceRecords = 
connectorTargetedCluster.consumeAll(
 CONSUME_RECORDS_TIMEOUT_MS,
 
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
"read_committed"),
 null,
 topic
 );
-assertTrue("Not enough records produced by source connector. 
Expected at least: " + recordsProduced + " + but got " + records.count(),
-records.count() >= recordsProduced);
-assertExactlyOnceSeqnos(records, numTasks);
+assertTrue("Not enough records produced by source connector. 
Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
+sourceRecords.count() >= recordsProduced);
+// also have to check which offsets have actually been committed, 
since we no longer have exactly-once guarantees

Review Comment:
   I don't quite follow the "no longer have EOS guarantees" part. And that made 
me notice that this test method (unlike the others in the class) doesn't have a 
javadoc description about what exactly is being tested here.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
 recordNum >= recordsProduced);

Review Comment:
   I know this is out of the scope of this fix, but I noticed that the 
`recordNum >= records

[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928747703


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,57 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   So I have just pushed a commit `Handle duplicatesWithoutIpHosts and duplicatesWithoutIpHosts nonEmpty case` which solves this issue by checking if `duplicatesWithoutIpHosts` is non-empty even if `duplicatesWithIpHosts` happens to be valid for a given port.
   
   A comment has been added to make clear what's going on.
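
   For readers skimming the thread, a Java transliteration of the rule being enforced, as a sketch (the real helper is the Scala `validateOneIsIpv4AndOtherIpv6` above; commons-validator on the classpath is assumed):

```java
// Two listeners may share a port only when one host is an IPv4 literal and
// the other an IPv6 literal; hostnames and null hosts never qualify.
import org.apache.commons.validator.routines.InetAddressValidator;

public class ListenerPortRule {
    private static final InetAddressValidator V = InetAddressValidator.getInstance();

    static boolean oneIsIpv4AndOtherIpv6(String first, String second) {
        return (V.isValidInet4Address(first) && V.isValidInet6Address(second))
            || (V.isValidInet6Address(first) && V.isValidInet4Address(second));
    }

    public static void main(String[] args) {
        System.out.println(oneIsIpv4AndOtherIpv6("127.0.0.1", "::1"));      // true
        System.out.println(oneIsIpv4AndOtherIpv6("127.0.0.1", "10.0.0.1")); // false
    }
}
```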



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928747703


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,57 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   So I have just pushed a commit `Handle duplicatesWithoutIpHosts and duplicatesWithoutIpHosts nonEmpty case` which solves this issue by adjusting the `checkDuplicateListenerPorts` function with a `shortCircuitFlag` and checking that `duplicatesWithoutIpHosts.isEmpty`. Also added your test case.
   
   EDIT: Still working on a solution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

2022-07-25 Thread GitBox


cadonna commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r928724038


##
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
 return Arrays.stream(Thread.currentThread().getStackTrace())
 .anyMatch(caller -> 
"org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && 
"checkSupplier".equals(caller.getMethodName()));
 }
+
+public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   That was the idea. I just did not want to pollute this PR too much.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -136,10 +139,14 @@ private void createActiveTasks(final Map> activeTask
 
 if (!activeTasksToCreate.isEmpty()) {
 for (final Task activeTask : 
activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
-activeTasksPerId.put(activeTask.id(), activeTask);
-pendingActiveTasks.remove(activeTask.id());
-for (final TopicPartition topicPartition : 
activeTask.inputPartitions()) {
-activeTasksPerPartition.put(topicPartition, activeTask);
+if (stateUpdater != null) {

Review Comment:
   I agree!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-25 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570820#comment-17570820
 ] 

Mickael Maison commented on KAFKA-14101:


cc [~ChrisEgerton]

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary
> 
>
> Key: KAFKA-14101
> URL: https://issues.apache.org/jira/browse/KAFKA-14101
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout
>
>
> I hit this one while running the tests on your branch from 
> https://github.com/apache/kafka/pull/12429
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testConnectorBoundary FAILED
> java.lang.AssertionError: Committed records should exclude 
> connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
> 15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 
> 49, 54, 59, 64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 
> 159, 163, 165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 
> 211, 216, 217, 218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 
> 272, 274, 275, 276, 277, 278, 279, 285, 291, 293, 296, 299]>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:120)
> at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14101) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary

2022-07-25 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14101:
--

 Summary: Flaky 
ExactlyOnceSourceIntegrationTest.testConnectorBoundary
 Key: KAFKA-14101
 URL: https://issues.apache.org/jira/browse/KAFKA-14101
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Mickael Maison
 Attachments: 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary.test.stdout

I hit this one while running the tests on your branch from 
https://github.com/apache/kafka/pull/12429

org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
testConnectorBoundary FAILED
java.lang.AssertionError: Committed records should exclude 
connector-aborted transactions expected:<[1, 3, 4, 5, 9, 10, 11, 12, 13, 14, 
15, 16, 17, 18, 19, 20, 21, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 
48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 
68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 
88, 89]> but was:<[4, 5, 10, 13, 16, 18, 20, 37, 39, 40, 46, 47, 49, 54, 59, 
64, 65, 68, 70, 71, 77, 83, 85, 89, 146, 148, 153, 154, 157, 158, 159, 163, 
165, 169, 175, 176, 178, 183, 184, 185, 187, 188, 191, 196, 199, 211, 216, 217, 
218, 222, 223, 229, 232, 238, 244, 251, 255, 259, 261, 269, 272, 274, 275, 276, 
277, 278, 279, 285, 291, 293, 296, 299]>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:120)
at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:456)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-25 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1193824200

   Personally, I would prefer to do the migration incrementally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-25 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1193822849

   @clolov Let's first try to make it work with the junit vintage engine. I am not sure whether there is still an issue with powermock and junit5, and it seems like Streams still uses powermock.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-25 Thread GitBox


divijvaidya commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1193821477

   @clolov we don't have to do it in one go. We can create a new project/task in `build.gradle` which runs streams `integration tests` tagged with `@tag('integration')` using Junit5 (see the sketch below). When we are done with the Junit4 to Junit5 migration, we can remove that project/task and use the usual `integrationTest` task.
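
   As a concrete illustration of the tagging (hypothetical class name; a sketch rather than an actual Streams test):

```java
// A JUnit 5 test tagged "integration"; a dedicated Gradle task could select
// tests by this tag while the JUnit 4 suite keeps running unchanged.
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
class StreamsJunit5MigrationExampleTest { // hypothetical class name
    @Test
    void runsUnderTheJUnit5IntegrationTask() {
        // Real test body omitted.
    }
}
```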


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-25 Thread GitBox


clolov commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1193806664

   So then would you like me to coalesce 
https://github.com/apache/kafka/pull/12301 and 
https://github.com/apache/kafka/pull/12302 into one, add all remaining tests, 
make the changes to build.gradle and all other files into a single pull request?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-07-25 Thread GitBox


ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1193791498

   @dengziming I have made the requested changes and added the sample logs in 
the description of the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-25 Thread GitBox


rajinisivaram commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r928627267


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:

Review Comment:
   We don't usually add references to tickets in comments. Can we rewrite this 
comment to say what it is testing and move the explanation of the bug into the 
PR description that will be included in the commit message?



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in 
Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// create server with SSL listener

Review Comment:
   Unnecessary comment



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in 
Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// create server with SSL listener
+val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+testableServer.enableRequestProcessing(Map.empty)
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+val selectTimeout = 5000  // in ms

Review Comment:
   Call this selectTimeoutMs and remove comment?
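
   To make the buffering point concrete, an illustrative sketch (not the actual `Selector.poll()` code) of the behaviour this test pins down: with data already buffered, the poll must not block for the full select timeout:

```java
// If the transport layer still holds buffered request bytes, poll must return
// immediately; only a truly idle connection may block on select().
import java.io.IOException;
import java.nio.channels.Selector;

public class BufferedPollSketch {
    static int poll(Selector selector, boolean hasBufferedData, long selectTimeoutMs)
            throws IOException {
        return hasBufferedData
            ? selector.selectNow()              // returns immediately
            : selector.select(selectTimeoutMs); // may block up to the timeout
    }
}
```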



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,93 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is t

[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-07-25 Thread GitBox


ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r928623470


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -119,16 +119,24 @@ class BrokerMetadataListener(
   }
 
   _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-  if (shouldSnapshot()) {
-maybeStartSnapshot()
+  
+  val (takeSnapshot, snapshotReason) = shouldSnapshot()
+  if (takeSnapshot) {
+maybeStartSnapshot(snapshotReason)
   }
 
   _publisher.foreach(publish)
 }
   }
 
-  private def shouldSnapshot(): Boolean = {
-(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || 
metadataVersionChanged()
+  private def shouldSnapshot(): (Boolean, String) = {

Review Comment:
   Thanks, I have made this change.
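
   The diff above has `shouldSnapshot()` return a `(Boolean, String)` pair. The exact suggestion is not quoted here, so as an assumption, one shape the change could take is an optional reason, sketched in Java:

```java
// Returning an optional snapshot reason instead of a (boolean, reason) pair
// makes it impossible for a caller to mismatch the flag and the reason.
import java.util.Optional;

public class SnapshotDecision {
    static Optional<String> shouldSnapshot(long bytesSinceLastSnapshot,
                                           long maxBytesBetweenSnapshots,
                                           boolean metadataVersionChanged) {
        if (bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
            return Optional.of("max bytes since last snapshot exceeded");
        }
        if (metadataVersionChanged) {
            return Optional.of("metadata version changed");
        }
        return Optional.empty(); // No snapshot needed.
    }
}
```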



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928621118


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,57 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)

Review Comment:
   Indeed this is redundant, removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2022-07-25 Thread GitBox


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928617296


##
core/src/main/scala/kafka/utils/CoreUtils.scala:
##
@@ -252,16 +255,57 @@ object CoreUtils {
 listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+(inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+  (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+val distinctPorts = endpoints.map(_.port).distinct
+require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
 def validate(endPoints: Seq[EndPoint]): Unit = {
-  // filter port 0 for unit tests
-  val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
   val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
   require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-  if (requireDistinctPorts) {
-val distinctPorts = portsExcludingZero.distinct
-require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+  val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+// filter port 0 for unit tests
+ep => ep.port != 0
+  }.groupBy(_.port).partition {
+case (_, endpoints) => endpoints.size > 1
+  }
+
+  val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+  // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+  val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+case (port, eps) =>
+  (port, eps.partition(ep =>
+ep.host != null && inetAddressValidator.isValid(ep.host)
+  ))
+  }
+
+  // Iterate through every grouping of duplicates by port to see if they 
are valid
+  duplicatePortsPartitionedByValidIps.foreach {
+case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+  if (requireDistinctPorts)
+checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   > Is that what we expected? I thought in this PR(KIP), we allow to use the 
same port only when one is in ipv4, and the other one is in ipv6. But this case 
also allow the 3rd set of same port listener. Is it correct?
   
   No, it's not expected. I have to explicitly check for null hosts (I didn't realize they existed). Will work on this.
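
   For reference, a tiny sketch of the null-host guard under discussion (hypothetical helper; a listener such as PLAINTEXT://:9092 has a null host):

```java
// A host must be non-null before InetAddressValidator can classify it;
// hostnames that are not IP literals also fail validation.
import org.apache.commons.validator.routines.InetAddressValidator;

public class NullHostCheck {
    static boolean isIpHost(String host) {
        return host != null && InetAddressValidator.getInstance().isValid(host);
    }

    public static void main(String[] args) {
        System.out.println(isIpHost(null));        // false: e.g. PLAINTEXT://:9092
        System.out.println(isIpHost("::1"));       // true: IPv6 literal
        System.out.println(isIpHost("localhost")); // false: hostname, not an IP literal
    }
}
```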



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org