[jira] [Commented] (KAFKA-14184) Kafka streams application crashes due to "UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks."

2022-11-17 Thread Suresh Rukmangathan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635723#comment-17635723
 ] 

Suresh Rukmangathan commented on KAFKA-14184:
-

A few updates:
 # We are using a ProcessorSupplier, and it returns a new instance every time. 
So the shared-instance problem does not apply.
 # After moving to Kafka Streams 3.2.2, we no longer see this exception. There 
were no other application-side property or design changes, and it has been 
stable for about a week of continuous testing. So it might be a good idea to 
correlate this with any fix that went into the standby code and tag this 
problem in the relevant release notes as fixed, which can help others like me. 
I lost quite some time chasing this issue and had to keep the app at one 
instance to avoid this exception, which meant I had to stall the scale testing 
as well.

> Kafka streams application crashes due to "UnsupportedOperationException: this 
> should not happen: timestamp() is not supported in standby tasks."
> 
>
> Key: KAFKA-14184
> URL: https://issues.apache.org/jira/browse/KAFKA-14184
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Suresh Rukmangathan
>Priority: Critical
>
> Kafka streams application is crashing with following stack trace with 3 
> frames from the app removed that are process/state-store related functions.
>  
> {code:java}
> java.lang.UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks.
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.throwUnsupportedOperationExceptionIfStandby(ProcessorContextImpl.java:352)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:328)
>   at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:136)
>   at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:78)
>   at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)
>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:197)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:197)
>   at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120)
>   // app-calls to process & save to state store - 3 frames
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>   at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> {code}
>  
> Key Kafka streams application configuration details are as below:-
> {code:java}
> {replication.factor=1, num.standby.replicas=1, topology.optimization=all, 
> max.request.size=1048576, auto.offset.reset=earliest}{code}
>  
> If Kafka streams replication factor = 1 and standby replicas=1, is 

[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1319647546

   @jeffkbkim Thanks for the review. I have addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026102820


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-  protocols.map { case (name, protocol) => new 
JoinGroupRequestProtocol()
-.setName(name).setMetadata(protocol)
-  }.iterator.asJava))
-).build()
-  ),
-  RequestLocal.withThreadConfinedCaching)
+val future = new CompletableFuture[JoinGroupResponseData]()
+when(newGroupCoordinator.joinGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedJoinGroupRequest)
+)).thenReturn(future)
 
-verify(groupCoordinator).handleJoinGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(clientId),
-  ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-  ArgumentMatchers.eq(rebalanceTimeoutMs),
-  ArgumentMatchers.eq(sessionTimeoutMs),
-  ArgumentMatchers.eq(protocolType),
-  capturedProtocols.capture(),
-  any(),
-  any(),
-  any()
+createKafkaApis().handleJoinGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
 )
-val capturedProtocolsList = capturedProtocols.getValue
-assertEquals(protocols.size, capturedProtocolsList.size)
-protocols.zip(capturedProtocolsList).foreach { case ((expectedName, 
expectedBytes), (name, bytes)) =>
-  assertEquals(expectedName, name)
-  assertArrayEquals(expectedBytes, bytes)
-}
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-for (version <- ApiKeys.JOIN_GROUP.oldestVersion to 
ApiKeys.JOIN_GROUP.latestVersion) {
-  testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-}
-  }
+val expectedJoinGroupResponse = new JoinGroupResponseData()
+  .setMemberId("member")
+  .setGenerationId(0)
+  .setLeader("leader")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+future.complete(expectedJoinGroupResponse)
+val capturedResponse = verifyNoThrottling(requestChannelRequest)
+val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): 

[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026101492



[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026099763


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  request.context.header.data.requestApiVersion,
+  request.context.header.data.clientId,
+  request.context.clientAddress,
+  requestLocal.bufferSupplier
+)
+  }
 
-// the callback for sending a join-group response
-def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-val responseBody = new JoinGroupResponse(
-  new JoinGroupResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-.setErrorCode(joinResult.error.code)
-.setGenerationId(joinResult.generationId)
-.setProtocolType(joinResult.protocolType.orNull)
-.setProtocolName(joinResult.protocolName.orNull)
-.setLeader(joinResult.leaderId)
-.setSkipAssignment(joinResult.skipAssignment)
-.setMemberId(joinResult.memberId)
-.setMembers(joinResult.members.asJava),
-  request.context.apiVersion
-)
+  def handleJoinGroupRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   1. Yeah, that's right. I return a future to catch any other errors. It could 
be from `sendResponse` but it could also be other issues.
   2. No, it would not because the underlying API uses a callback.
   3. All APIs will be async for this reason, but note that most of them are 
already async today. The difference is that we use a future here instead of a 
callback.
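
The future-versus-callback point can be sketched as below. This is a minimal illustration using only `java.util.concurrent.CompletableFuture`; the class `FutureHandlerSketch` and the methods `handleRequest`, `sendResponse`, and `dispatch` are hypothetical stand-ins, not the `KafkaApis` code from the PR. It shows how a handler that returns a future lets a single top-level caller observe failures from any stage, including the step that sends the response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a request handler; not the actual KafkaApis code.
class FutureHandlerSketch {
    // Last error observed by the top-level dispatcher (kept only for illustration).
    static final AtomicReference<Throwable> lastError = new AtomicReference<>();

    // The handler returns a future that completes once the response has been sent.
    static CompletableFuture<Void> handleRequest(CompletableFuture<String> coordinatorResult) {
        return coordinatorResult.thenAccept(FutureHandlerSketch::sendResponse);
    }

    // Assumed response step; it may fail independently of the coordinator call.
    static void sendResponse(String response) {
        if (response.isEmpty()) {
            throw new IllegalStateException("cannot send an empty response");
        }
    }

    // Top-level dispatch: one place that sees failures from the coordinator
    // call and from sendResponse alike.
    static void dispatch(CompletableFuture<String> coordinatorResult) {
        handleRequest(coordinatorResult).exceptionally(e -> {
            lastError.set(e);
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = new CompletableFuture<>();
        dispatch(result);
        result.complete(""); // sendResponse throws; the dispatcher records the error
        System.out.println("captured: " + lastError.get());
    }
}
```

With a plain callback, an exception thrown inside the callback would surface on whichever thread invoked it; chaining on the returned future routes it back to the caller instead.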






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026098011


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -161,6 +166,12 @@ class KafkaApis(val requestChannel: RequestChannel,
* Top-level method that handles all requests and multiplexes to the right 
api
*/
   override def handle(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
+def handleError(e: Throwable): Unit = {
+  trace(s"Unexpected error handling request ${request.requestDesc(true)} " 
+

Review Comment:
   It is a mistake. Reverted to error.






[jira] [Commented] (KAFKA-14184) Kafka streams application crashes due to "UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks."

2022-11-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635680#comment-17635680
 ] 

Matthias J. Sax commented on KAFKA-14184:
-

The stack trace is highly suspicious. Based on the error message and stack 
trace, it seems that a standby task is trying to write into a changelog topic, 
which is fundamentally wrong. A standby task should only read changelogs to 
update RocksDB.

We did see a similar error in the past which had a totally different root 
cause. Do you use any suppliers (e.g., `ProcessorSupplier` or similar)? If yes, 
do you return a new instance/object on each `get()` call? Returning the same 
instance/object is not allowed and may lead to very weird errors like the one 
you describe.
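
The supplier requirement above can be illustrated with a small sketch. It uses `java.util.function.Supplier` as a stand-in for `ProcessorSupplier`, and the class `CountingProcessor` and the task ids are hypothetical; the point is only to show how a shared instance lets one task overwrite state that another task initialized.

```java
import java.util.function.Supplier;

class SupplierSketch {
    // Hypothetical stateful processor; stands in for a Kafka Streams processor.
    static class CountingProcessor {
        private String taskId; // set once per task in init()

        void init(String taskId) { this.taskId = taskId; }
        String taskId() { return taskId; }
    }

    // Correct: every get() call creates a new object, so each task owns its state.
    static Supplier<CountingProcessor> freshInstances() {
        return CountingProcessor::new;
    }

    // Incorrect: every get() call hands out the same object to all tasks.
    static Supplier<CountingProcessor> sharedInstance() {
        CountingProcessor shared = new CountingProcessor();
        return () -> shared;
    }

    public static void main(String[] args) {
        Supplier<CountingProcessor> bad = sharedInstance();
        CountingProcessor active = bad.get();
        active.init("active-task");
        CountingProcessor standby = bad.get();
        standby.init("standby-task"); // overwrites the state of "active" too
        System.out.println("active now reports: " + active.taskId());
    }
}
```

In this sketch the "active" processor ends up holding the context of the task that initialized it last, which is one plausible way a call could run against a standby context.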

> Kafka streams application crashes due to "UnsupportedOperationException: this 
> should not happen: timestamp() is not supported in standby tasks."
> 
>
> Key: KAFKA-14184
> URL: https://issues.apache.org/jira/browse/KAFKA-14184
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Suresh Rukmangathan
>Priority: Critical
>
> Kafka streams application is crashing with following stack trace with 3 
> frames from the app removed that are process/state-store related functions.
>  
> {code:java}
> java.lang.UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks.
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.throwUnsupportedOperationExceptionIfStandby(ProcessorContextImpl.java:352)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:328)
>   at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:136)
>   at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:78)
>   at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)
>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:197)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:197)
>   at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120)
>   // app-calls to process & save to state store - 3 frames
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>   at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> {code}
>  
> Key Kafka streams application configuration details are as below:-
> {code:java}
> {replication.factor=1, num.standby.replicas=1, topology.optimization=all, 
> max.request.size=1048576, auto.offset.reset=earliest}{code}
>  
> If Kafka streams replication factor = 1 and standby replicas=1, is that an 
> issue? Do we expect that the replication factor should be at least n+1, if 
> standby replicas=1 (or) there is no relationship?
>  
> Couple of more 

[jira] [Commented] (KAFKA-14374) Kafka streams losing messages in State Store during first launch of app

2022-11-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635667#comment-17635667
 ] 

Matthias J. Sax commented on KAFKA-14374:
-

Thanks for creating a ticket. I am not sure right now if I can follow all 
details. A couple of questions:
{quote}we should expect that #table2=#repartition-topic=#state-store
{quote}
What do you mean by this? Do you expect that an input record from partition 0 
(table2 topic) will go to repartition-topic partition 0? By default, this 
won't be guaranteed, because the default hash-partitioning may compute a 
different partition for an integer vs a string.
{quote}which actually is not verified.
{quote}
Not sure if I can follow.
{quote}What we end up with is the following #table2=#repartition-topic, but 
#repartition-topic>#state-store.
{quote}
Not sure what this means. Can you elaborate?
{quote}Please note that there is no insertion in `table2` as we paused the 
connector to verify the cardinality.
{quote}
How do you do the verification?
{quote}So by creating the internal topics before, we avoid the rebalance and we 
end up by #table2=#repartition-topic=#state-store.
{quote}
There will _always_ be a rebalance on first startup...

Your understanding of the rebalance protocol seems not to be correct. The group 
coordinator only "moderates" a rebalance, but it does not compute any 
assignment. One of the consumers inside the group is elected as "group leader" 
and this group leader computes the partition assignment. During assignment 
computation, the group leader also checks if the internal topics exist, and 
creates them if necessary. (This topic creation happens before the assignment 
is computed.)
{quote}which triggers the delete of those messages without flushing them to 
changelog topic.
{quote}
What do you mean by "triggers the delete"? Also, why do you think that 
changelog writes are not flushed?
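
The remark above about hash partitioning for an integer versus a string key can be made concrete with a short sketch. It assumes that the partition is derived from a murmur2 hash of the serialized key bytes, that integer keys serialize to 4 big-endian bytes, and that string keys serialize to UTF-8. The `murmur2` method is re-implemented here from memory of Kafka's utility and should be treated as an approximation; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PartitionSketch {
    // Approximate re-implementation of the murmur2 hash (assumption, see lead-in).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // remaining bytes; cases fall through on purpose
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Maps serialized key bytes to a partition in [0, numPartitions).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Integer key as 4 big-endian bytes.
    static byte[] intKey(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // String key as UTF-8 bytes.
    static byte[] stringKey(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int partitions = 4; // hypothetical partition count
        System.out.println("int 42    -> " + partitionFor(intKey(42), partitions));
        System.out.println("string 42 -> " + partitionFor(stringKey("42"), partitions));
    }
}
```

Because the two serialized forms are different byte arrays, nothing ties their hashes together, so the same logical key can land on different partitions before and after the key conversion.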

> Kafka streams losing messages in State Store during first launch of app
> ---
>
> Key: KAFKA-14374
> URL: https://issues.apache.org/jira/browse/KAFKA-14374
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 3.2.0
>Reporter: Youcef Sebiat
>Priority: Major
> Attachments: Screenshot 2022-11-09 at 14.56.00.png
>
>
> We have been using Kafka Streams to implement a CDC based app. Attached is 
> the sub topology of interest.
> `table2` topic is created by Debezium who is connected to a SQL DB. It 
> contains 26K lines. We take `table2` and create a key which is only a 
> conversion of the key from `string` to `int`. This means that we should 
> expect that #table2=#repartition-topic=#state-store; which actually is not 
> verified. What we end up with is the following #table2=#repartition-topic, 
> but  #repartition-topic>#state-store. We actually lose messages and thus 
> corrupt the state store, which makes the app live in incorrect state. (Please 
> note that there is no insertion in `table2` as we paused the connector to 
> verify the cardinality.)
> The above happens only during the first launch, i.e. the app has never been 
> launched before, so internal topics do not exist yet. Restarts of 
> pre-existing apps do not yield any problems.
> We have:
> 1. Broker on Kafka 3.2.
> 2. Client app on 2.8|3.2 (we tried both and we faced the same issue).
> 2. All parameters are by default except `CACHE_MAX_BYTES_BUFFERING_CONFIG` 
> set to `0` and `NUM_STREAM_THREADS_CONFIG` set to `>1`.
>  
> *What actually worked*
> 1. Use a monothread at first launch: using one thread solves the problem. The 
> #table2=#repartition-topic=#state-store is verified.
> 2. Pre-creating kafka internal topics: we noticed that whenever there is 
> rebalance during the first launch of Kafka Streams app, the state stores 
> ended up missing values. This also happens when you launch multiple pods in 
> K8s for example. When we read through the logs, we noticed that there is a 
> rebalance that is triggered when we first launch the app. This comes from the 
> fact that the internal topics get created and assigned, thus the rebalance. 
> So by creating the internal topics before, we avoid the rebalance and we end 
> up by #table2=#repartition-topic=#state-store.
> *What we noticed from the logs*
> On multi-thread mode, we noticed that it is the partitions that are assigned 
> to the thread chosen by the Coordinator to be the Leader of consumers that 
> suffer the data loss. What we think is happening is the following:
> 1. Consumers threads are launched and inform the coordinator.
> 2. Coordinator assign topics and choses the Leader among the threads.
> 3. App create internal topics.
> 4. Consumers/producers process data. Specifically the Consumer leader 
> consumes from the repartition topic, which triggers the 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1025980136


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -161,6 +166,12 @@ class KafkaApis(val requestChannel: RequestChannel,
* Top-level method that handles all requests and multiplexes to the right 
api
*/
   override def handle(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
+def handleError(e: Throwable): Unit = {
+  trace(s"Unexpected error handling request ${request.requestDesc(true)} " 
+

Review Comment:
   It looks like this message was an error-level log, and this change affects 
all other APIs. What's the reason for changing it to trace?



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-  protocols.map { case (name, protocol) => new 
JoinGroupRequestProtocol()
-.setName(name).setMetadata(protocol)
-  }.iterator.asJava))
-).build()
-  ),
-  RequestLocal.withThreadConfinedCaching)
+val future = new CompletableFuture[JoinGroupResponseData]()
+when(newGroupCoordinator.joinGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedJoinGroupRequest)
+)).thenReturn(future)
 
-verify(groupCoordinator).handleJoinGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(clientId),
-  ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-  ArgumentMatchers.eq(rebalanceTimeoutMs),
-  ArgumentMatchers.eq(sessionTimeoutMs),
-  ArgumentMatchers.eq(protocolType),
-  capturedProtocols.capture(),
-  any(),
-  any(),
-  any()
+createKafkaApis().handleJoinGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
 )
-val capturedProtocolsList = capturedProtocols.getValue
-assertEquals(protocols.size, capturedProtocolsList.size)
-protocols.zip(capturedProtocolsList).foreach { case ((expectedName, 
expectedBytes), (name, bytes)) =>
-  assertEquals(expectedName, name)
-  assertArrayEquals(expectedBytes, bytes)
-}
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-for (version <- ApiKeys.JOIN_GROUP.oldestVersion to 
ApiKeys.JOIN_GROUP.latestVersion) {
-  testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-}
-  }
+val expectedJoinGroupResponse = new JoinGroupResponseData()
+  .setMemberId("member")
+  .setGenerationId(0)
+  .setLeader("leader")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-reset(groupCoordinator, 

[GitHub] [kafka] showuon merged pull request #12868: MINOR: fix syntax typo

2022-11-17 Thread GitBox


showuon merged PR #12868:
URL: https://github.com/apache/kafka/pull/12868


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji opened a new pull request, #12873: WIP: Idempotent producer simulation

2022-11-17 Thread GitBox


hachikuji opened a new pull request, #12873:
URL: https://github.com/apache/kafka/pull/12873

   As many have noticed, the idempotent producer implementation is complex. 
This patch attempts to introduce a new simulation-based testing methodology 
which enables testing of more complex scenarios, which are randomly 
generated. We did something similar for the raft implementation.
   
   I'm still working through some of the mechanics, polishing the code, and 
trying out new scenarios. But it works well enough that it has already begun to 
yield some interesting results, such as 
https://issues.apache.org/jira/browse/KAFKA-14397.
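
   For readers unfamiliar with the approach, here is a minimal sketch of seeded, randomly generated scenario testing. It is not the code in this PR: `ToyIdempotentLog` and `ProducerSimulation` are hypothetical names, and the model (one in-flight request, deduplication by sequence number, a 1-in-4 loss rate) is an assumption chosen only to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Toy model (not Kafka's implementation): a log that deduplicates by sequence number. */
final class ToyIdempotentLog {
    private final List<Integer> records = new ArrayList<>();
    private int nextExpectedSeq = 0;

    /** Returns true if the send is acknowledged, whether newly appended or a duplicate. */
    boolean append(int seq, int value) {
        if (seq < nextExpectedSeq) {
            return true;  // duplicate of an already appended record: ack without appending
        }
        if (seq > nextExpectedSeq) {
            return false; // gap in the sequence: reject so the sender retries
        }
        records.add(value);
        nextExpectedSeq++;
        return true;
    }

    List<Integer> records() {
        return records;
    }
}

final class ProducerSimulation {
    /** Runs one randomly generated scenario; the seed makes any failure reproducible. */
    static List<Integer> run(long seed, int numRecords) {
        Random random = new Random(seed);
        ToyIdempotentLog log = new ToyIdempotentLog();
        int seq = 0;
        while (seq < numRecords) {
            if (random.nextInt(4) == 0) {
                continue; // request lost before reaching the log: retry
            }
            boolean acked = log.append(seq, seq);
            boolean responseLost = random.nextInt(4) == 0;
            if (acked && !responseLost) {
                seq++; // only advance once the ack was actually observed
            }
        }
        return log.records();
    }

    public static void main(String[] args) {
        for (long seed = 0; seed < 1000; seed++) {
            List<Integer> records = run(seed, 50);
            if (records.size() != 50) {
                throw new AssertionError("wrong record count for seed " + seed);
            }
            for (int i = 0; i < 50; i++) {
                if (records.get(i) != i) {
                    throw new AssertionError("duplicate or reordering for seed " + seed);
                }
            }
        }
        System.out.println("all scenarios passed");
    }
}
```

   The invariant checked is that every record appears exactly once and in order, regardless of which requests or responses the scenario drops. A failing seed can be replayed directly.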
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ableegoldman commented on a diff in pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

2022-11-17 Thread GitBox


ableegoldman commented on code in PR #12869:
URL: https://github.com/apache/kafka/pull/12869#discussion_r1025891177


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -611,9 +616,11 @@ boolean runLoop() {
 cacheResizer.accept(size);
 }
 runOnce();
-if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+// Check for a scheduled rebalance but don't trigger it until 
the current rebalance is done
+if (!taskManager.rebalanceInProgress() && 
nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   Yes, but in those cases we should actually not have triggered the rebalance 
anyway -- this should always reflect the latest status of whether a rebalance 
is actually needed.
   
   And yeah, I'll remove all the thread id stuff, but since there's not much 
other junk left once that's removed, I would rather keep the minor renaming of 
`isRebalanceInProgress` if that's ok.
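
   A minimal sketch of the guard in the diff above, pulled out of `StreamThread` so it can be read in isolation. `ProbingRebalanceGuard` is a hypothetical class, and resetting the schedule to `Long.MAX_VALUE` after triggering is an assumption, not something shown in the quoted hunk.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: defers a scheduled rebalance until the current one is done. */
final class ProbingRebalanceGuard {
    // Long.MAX_VALUE means "nothing scheduled" (an assumption for this sketch).
    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);

    void schedule(long atMs) {
        nextProbingRebalanceMs.set(atMs);
    }

    /** Same condition as the diff: no rebalance in progress and the scheduled time has passed. */
    boolean shouldTrigger(boolean rebalanceInProgress, long nowMs) {
        if (rebalanceInProgress || nextProbingRebalanceMs.get() >= nowMs) {
            return false;
        }
        nextProbingRebalanceMs.set(Long.MAX_VALUE); // clear the schedule once triggered
        return true;
    }

    public static void main(String[] args) {
        ProbingRebalanceGuard guard = new ProbingRebalanceGuard();
        guard.schedule(100L);
        System.out.println(guard.shouldTrigger(true, 200L));  // false: still rebalancing
        System.out.println(guard.shouldTrigger(false, 200L)); // true: due and idle
        System.out.println(guard.shouldTrigger(false, 300L)); // false: schedule was cleared
    }
}
```

   The schedule stays in place while a rebalance is in progress, so the followup fires on the first loop iteration after it completes.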






[GitHub] [kafka] ableegoldman commented on a diff in pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

2022-11-17 Thread GitBox


ableegoldman commented on code in PR #12869:
URL: https://github.com/apache/kafka/pull/12869#discussion_r1025887660


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -298,6 +298,7 @@ public boolean isRunning() {
 private volatile ThreadMetadata threadMetadata;
 private StreamThread.StateListener stateListener;
 private final Optional getGroupInstanceID;
+private final String threadIdSuffix; // shortened version of the threadId: 
{processUUID}-StreamThread-{threadIdx}

Review Comment:
   Ah shoot, good point :/ This was my attempt at a compromise between staying 
relatively short, and containing enough info to actually be useful. Maybe I'm 
being overly paranoid, but we have twice had to revert the commit which added 
the `reason` message to `#enforceRebalance` due to perf issues related to 
strings/string length -- and I've seen some VERY long application/client ids.
   
   I'm not sure how long is too long, but if you look into the underlying 
consumer/coordinator methods, everywhere else we pass in a "full reason" for 
logging plus a "short reason" for actually embedding in the request, due to 
said perf regression with longer strings. 
   
   But maybe we don't actually need the client id here after all? I'm not sure 
how this is ultimately used, but I would hope the broker would log the client id 
for a given `reason` string in the JoinGroup request. I'm just going to remove 
this part for now, and if we do find it would be useful, I can do a separate PR. 
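
   To illustrate the "full reason" plus "short reason" split described above, here is a rough sketch. `RebalanceReason` and the 255-character cap are hypothetical and chosen for illustration; this is not the consumer/coordinator code.

```java
/** Hypothetical holder: a verbose reason for local logs, a bounded one for the request. */
final class RebalanceReason {
    // Assumed cap for this sketch; the real limit, if any, is not stated in this thread.
    static final int MAX_EMBEDDED_LENGTH = 255;

    final String fullReason;  // logged locally, may be arbitrarily long
    final String shortReason; // embedded in the request, always bounded

    RebalanceReason(String shortReason, String fullReason) {
        this.fullReason = fullReason;
        this.shortReason = shortReason.length() > MAX_EMBEDDED_LENGTH
            ? shortReason.substring(0, MAX_EMBEDDED_LENGTH)
            : shortReason;
    }

    public static void main(String[] args) {
        RebalanceReason reason = new RebalanceReason(
            "probing rebalance",
            "Triggered followup probing rebalance for a client with a very long client id");
        System.out.println("request: " + reason.shortReason);
        System.out.println("log: " + reason.fullReason);
    }
}
```

   Bounding only the embedded string keeps the request size predictable while leaving the local log message as detailed as needed.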






[GitHub] [kafka] C0urante commented on pull request #12295: KAFKA-13586: Prevent exception thrown during connector update from crashing distributed herder

2022-11-17 Thread GitBox


C0urante commented on PR #12295:
URL: https://github.com/apache/kafka/pull/12295#issuecomment-1319344550

   @dstelljes Good news! Now that we've eliminated the need to mock the static 
APIs for the `RestClient` and `Plugins` classes, we don't have to worry about 
the issues posed by them in the past.
   
   I've rebased locally and, because the merge conflicts were extremely 
minimal, pushed the change to your branch. If this passes CI, I'll merge 
sometime tomorrow.
   
   Thanks again for your help and patience with this fix!





[GitHub] [kafka] soarez opened a new pull request, #12872: KAFKA-14303 Producer.send without record key and batch.size=0 goes into infinite loop

2022-11-17 Thread GitBox


soarez opened a new pull request, #12872:
URL: https://github.com/apache/kafka/pull/12872

   Cherry-picked and slightly modified commit 5bd556a49b. The change was made 
at `core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:61` 
to remove the `threadMode` argument, which is unsupported in junit-jupiter-api:5.8.2.
   





[GitHub] [kafka] C0urante merged pull request #12866: KAFKA-14346: Remove hard-to-mock javax.crypto calls

2022-11-17 Thread GitBox


C0urante merged PR #12866:
URL: https://github.com/apache/kafka/pull/12866





[GitHub] [kafka] C0urante commented on pull request #12866: KAFKA-14346: Remove hard-to-mock javax.crypto calls

2022-11-17 Thread GitBox


C0urante commented on PR #12866:
URL: https://github.com/apache/kafka/pull/12866#issuecomment-1319327985

   Test failures are unrelated; merging...





[GitHub] [kafka] C0urante merged pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante merged PR #12828:
URL: https://github.com/apache/kafka/pull/12828





[GitHub] [kafka] C0urante commented on pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante commented on PR #12828:
URL: https://github.com/apache/kafka/pull/12828#issuecomment-1319311409

   Test failures are unrelated; merging...





[GitHub] [kafka] junrao commented on pull request #12752: KAFKA-14303 Fix batch.size=0 in BuiltInPartitioner

2022-11-17 Thread GitBox


junrao commented on PR #12752:
URL: https://github.com/apache/kafka/pull/12752#issuecomment-1319300532

   @soarez: I tried to cherry-pick the PR to the 3.3 branch, but got a 
compilation error. Do you think you could submit a separate PR for 3.3? Thanks.
   
   ```
   [Error] 
/Users/jun/intellij/kafka/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:61:60:
 unknown annotation argument name: threadMode
   one error found
   
   ```





[GitHub] [kafka] junrao merged pull request #12752: KAFKA-14303 Fix batch.size=0 in BuiltInPartitioner

2022-11-17 Thread GitBox


junrao merged PR #12752:
URL: https://github.com/apache/kafka/pull/12752





[jira] [Updated] (KAFKA-14372) RackAwareReplicaSelector should choose a replica from the isr

2022-11-17 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-14372:
-
Description: 
The default replica selector chooses a replica based solely on whether the 
broker.rack matches the client.rack in the fetch request (and whether the 
offset exists in the follower). Here's a scenario where the consumer would not 
be able to fetch:
 # The cluster is undergoing a rolling upgrade and the follower is shutting down 
while the consumer is fetching from the follower.
 # The connection will shut down gracefully and the consumer will receive an 
error, but it will still consider this follower the preferred read replica.
 # At the next metadata.max.age.ms (5min default) interval, the follower will 
no longer be in the client's metadata, so the consumer will reach out to the 
leader.
 # The leader will redirect the fetch request to the follower since the offline 
follower is still part of the replica set, and no progress is made.

Choosing a replica from the ISR will allow the consumer to make progress at step 4.

  was:
The default replica selector chooses a replica solely on whether the 
broker.rack matches the client.rack in the fetch request (and whether the 
offset exists in the follower). Here's a scenario where the consumer would not 
be able to fetch:
 # Cluster is undergoing a rolling upgrade and the follower is shutting down 
while the consumer is fetching from follower.  
 # The connection will gracefully shutdown and the consumer will receive an 
error but it will still consider this follower as the preferred read replica
 # At the next metadata.max.age.ms (5min default) interval, the follower will 
no longer be in the client's metadata so the consumer will reach out to the 
leader.
 # The leader will redirect the fetch request to the follower, and no progress 
is made.

Choosing a replica from the ISR will allow the consumer to make progress at 4.


> RackAwareReplicaSelector should choose a replica from the isr
> -
>
> Key: KAFKA-14372
> URL: https://issues.apache.org/jira/browse/KAFKA-14372
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
>
> The default replica selector chooses a replica based solely on whether the 
> broker.rack matches the client.rack in the fetch request (and whether the 
> offset exists in the follower). Here's a scenario where the consumer would 
> not be able to fetch:
>  # The cluster is undergoing a rolling upgrade and the follower is shutting down 
> while the consumer is fetching from the follower.
>  # The connection will shut down gracefully and the consumer will receive an 
> error, but it will still consider this follower the preferred read replica.
>  # At the next metadata.max.age.ms (5min default) interval, the follower will 
> no longer be in the client's metadata, so the consumer will reach out to the 
> leader.
>  # The leader will redirect the fetch request to the follower since the 
> offline follower is still part of the replica set, and no progress is made.
> Choosing a replica from the ISR will allow the consumer to make progress at step 4.
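
The proposed behaviour can be sketched as follows. This is an illustration only, not the actual RackAwareReplicaSelector: `IsrAwareRackSelector`, its nested `Replica` type, and the fall-back-to-leader rule are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical selector: prefers a same-rack replica, but only if it is in the ISR. */
final class IsrAwareRackSelector {
    static final class Replica {
        final int brokerId;
        final String rack;

        Replica(int brokerId, String rack) {
            this.brokerId = brokerId;
            this.rack = rack;
        }
    }

    static Replica select(String clientRack, List<Replica> replicas,
                          Set<Integer> isr, Replica leader) {
        for (Replica replica : replicas) {
            boolean inSync = isr.contains(replica.brokerId);
            boolean sameRack = clientRack != null && clientRack.equals(replica.rack);
            if (inSync && sameRack) {
                return replica;
            }
        }
        return leader; // no in-sync replica in the client's rack: serve from the leader
    }

    public static void main(String[] args) {
        Replica leader = new Replica(1, "rack-a");
        Replica follower = new Replica(2, "rack-b");
        List<Replica> replicas = Arrays.asList(leader, follower);

        // Follower is online and in sync: the rack-b consumer reads from it.
        Set<Integer> healthyIsr = new HashSet<>(Arrays.asList(1, 2));
        System.out.println(select("rack-b", replicas, healthyIsr, leader).brokerId); // 2

        // Follower is shut down and out of the ISR: the consumer falls back to the leader.
        Set<Integer> shrunkIsr = new HashSet<>(Arrays.asList(1));
        System.out.println(select("rack-b", replicas, shrunkIsr, leader).brokerId); // 1
    }
}
```

With the ISR filter, the leader in step 4 no longer redirects the consumer to the offline follower, because that follower has dropped out of the ISR even though it remains in the replica set.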



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14354) Add 'isDeleted' parameter when stopping a Connector

2022-11-17 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino updated KAFKA-14354:
-
Description: It would be useful for a Connector to know when its instance is 
being deleted. This would give connectors a chance to perform any cleanup 
tasks (e.g. deleting external resources, or deleting offsets) before the 
connector is completely removed from the cluster.  (was: It would be useful to 
have a callback method added to the Connector API, so connectors extending the 
SourceConnector and SinkConnector classes can be notified when their connector 
instance is being deleted. This will give a chance to connectors to perform any 
cleanup tasks (e.g. deleting external resources, or deleting offsets) before 
the connector is completely removed from the cluster.)

> Add 'isDeleted' parameter when stopping a Connector
> ---
>
> Key: KAFKA-14354
> URL: https://issues.apache.org/jira/browse/KAFKA-14354
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> It would be useful for a Connector to know when its instance is being deleted. 
> This would give connectors a chance to perform any cleanup tasks (e.g. 
> deleting external resources, or deleting offsets) before the connector is 
> completely removed from the cluster.
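
One possible shape for this, sketched under stated assumptions: `DeletionAwareConnector` and `ExampleConnector` are hypothetical types written for illustration, and the overload defaulting to the existing `stop()` is a guess at a backward-compatible design, not the API proposed in the ticket.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interface: adds a deletion-aware stop without breaking existing connectors. */
interface DeletionAwareConnector {
    void stop();

    /** Defaults to the plain stop so connectors that ignore the flag keep working. */
    default void stop(boolean isDeleted) {
        stop();
    }
}

/** Hypothetical connector that cleans up external state only when it is being deleted. */
final class ExampleConnector implements DeletionAwareConnector {
    final List<String> actions = new ArrayList<>();

    @Override
    public void stop() {
        actions.add("release connections");
    }

    @Override
    public void stop(boolean isDeleted) {
        stop();
        if (isDeleted) {
            actions.add("delete external resources");
        }
    }

    public static void main(String[] args) {
        ExampleConnector restarted = new ExampleConnector();
        restarted.stop(false);
        System.out.println(restarted.actions); // [release connections]

        ExampleConnector deleted = new ExampleConnector();
        deleted.stop(true);
        System.out.println(deleted.actions); // [release connections, delete external resources]
    }
}
```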





[jira] [Updated] (KAFKA-14354) Add 'isDeleted' parameter when stopping a Connector

2022-11-17 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino updated KAFKA-14354:
-
Summary: Add 'isDeleted' parameter when stopping a Connector  (was: Add 
'destroyed()' callback method to Connector API)

> Add 'isDeleted' parameter when stopping a Connector
> ---
>
> Key: KAFKA-14354
> URL: https://issues.apache.org/jira/browse/KAFKA-14354
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> It would be useful to have a callback method added to the Connector API, so 
> connectors extending the SourceConnector and SinkConnector classes can be 
> notified when their connector instance is being deleted. This will give a 
> chance to connectors to perform any cleanup tasks (e.g. deleting external 
> resources, or deleting offsets) before the connector is completely removed 
> from the cluster.





[jira] [Updated] (KAFKA-14372) RackAwareReplicaSelector should choose a replica from the isr

2022-11-17 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-14372:
-
Description: 
The default replica selector chooses a replica solely on whether the 
broker.rack matches the client.rack in the fetch request (and whether the 
offset exists in the follower). Here's a scenario where the consumer would not 
be able to fetch:
 # Cluster is undergoing a rolling upgrade and the follower is shutting down 
while the consumer is fetching from follower.  
 # The connection will gracefully shutdown and the consumer will receive an 
error but it will still consider this follower as the preferred read replica
 # At the next metadata.max.age.ms (5min default) interval, the follower will 
no longer be in the client's metadata so the consumer will reach out to the 
leader.
 # The leader will redirect the fetch request to the follower, and no progress 
is made.

Choosing a replica from the ISR will allow the consumer to make progress at 4.

  was:The default replica selector chooses a replica solely on whether the 
broker.rack matches the client.rack in the fetch request (and whether the 
offset exists in the follower). Even if the broker matching the rack is 
unavailable, the consumer will go to that broker. The selector should choose a 
broker from the isr.


> RackAwareReplicaSelector should choose a replica from the isr
> -
>
> Key: KAFKA-14372
> URL: https://issues.apache.org/jira/browse/KAFKA-14372
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
>
> The default replica selector chooses a replica solely on whether the 
> broker.rack matches the client.rack in the fetch request (and whether the 
> offset exists in the follower). Here's a scenario where the consumer would 
> not be able to fetch:
>  # Cluster is undergoing a rolling upgrade and the follower is shutting down 
> while the consumer is fetching from follower.  
>  # The connection will gracefully shutdown and the consumer will receive an 
> error but it will still consider this follower as the preferred read replica
>  # At the next metadata.max.age.ms (5min default) interval, the follower will 
> no longer be in the client's metadata so the consumer will reach out to the 
> leader.
>  # The leader will redirect the fetch request to the follower, and no 
> progress is made.
> Choosing a replica from the ISR will allow the consumer to make progress at 4.





[GitHub] [kafka] C0urante commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

2022-11-17 Thread GitBox


C0urante commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1025529528


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -325,11 +325,12 @@ public Future set(final Map 
values, final Callback
 return producerCallback;
 }
 
-protected final Callback> consumedCallback 
= new Callback>() {
-@Override
-public void onCompletion(Throwable error, ConsumerRecord record) {
-ByteBuffer key = record.key() != null ? 
ByteBuffer.wrap(record.key()) : null;
-ByteBuffer value = record.value() != null ? 
ByteBuffer.wrap(record.value()) : null;
+protected final Callback> consumedCallback 
= (error, record) -> {
+ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) 
: null;
+ByteBuffer value = record.value() != null ? 
ByteBuffer.wrap(record.value()) : null;
+if (value == null) {
+data.remove(key);

Review Comment:
   It's a little strange to keep the `value` initializer the way it was before. 
IMO this would be more readable:
   ```java
   if (record.value() == null)
       data.remove(key);
   else
       data.put(key, ByteBuffer.wrap(record.value()));
   
   ```






[GitHub] [kafka] C0urante commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

2022-11-17 Thread GitBox


C0urante commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1025541314


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##
@@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testTombstoneOffset() throws Exception {
+expectConfigure();
+expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 
0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+new RecordHeaders(), Optional.empty())));
+
+Capture producerCallback = 
EasyMock.newCapture();
+storeLog.send(EasyMock.aryEq(TP0_KEY.array()), 
EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback));
+PowerMock.expectLastCall();
+
+final Capture> readToEndCallback = 
EasyMock.newCapture();
+storeLog.readToEnd(EasyMock.capture(readToEndCallback));
+PowerMock.expectLastCall().andAnswer(() -> {
+capturedConsumedCallback.getValue().onCompletion(null,
+new ConsumerRecord<>(TOPIC, 1, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null,
+new RecordHeaders(), Optional.empty()));
+readToEndCallback.getValue().onCompletion(null, null);
+return null;
+});
+
+expectStop();
+expectClusterId();
+
+PowerMock.replayAll();
+
+store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+store.start();
+
+// Write tombstone offset
+Map toSet = new HashMap<>();
+toSet.put(TP0_KEY, null);
+
+final AtomicBoolean invoked = new AtomicBoolean(false);
+Future setFuture = store.set(toSet, (error, result) -> 
invoked.set(true));
+assertFalse(setFuture.isDone());
+producerCallback.getValue().onCompletion(null, null);
+setFuture.get(1, TimeUnit.MILLISECONDS);
+assertTrue(invoked.get());
+
+// Getting data should read to end of our published data and return it
+Map offsets = 
store.get(Collections.singletonList(TP0_KEY)).get(1, TimeUnit.MILLISECONDS);
+assertNull(offsets.get(TP0_KEY));
+
+// Just verifying that KafkaOffsetBackingStore::get returns null isn't 
enough, we also need to verify that the mapping for the source partition key is 
removed.
+// This is because KafkaOffsetBackingStore::get returns null if either 
there is no existing offset for the source partition or if there is an offset 
with null value.
+// We need to make sure that tombstoned offsets are removed completely 
(i.e. that the mapping for the corresponding source partition is removed).
+HashMap data = 
Whitebox.getInternalState(store, "data");
+assertFalse(data.containsKey(TP0_KEY));

Review Comment:
   This is going to have to be rewritten to account for the recent switch to 
Mockito in this test class.
   
   If it helps, though, I don't think we need an entire new test case for this 
PR. We can probably just add this bit to the end of the existing 
`testGetSetNull` test case (with whatever comments you feel are appropriate 
explaining the necessity for the assertion):
   
   ```java
   assertFalse(store.data.containsKey(TP0_KEY));
   ```
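
   As a self-contained illustration of why the `containsKey` assertion matters, here is a sketch using a plain map. `TombstoneAwareOffsets` is a hypothetical stand-in for the store's internal `data` map, not the real `KafkaOffsetBackingStore`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the store's in-memory offsets map. */
final class TombstoneAwareOffsets {
    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();

    /** Applies a consumed record: a null value is a tombstone and removes the mapping. */
    void onRecord(byte[] key, byte[] value) {
        ByteBuffer wrappedKey = key != null ? ByteBuffer.wrap(key) : null;
        if (value == null) {
            data.remove(wrappedKey);
        } else {
            data.put(wrappedKey, ByteBuffer.wrap(value));
        }
    }

    /** Buggy variant for comparison: stores the null value instead of removing the key. */
    void onRecordWithoutRemoval(byte[] key, byte[] value) {
        ByteBuffer wrappedKey = key != null ? ByteBuffer.wrap(key) : null;
        data.put(wrappedKey, value != null ? ByteBuffer.wrap(value) : null);
    }

    ByteBuffer get(byte[] key) {
        return data.get(ByteBuffer.wrap(key));
    }

    boolean hasMapping(byte[] key) {
        return data.containsKey(ByteBuffer.wrap(key));
    }

    public static void main(String[] args) {
        byte[] key = "partition-0".getBytes(StandardCharsets.UTF_8);
        byte[] value = "offset-42".getBytes(StandardCharsets.UTF_8);

        TombstoneAwareOffsets buggy = new TombstoneAwareOffsets();
        buggy.onRecord(key, value);
        buggy.onRecordWithoutRemoval(key, null);
        // get() returns null, yet the mapping is still present.
        System.out.println(buggy.get(key) == null); // true
        System.out.println(buggy.hasMapping(key));  // true

        TombstoneAwareOffsets fixed = new TombstoneAwareOffsets();
        fixed.onRecord(key, value);
        fixed.onRecord(key, null);
        System.out.println(fixed.get(key) == null); // true
        System.out.println(fixed.hasMapping(key));  // false
    }
}
```

   Both variants return null from `get`, so only a `containsKey` check distinguishes a removed mapping from a stored null.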



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -325,11 +325,12 @@ public Future set(final Map 
values, final Callback
 return producerCallback;
 }
 
-protected final Callback> consumedCallback 
= new Callback>() {
-@Override
-public void onCompletion(Throwable error, ConsumerRecord record) {
-ByteBuffer key = record.key() != null ? 
ByteBuffer.wrap(record.key()) : null;
-ByteBuffer value = record.value() != null ? 
ByteBuffer.wrap(record.value()) : null;
+protected final Callback> consumedCallback 
= (error, record) -> {
+ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) 
: null;

Review Comment:
   I know this isn't a new bug introduced by this change, but we should 
probably add error-handling logic here:
   ```suggestion
   if (error != null) {
       log.error("Failed to read from offsets topic", error);
       return;
   }
   ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -325,11 +325,12 @@ public Future set(final Map 
values, final Callback
 return producerCallback;
 }
 
-protected final Callback> consumedCallback 
= new Callback>() {
-@Override
-public void onCompletion(Throwable error, ConsumerRecord record) {
-ByteBuffer key = record.key() != null ? 
ByteBuffer.wrap(record.key()) : 

[jira] [Assigned] (KAFKA-14396) Flaky memory leak tests rely on System.gc for correctness

2022-11-17 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-14396:
---

Assignee: Greg Harris

> Flaky memory leak tests rely on System.gc for correctness
> -
>
> Key: KAFKA-14396
> URL: https://issues.apache.org/jira/browse/KAFKA-14396
> Project: Kafka
>  Issue Type: Test
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>  Labels: flaky-test
>
> There are a few tests which currently call System.gc to help verify that code 
> running during a test does not leak memory. These tests are:
> * 
> org.apache.kafka.common.memory.GarbageCollectedMemoryPoolTest#testBuffersGarbageCollected
> * 
> org.apache.kafka.common.record.MemoryRecordsBuilderTest#testBuffersDereferencedOnClose
> * 
> org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsSmallValues
> * 
> org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsLargeValues
> Unfortunately the System.gc call is only an advisory call to the JVM, as 
> documented:
> > When control returns from the method call, the Java Virtual Machine has 
> > made a best effort to reclaim space from all discarded objects.
> This means that the System.gc call may not have performed any garbage 
> collection at all, and so tests which expect garbage collection to have 
> happened will not always succeed. For example, a no-op is an implementation 
> of the System.gc method which would fulfill the method contract.
> To reproduce this class of failures:
> 1. Comment out the System.gc calls
> 2. Run the test
> We should try to find an alternative method of verifying that these 
> components do not have memory leaks that does not rely on the 
> implementation-specific behavior of the containing JVM runtime. For example, 
> verifying that buffers have been closed may be a proxy for the literal memory 
> references being released and garbage collected.
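
As a sketch of the "closed buffers as a proxy" idea, a test could count outstanding buffers explicitly instead of waiting for the garbage collector. `TrackingBufferPool` is a hypothetical helper written for this illustration and is not one of the classes under test.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical pool that counts buffers handed out and not yet released. */
final class TrackingBufferPool {
    private final AtomicInteger outstanding = new AtomicInteger();

    ByteBuffer allocate(int size) {
        outstanding.incrementAndGet();
        return ByteBuffer.allocate(size);
    }

    void release(ByteBuffer buffer) {
        outstanding.decrementAndGet();
    }

    /** Deterministic leak check: does not depend on when or whether the JVM collects. */
    int outstanding() {
        return outstanding.get();
    }

    public static void main(String[] args) {
        TrackingBufferPool pool = new TrackingBufferPool();
        ByteBuffer buffer = pool.allocate(1024);
        System.out.println(pool.outstanding()); // 1
        pool.release(buffer);
        System.out.println(pool.outstanding()); // 0
    }
}
```

An assertion on `outstanding()` passes or fails identically on every JVM, which is the property the System.gc-based tests lack. It only proves the component released its buffers, not that the memory was reclaimed.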





[GitHub] [kafka] C0urante commented on a diff in pull request #12866: KAFKA-14346: Remove hard-to-mock javax.crypto calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12866:
URL: https://github.com/apache/kafka/pull/12866#discussion_r1025503951


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/CryptoLibrary.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;

Review Comment:
   Ahh, thanks for the explanation. I guess we can leave this as-is for now.






[GitHub] [kafka] dajac commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12847:
URL: https://github.com/apache/kafka/pull/12847#discussion_r1025502783


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest {
 assertEquals(expectedData, future.get())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testSyncGroup(version: Short): Unit = {
+val groupCoordinator = mock(classOf[GroupCoordinator])
+val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+val ctx = makeContext(version)
+val data = new SyncGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member1")
+  .setGroupInstanceId("instance")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
+  .setGenerationId(10)
+  .setAssignments(List(
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+  .setMemberId("member1")
+  .setAssignment("member1".getBytes()),
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+  .setMemberId("member2")
+  .setAssignment("member2".getBytes())
+  ).asJava)
+
+val future = adapter.syncGroup(ctx, data)
+assertFalse(future.isDone)
+
+val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] =
+  ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
+val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
+  ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+
+verify(groupCoordinator).handleSyncGroup(
+  ArgumentMatchers.eq(data.groupId),
+  ArgumentMatchers.eq(data.generationId),
+  ArgumentMatchers.eq(data.memberId),
+  ArgumentMatchers.eq(Some(data.protocolType)),
+  ArgumentMatchers.eq(Some(data.protocolName)),
+  ArgumentMatchers.eq(Some(data.groupInstanceId)),
+  capturedAssignment.capture(),
+  capturedCallback.capture(),
+  ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier))
+)
+
+assertEquals(Map(
+  "member1" -> "member1",
+  "member2" -> "member2",
+), capturedAssignment.getValue.map { case (member, metadata) =>
+  (member, new String(metadata))
+}.toMap)
+
+capturedCallback.getValue.apply(SyncGroupResult(
+  error = Errors.NONE,
+  protocolType = Some("consumer"),
+  protocolName = Some("range"),
+  memberAssignment = "member1".getBytes()
+))
+
+val expectedResponseData = new SyncGroupResponseData()
+  .setErrorCode(Errors.NONE.code)
+  .setProtocolType("consumer")
+  .setProtocolName("range")

Review Comment:
   It is not. In this case, I felt like defining variables for all of them was 
not helping much.






[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025489778


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Sounds good. This shouldn't be any less performant than the current `trunk` 
and we can always revisit later.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12803: KAFKA-13602: Adding ability to multicast records.

2022-11-17 Thread GitBox


vamossagar12 commented on code in PR #12803:
URL: https://github.com/apache/kafka/pull/12803#discussion_r1025489083


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -459,12 +461,23 @@ private List 
rebuildMetadataForSingleTopology(final Map>, Integer> getPartition = 
maybeMulticastPartitions -> {
+if (!maybeMulticastPartitions.isPresent()) {
+return null;
+}
+if (maybeMulticastPartitions.get().size() != 1) {
+throw new IllegalArgumentException("The partitions returned by 
StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for 
key should be a singleton set");

Review Comment:
   Thanks for the suggestion. Hmm, I had assumed all along that even IQ would be 
based off of a single partition. In fact, we had discussed it during the KIP 
design and this was a suggestion from Matthias:
   
   ```
   Btw: The `StreamPartitioner` interface is also used for IQ. For both IQ
   and FK-join, it seem ok to just add a runtime check that the returned
   list is a singleton (in case we don't add a new class)?
   ```
   
   But I can see a mismatch in the partitioners used. IIUC, the 
`StreamPartitioner` used here is meant for finding the partition for the 
key, and I am trying to enforce a single-partition check here. But I don't 
think I added any such condition at the time of writing the keys. So that's 
something which can clearly cause confusion and even yield wrong results, and 
it looks like that's what you are also pointing to. So I would need to enhance 
the `KeyQueryMetadata` class. And yeah, deprecating the `partition()` method 
can be avoided at this point.
   
   On a different note, do you think a similar condition can happen even for FK 
joins?
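
The runtime singleton check discussed in this comment can be sketched in isolation. This is a minimal illustration, not the code from the PR: the class name `SingletonPartitionCheck` and the helper `singlePartition` are hypothetical, and the input type `Optional<Set<Integer>>` is an assumption inferred from the `maybeMulticastPartitions` lambda in the diff above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class SingletonPartitionCheck {

    // Hypothetical helper: unwraps the optional partition set and enforces
    // that exactly one partition was returned for the key.
    static Integer singlePartition(final Optional<Set<Integer>> maybeMulticastPartitions) {
        if (!maybeMulticastPartitions.isPresent()) {
            // No partition information available; mirror the diff and return null.
            return null;
        }
        final Set<Integer> partitions = maybeMulticastPartitions.get();
        if (partitions.size() != 1) {
            throw new IllegalArgumentException(
                "Expected a singleton set of partitions but got " + partitions.size());
        }
        return partitions.iterator().next();
    }

    public static void main(final String[] args) {
        System.out.println(singlePartition(Optional.of(Collections.singleton(3))));
        System.out.println(singlePartition(Optional.empty()));
        try {
            singlePartition(Optional.of(new HashSet<>(Arrays.asList(0, 1))));
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A check like this only guards the read path. As the comment notes, nothing equivalent is enforced when keys are written, so a multicasting partitioner could still produce records that such a lookup cannot locate.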






[GitHub] [kafka] gharris1727 commented on a diff in pull request #12866: KAFKA-14346: Remove hard-to-mock javax.crypto calls

2022-11-17 Thread GitBox


gharris1727 commented on code in PR #12866:
URL: https://github.com/apache/kafka/pull/12866#discussion_r1025479365


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/CryptoLibrary.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;

Review Comment:
   I looked into that, and it required adding a new import-control statement 
for that package.
   There is a SecurityUtils class in the `common.utils` package, but it doesn't 
reference the javax.crypto package, and would need a new import-control.
   There is also a `common.security.ssl` package which already references 
javax.crypto, and would not need a new import-control.
   Do you have a preference between these two?






[GitHub] [kafka] dajac commented on pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on PR #12870:
URL: https://github.com/apache/kafka/pull/12870#issuecomment-1318954178

   I have to add a few unit tests in KafkaApisTest. There are none at the 
moment.





[jira] [Updated] (KAFKA-14398) Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers

2022-11-17 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-14398:
--
Fix Version/s: 3.4.0

> Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers
> 
>
> Key: KAFKA-14398
> URL: https://issues.apache.org/jira/browse/KAFKA-14398
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, unit tests
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.4.0
>
>
> KRAFT is a replacement for ZK for storing metadata.
> We should validate that ACLs work with KRAFT for the supported authentication 
> mechanisms.
> I will update EndToEndAuthorizerTest.scala to test with ZK and KRAFT.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14375) Remove use of "authorizer-properties" in EndToEndAuthorizationTest.scala

2022-11-17 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-14375:
--
Fix Version/s: 3.4.0

> Remove use of "authorizer-properties" in EndToEndAuthorizationTest.scala
> 
>
> Key: KAFKA-14375
> URL: https://issues.apache.org/jira/browse/KAFKA-14375
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.4.0
>
>
> The use of {{authorizer-properties}} in AclCommand is deprecated and 
> EndToEndAuthorizationTest.scala should be updated to not use it. 
> I will instead set {{kafkaPrincipal}} as a super user and set up the brokers 
> with AclAuthorizer. This will allow {{kafkaPrincipal}} to set ACLs and 
> clientPrincipal to validate them as per the tests.
> This update is a precursor to updating EndToEndAuthorizationTest.scala to run 
> in KRAFT mode.





[jira] [Resolved] (KAFKA-14375) Remove use of "authorizer-properties" in EndToEndAuthorizationTest.scala

2022-11-17 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano resolved KAFKA-14375.
---
Resolution: Fixed

> Remove use of "authorizer-properties" in EndToEndAuthorizationTest.scala
> 
>
> Key: KAFKA-14375
> URL: https://issues.apache.org/jira/browse/KAFKA-14375
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
>
> The use of {{authorizer-properties}} in AclCommand is deprecated and 
> EndToEndAuthorizationTest.scala should be updated to not use it. 
> I will instead set {{kafkaPrincipal}} as a super user and set up the brokers 
> with AclAuthorizer. This will allow {{kafkaPrincipal}} to set ACLs and 
> clientPrincipal to validate them as per the tests.
> This update is a precursor to updating EndToEndAuthorizationTest.scala to run 
> in KRAFT mode.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


gharris1727 commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025464638


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   I wasn't able to verify the thread-safety of the HttpClient in all versions 
of Jetty, so I think I'll be leaving the existing pattern of 
one-restclient-per-request in place.
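
The one-client-per-request pattern mentioned here pairs naturally with `AutoCloseable` and try-with-resources. The following is a rough sketch under stated assumptions: `SketchClient` is a hypothetical stand-in and does not reflect the actual `RestClient` API or any Jetty class, and the `OPEN_CLIENTS` counter exists only to make the lifecycle observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PerRequestClientSketch {

    // Counts clients that have been opened but not yet closed (illustration only).
    static final AtomicInteger OPEN_CLIENTS = new AtomicInteger();

    // Hypothetical stand-in for a REST client that wraps an HTTP client
    // whose thread-safety has not been verified.
    static final class SketchClient implements AutoCloseable {
        SketchClient() {
            OPEN_CLIENTS.incrementAndGet();
        }

        String request(final String url) {
            return "response from " + url;
        }

        @Override
        public void close() {
            OPEN_CLIENTS.decrementAndGet();
        }
    }

    // Each call creates its own client, so no client is ever shared across
    // threads; try-with-resources closes it even if the request throws.
    static String forward(final String url) {
        try (SketchClient client = new SketchClient()) {
            return client.request(url);
        }
    }

    public static void main(final String[] args) {
        System.out.println(forward("http://leader:8083/connectors"));
        System.out.println("open clients: " + OPEN_CLIENTS.get());
    }
}
```

The trade-off is the cost of creating a client per request in exchange for not having to reason about shared state, which matches the decision to keep the existing pattern for now.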






[jira] [Assigned] (KAFKA-14398) Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers

2022-11-17 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano reassigned KAFKA-14398:
-

Assignee: Proven Provenzano

> Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers
> 
>
> Key: KAFKA-14398
> URL: https://issues.apache.org/jira/browse/KAFKA-14398
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, unit tests
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
>
> KRAFT is a replacement for ZK for storing metadata.
> We should validate that ACLs work with KRAFT for the supported authentication 
> mechanisms.
> I will update EndToEndAuthorizerTest.scala to test with ZK and KRAFT.





[jira] [Created] (KAFKA-14398) Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers

2022-11-17 Thread Proven Provenzano (Jira)
Proven Provenzano created KAFKA-14398:
-

             Summary: Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers
                 Key: KAFKA-14398
                 URL: https://issues.apache.org/jira/browse/KAFKA-14398
             Project: Kafka
          Issue Type: Improvement
          Components: kraft, unit tests
            Reporter: Proven Provenzano


KRAFT is a replacement for ZK for storing metadata.

We should validate that ACLs work with KRAFT for the supported authentication 
mechanisms.

I will update EndToEndAuthorizerTest.scala to test with ZK and KRAFT.





[GitHub] [kafka] mumrah merged pull request #12860: Add RPC changes, records, and config from KIP-866

2022-11-17 Thread GitBox


mumrah merged PR #12860:
URL: https://github.com/apache/kafka/pull/12860





[GitHub] [kafka] omkreddy merged pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest

2022-11-17 Thread GitBox


omkreddy merged PR #12843:
URL: https://github.com/apache/kafka/pull/12843





[GitHub] [kafka] C0urante commented on a diff in pull request #12866: KAFKA-14346: Remove hard-to-mock javax.crypto calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12866:
URL: https://github.com/apache/kafka/pull/12866#discussion_r1025383865


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/CryptoLibrary.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * An interface to allow the dependency injection of {@link Mac} and {@link 
KeyGenerator} instances for testing.
+ *
+ * Implementations of this class should be thread-safe.
+ */
+public interface CryptoLibrary {
+
+CryptoLibrary SYSTEM = new SystemLibrary();
+
+Mac getMacInstance(String algorithm) throws NoSuchAlgorithmException;

Review Comment:
   I see the symmetry between this name and `Mac::getInstance` but IMO it's 
clear enough to just call this `macInstance` or even just `mac`.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java:
##
@@ -60,12 +63,14 @@ public static void addToRequest(SecretKey key, byte[] 
requestBody, String signat
 
 /**
  * Extract a signature from a request.
- * @param requestBody the body of the request; may not be null
- * @param headers the headers for the request; may be null
+ *
+ * @param cryptoLibrary

Review Comment:
   ```suggestion
* @param cryptoLibrary the cryptography library to use to generate 
{@link Mac Mac instances};
*   may not be null
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/CryptoLibrary.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * An interface to allow the dependency injection of {@link Mac} and {@link 
KeyGenerator} instances for testing.
+ *
+ * Implementations of this class should be thread-safe.
+ */
+public interface CryptoLibrary {

Review Comment:
   I think `Crypto` is fine here, and it aligns with the brevity of similar 
classes like `Exit`, `Java`, `Time`, etc.
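
To make the dependency-injection idea in this thread concrete, here is a minimal sketch of an injectable crypto interface using the shorter names suggested in the review. It is an illustration under stated assumptions, not the PR's code: the names `Crypto`, `mac`, and `keyGenerator`, and the helpers `newKey` and `sign`, are hypothetical. Only `Mac.getInstance` and `KeyGenerator.getInstance` are real `javax.crypto` calls.

```java
import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;

public class CryptoSketch {

    // Hypothetical injectable wrapper around the static javax.crypto factories.
    interface Crypto {
        // Default implementation that delegates to the JDK.
        Crypto SYSTEM = new Crypto() {
            @Override
            public Mac mac(final String algorithm) throws NoSuchAlgorithmException {
                return Mac.getInstance(algorithm);
            }

            @Override
            public KeyGenerator keyGenerator(final String algorithm) throws NoSuchAlgorithmException {
                return KeyGenerator.getInstance(algorithm);
            }
        };

        Mac mac(String algorithm) throws NoSuchAlgorithmException;

        KeyGenerator keyGenerator(String algorithm) throws NoSuchAlgorithmException;
    }

    // Generates a key through the injected library rather than a static call.
    static SecretKey newKey(final Crypto crypto, final String algorithm) {
        try {
            return crypto.keyGenerator(algorithm).generateKey();
        } catch (final GeneralSecurityException e) {
            throw new IllegalStateException("Could not generate key", e);
        }
    }

    // Signs a request body; tests can pass a fake Crypto to force failures.
    static byte[] sign(final Crypto crypto, final SecretKey key, final byte[] body) {
        try {
            final Mac mac = crypto.mac(key.getAlgorithm());
            mac.init(key);
            return mac.doFinal(body);
        } catch (final GeneralSecurityException e) {
            throw new IllegalStateException("Could not sign request", e);
        }
    }

    public static void main(final String[] args) {
        final SecretKey key = newKey(Crypto.SYSTEM, "HmacSHA256");
        final byte[] signature = sign(Crypto.SYSTEM, key, "body".getBytes(StandardCharsets.UTF_8));
        System.out.println("signature length: " + signature.length);
    }
}
```

Because callers receive the `Crypto` instance as a parameter, a test can supply an implementation that throws `NoSuchAlgorithmException` without having to mock static methods.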



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/CryptoLibrary.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * An 

[GitHub] [kafka] mumrah commented on a diff in pull request #12815: KIP-866 Part 1

2022-11-17 Thread GitBox


mumrah commented on code in PR #12815:
URL: https://github.com/apache/kafka/pull/12815#discussion_r1025397641


##
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, 
LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, 
AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, 
VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): 
ZkBrokerRegistration = {
+  val registration = new BrokerRegistration(broker.id, epoch, 
Uuid.ZERO_UUID,
+Collections.emptyList[Endpoint], Collections.emptyMap[String, 
VersionRange],
+Optional.empty(), false, false)
+  new ZkBrokerRegistration(registration, null, null, false)
+  }
+}
+
+class ZkMigrationClient(zkClient: KafkaZkClient,
+controllerChannelManager: ControllerChannelManager) 
extends MigrationClient with Logging {
+
+  def claimControllerLeadership(kraftControllerId: Int, kraftControllerEpoch: 
Int): ZkControllerState = {
+val epochZkVersionOpt = 
zkClient.tryRegisterKRaftControllerAsActiveController(kraftControllerId, 
kraftControllerEpoch)
+if (epochZkVersionOpt.isDefined) {
+  new ZkControllerState(kraftControllerId, kraftControllerEpoch, 
epochZkVersionOpt.get)
+} else {
+  throw new ControllerMovedException("Cannot claim controller leadership, 
the controller has moved.")
+}
+  }
+
+  def migrateTopics(metadataVersion: MetadataVersion,
+recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+brokerIdConsumer: Consumer[Integer]): Unit = {
+val topics = zkClient.getAllTopicsInCluster()
+val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+val replicaAssignmentAndTopicIds = 
zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+replicaAssignmentAndTopicIds.foreach { case 
TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
+  val partitions = assignments.keys.toSeq
+  val leaderIsrAndControllerEpochs = 
zkClient.getTopicPartitionStates(partitions)
+  val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+  topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+.setName(topic)
+.setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+
+  assignments.foreach { case (topicPartition, replicaAssignment) =>
+replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
+replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
+
+val leaderIsrAndEpoch = leaderIsrAndControllerEpochs(topicPartition)
+topicBatch.add(new ApiMessageAndVersion(new PartitionRecord()
+  .setTopicId(topicIdOpt.get)
+  .setPartitionId(topicPartition.partition)
+  .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
+  
.setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+  
.setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+  
.setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+  .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+  .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+  
.setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value()),
 PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+  }
+
+  val props = topicConfigs(topic)
+  props.forEach { case (key: Object, value: Object) =>
+topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+  .setResourceType(ConfigResource.Type.TOPIC.id)
+  .setResourceName(topic)
+  .setName(key.toString)
+  

[GitHub] [kafka] vvcephei commented on a diff in pull request #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

2022-11-17 Thread GitBox


vvcephei commented on code in PR #12869:
URL: https://github.com/apache/kafka/pull/12869#discussion_r1025368358


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -298,6 +298,7 @@ public boolean isRunning() {
 private volatile ThreadMetadata threadMetadata;
 private StreamThread.StateListener stateListener;
 private final Optional getGroupInstanceID;
+private final String threadIdSuffix; // shortened version of the threadId: 
{processUUID}-StreamThread-{threadIdx}

Review Comment:
   This isn't exactly a suffix. If users supply their own client id, we just 
use it directly:
   ```
   final String userClientId = 
applicationConfigs.getString(StreamsConfig.CLIENT_ID_CONFIG);
   final String applicationId = 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
   if (userClientId.length() <= 0) {
   clientId = applicationId + "-" + processId;
   } else {
   clientId = userClientId;
   }
   ```
   
   It seems like what you're really after is either a thread id that's 
independent of the application id (in which case, why?) or a thread id that's 
independent of the user-configured client id (also, why?)
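
The client id logic quoted in this comment can be exercised on its own. In the sketch below, `clientId` mirrors the quoted snippet, while `threadId` and its `-StreamThread-` format are assumptions taken from the code comment in the diff rather than confirmed behavior; the class and method names are hypothetical.

```java
public class ClientIdSketch {

    // Mirrors the quoted logic: a user-supplied client id is used directly,
    // otherwise the id is derived from the application id and process id.
    static String clientId(final String userClientId, final String applicationId, final String processId) {
        if (userClientId.length() <= 0) {
            return applicationId + "-" + processId;
        }
        return userClientId;
    }

    // Assumed format for the full thread id, based on the diff's comment.
    static String threadId(final String clientId, final int threadIdx) {
        return clientId + "-StreamThread-" + threadIdx;
    }

    public static void main(final String[] args) {
        final String derived = clientId("", "my-app", "process-1");
        final String custom = clientId("custom-client", "my-app", "process-1");
        System.out.println(threadId(derived, 1));
        System.out.println(threadId(custom, 1));
    }
}
```

With a user-supplied client id, the thread id contains neither the application id nor the process id, which is why stripping a fixed prefix does not yield a true suffix in every configuration.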



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -611,9 +616,11 @@ boolean runLoop() {
 cacheResizer.accept(size);
 }
 runOnce();
-if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+// Check for a scheduled rebalance but don't trigger it until 
the current rebalance is done
+if (!taskManager.rebalanceInProgress() && 
nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   I'm sure you thought of it, but just to make sure, is there any circumstance 
where the "nextProbingRebalance" could get cleared by the time we reach 
"rebalanceInProgress == false"?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -611,9 +616,11 @@ boolean runLoop() {
 cacheResizer.accept(size);
 }
 runOnce();
-if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+
+// Check for a scheduled rebalance but don't trigger it until 
the current rebalance is done
+if (!taskManager.rebalanceInProgress() && 
nextProbingRebalanceMs.get() < time.milliseconds()) {

Review Comment:
   It seems like this alone is the fix we needed, right?
   
   Maybe a nitpick, but I'm wondering if we really needed the other 
refactoring/renaming to be part of the same PR.
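
The guard under review can be reduced to a small predicate. This is a simplified sketch, not the `StreamThread` implementation: the class and method names are hypothetical, and treating `Long.MAX_VALUE` as "no probing rebalance scheduled" is an assumption made for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ProbingRebalanceGuardSketch {

    // Assumed sentinel meaning that no probing rebalance is scheduled.
    static final long NONE_SCHEDULED = Long.MAX_VALUE;

    // A scheduled rebalance fires only once its deadline has passed and
    // no other rebalance is currently in progress.
    static boolean shouldTrigger(final boolean rebalanceInProgress,
                                 final AtomicLong nextProbingRebalanceMs,
                                 final long nowMs) {
        return !rebalanceInProgress && nextProbingRebalanceMs.get() < nowMs;
    }

    public static void main(final String[] args) {
        final AtomicLong next = new AtomicLong(1_000L);
        System.out.println(shouldTrigger(true, next, 2_000L));
        System.out.println(shouldTrigger(false, next, 2_000L));
        next.set(NONE_SCHEDULED);
        System.out.println(shouldTrigger(false, next, 2_000L));
    }
}
```

The last call relates to the question raised above: if the scheduled time is reset while a rebalance is still in progress, the follow-up never fires, so whichever code clears the value needs to account for a deferred trigger.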






[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025340543


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@SuppressWarnings("unchecked")
+@Category(IntegrationTest.class)
+public class RestForwardingIntegrationTest {
+
+    private Map sslConfig;
+    @Mock
+    private Plugins plugins;
+    private RestClient followerClient;
+    private RestServer followerServer;
+    @Mock
+    private Herder followerHerder;
+    private RestClient leaderClient;
+    private RestServer leaderServer;
+    @Mock
+    private Herder leaderHerder;
+
+    private SslContextFactory factory;
+    private CloseableHttpClient httpClient;
+    private Collection responses = new ArrayList<>();
+
+    @Before
+    public void setUp() throws IOException, GeneralSecurityException {
+        sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        for (CloseableHttpResponse response: responses) {
+            response.close();
+        }
+        AtomicReference firstException = new AtomicReference<>();
+        Utils.closeAllQuietly(
+                firstException,
+                "clientsAndServers",
+                httpClient,
+                followerServer != null ? followerServer::stop : null,
+                leaderServer != null ? leaderServer::stop : null,
+                factory != null ? factory::stop : null
+        );
+        if (firstException.get() != null) {
+ 

[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025336265


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@SuppressWarnings("unchecked")
+@Category(IntegrationTest.class)
+public class RestForwardingIntegrationTest {
+
+    private Map sslConfig;
+    @Mock
+    private Plugins plugins;
+    private RestClient followerClient;
+    private RestServer followerServer;
+    @Mock
+    private Herder followerHerder;
+    private RestClient leaderClient;
+    private RestServer leaderServer;
+    @Mock
+    private Herder leaderHerder;
+
+    private SslContextFactory factory;
+    private CloseableHttpClient httpClient;
+    private Collection responses = new ArrayList<>();
+
+    @Before
+    public void setUp() throws IOException, GeneralSecurityException {
+        sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        for (CloseableHttpResponse response: responses) {
+            response.close();
+        }
+        AtomicReference firstException = new AtomicReference<>();
+        Utils.closeAllQuietly(
+                firstException,
+                "clientsAndServers",
+                httpClient,
+                followerServer != null ? followerServer::stop : null,
+                leaderServer != null ? leaderServer::stop : null,
+                factory != null ? factory::stop : null
+        );
+        if (firstException.get() != null) {
+ 

[GitHub] [kafka] dajac opened a new pull request, #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac opened a new pull request, #12870:
URL: https://github.com/apache/kafka/pull/12870

   This patch adds `OffsetFetch` to the new `GroupCoordinator` interface and 
updates `KafkaApis` to use it. The changes in `KafkaApis` are larger than I 
had hoped. I think we should refactor this part of the code even further, 
but I will leave that for a follow-up PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-11-17 Thread GitBox


divijvaidya commented on PR #12590:
URL: https://github.com/apache/kafka/pull/12590#issuecomment-1318726600

   @dajac please take a look!





[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-17 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635370#comment-17635370
 ] 

Mikael commented on KAFKA-14362:


When comparing successful test runs with those that generate duplicate 
consumption of messages, I noticed that in the success case, for the consumer 
that gets some of its partitions revoked, there is FIRST this log message:
{code:java}
2022-11-17 07:27:27,141 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[ConsumerCoordinator.java:395] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Updating assignment with
        Assigned partitions:                       
[messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-0, 
messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1]
        Current owned partitions:                  
[messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-3, 
messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, 
messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, 
messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-7]
        Added partitions (assigned - owned):       []
        Revoked partitions (owned - assigned):     
[messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, 
messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7] {code}
and THEN a large number of this type of log message:
{code:java}
2022-11-17 07:27:27,149 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[ConsumerCoordinator.java:1156] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Failing OffsetCommit request since the consumer is not part of an active group 
{code}
whereas in the duplication case, at least one of the latter messages is logged 
BEFORE the former one.
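
The ordering check described above could be automated when comparing many test runs. The following is only a sketch: `RebalanceLogOrder` and its method are hypothetical names, not part of the Kafka client, and it assumes each log event is available as one string in the order it was written.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for comparing test runs; not part of the Kafka client.
final class RebalanceLogOrder {
    // Message fragments copied from the ConsumerCoordinator log lines quoted above.
    static final String ASSIGNMENT_UPDATE = "Updating assignment with";
    static final String FAILED_COMMIT =
            "Failing OffsetCommit request since the consumer is not part of an active group";

    // True when a failed-commit line is seen before the first assignment-update line.
    // Assumes each log event is one element of `lines`, in the order it was written.
    static boolean failedCommitBeforeAssignment(List<String> lines) {
        for (String line : lines) {
            if (line.contains(ASSIGNMENT_UPDATE)) {
                return false;
            }
            if (line.contains(FAILED_COMMIT)) {
                return true;
            }
        }
        return false; // neither message is present
    }

    public static void main(String[] args) {
        List<String> successRun = Arrays.asList(
                "... " + ASSIGNMENT_UPDATE + " ...",
                "... " + FAILED_COMMIT);
        List<String> duplicateRun = Arrays.asList(
                "... " + FAILED_COMMIT,
                "... " + ASSIGNMENT_UPDATE + " ...");
        System.out.println(failedCommitBeforeAssignment(successRun));
        System.out.println(failedCommitBeforeAssignment(duplicateRun));
    }
}
```

A run for which the method returns true would match the ordering seen in the duplication case.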

> Same message consumed by two consumers in the same group  after client restart
> --
>
> Key: KAFKA-14362
> URL: https://issues.apache.org/jira/browse/KAFKA-14362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.1.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
>
> Trigger scenario:
> Two Kafka client application instances on separate EC2 instances with one 
> consumer each, consuming from the same 8 partition topic using the same group 
> ID. Duplicate consumption of a handful of messages sometimes happens right 
> after one of the application instances has been restarted.
> Additional information:
> Messages are produced to the topic by a Kafka streams topology deployed on 
> four application instances. I have verified that each message is only 
> produced once by enabling debug logging in the topology flow right before 
> producing each message to the topic.
> Example logs below are from a test run when a batch of 11 messages were 
> consumed at 10:28:26,771 on the restarted instance and 9 of them were 
> consumed as part of a larger batch at 10:28:23,824 on the other instance. 
> Application shutdown was initiated at  10:27:13,086 and completed at 
> 10:27:15,164, startup was initiated at 10:28:05,029 and completed at 
> 10:28:37,491.
> Kafka consumer group logs after restart on the instance that was restarted:
>  
> {code:java}
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata 
> [Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, 
> groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:853] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Discovered group coordinator 
> b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 
> 2147483646 rack: null)
> 2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: need to re-join with the given member-id
> 2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation 

[jira] [Updated] (KAFKA-9156) LazyTimeIndex & LazyOffsetIndex may cause niobufferoverflow in concurrent state

2022-11-17 Thread Bob Tiernay (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Tiernay updated KAFKA-9156:
---
Attachment: image-2022-11-17-09-02-20-774.png

> LazyTimeIndex & LazyOffsetIndex may cause niobufferoverflow in concurrent 
> state
> ---
>
> Key: KAFKA-9156
> URL: https://issues.apache.org/jira/browse/KAFKA-9156
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: shilin Lu
>Assignee: Alex Mironov
>Priority: Blocker
>  Labels: regression
> Fix For: 2.4.0, 2.3.2
>
> Attachments: image-2019-11-07-17-42-13-852.png, 
> image-2019-11-07-17-44-05-357.png, image-2019-11-07-17-46-53-650.png, 
> image-2022-11-17-09-02-20-774.png
>
>
> !image-2019-11-07-17-42-13-852.png!
> This timeIndex getter is not thread-safe, so it may create more than one 
> TimeIndex instance.
> !image-2019-11-07-17-44-05-357.png!
> When more than one TimeIndex instance is created, the MappedByteBuffer 
> position may be moved to the end. Writing an index entry to this mmap file 
> then causes a java.nio.BufferOverflowException.
>  
> !image-2019-11-07-17-46-53-650.png!
>  
>  
>  
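
The race described in the issue is a general hazard of unguarded lazy initialization. The sketch below is not Kafka's `LazyIndex` code and does not reproduce the actual fix; `LazyHolder` and `loadsUnderContention` are hypothetical names. It shows one common remedy, double-checked locking on a `volatile` field, so that concurrent callers share a single instance.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration; not Kafka's LazyIndex implementation.
final class LazyHolder<T> {
    private final Supplier<T> loader;
    private volatile T value; // volatile so the lock-free fast path sees a fully built object

    LazyHolder(Supplier<T> loader) {
        this.loader = loader;
    }

    T get() {
        T local = value;
        if (local == null) {
            synchronized (this) {
                local = value; // re-check: another thread may have loaded it meanwhile
                if (local == null) {
                    local = loader.get();
                    value = local;
                }
            }
        }
        return local;
    }

    // Starts `threads` concurrent get() calls and returns how often the loader ran.
    static int loadsUnderContention(int threads) {
        AtomicInteger loads = new AtomicInteger();
        LazyHolder<Object> holder = new LazyHolder<>(() -> {
            loads.incrementAndGet();
            return new Object();
        });
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // release all workers at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                holder.get();
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return loads.get();
    }

    public static void main(String[] args) {
        System.out.println("loader invocations: " + loadsUnderContention(8));
    }
}
```

Without the `synchronized` block, two threads could both observe `null` and each build their own instance, which is the situation the issue describes for the memory-mapped index.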



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9156) LazyTimeIndex & LazyOffsetIndex may cause niobufferoverflow in concurrent state

2022-11-17 Thread Bob Tiernay (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635367#comment-17635367
 ] 

Bob Tiernay commented on KAFKA-9156:


We too are hitting this on AWS MSK Kafka version 2.8.1:
{code:java}
ERROR Uncaught exception in scheduled task 'kafka-log-retention' 
(kafka.utils.KafkaScheduler)
java.nio.BufferOverflowException
at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
at java.base/java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:882)
at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:506)
at kafka.log.Log.$anonfun$roll$8(Log.scala:2066)
at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:2066)
at scala.Option.foreach(Option.scala:437)
at kafka.log.Log.$anonfun$roll$2(Log.scala:2066)
at kafka.log.Log.roll(Log.scala:2482)
at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1859)
at kafka.log.Log.deleteSegments(Log.scala:2482)
at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1847)
at kafka.log.Log.deleteOldSegments(Log.scala:1916)
at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:1092)
at 
kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:1089)
at scala.collection.immutable.List.foreach(List.scala:333)
at kafka.log.LogManager.cleanupLogs(LogManager.scala:1089)
at 
kafka.log.LogManager.$anonfun$startupWithConfigOverrides$2(LogManager.scala:429)
at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at 
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
When this happens, the disk starts to fill up and keeps filling until a restart:
!image-2022-11-17-09-02-20-774.png!

> LazyTimeIndex & LazyOffsetIndex may cause niobufferoverflow in concurrent 
> state
> ---
>
> Key: KAFKA-9156
> URL: https://issues.apache.org/jira/browse/KAFKA-9156
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: shilin Lu
>Assignee: Alex Mironov
>Priority: Blocker
>  Labels: regression
> Fix For: 2.4.0, 2.3.2
>
> Attachments: image-2019-11-07-17-42-13-852.png, 
> image-2019-11-07-17-44-05-357.png, image-2019-11-07-17-46-53-650.png, 
> image-2022-11-17-09-02-20-774.png
>
>
> !image-2019-11-07-17-42-13-852.png!
> This timeIndex getter is not thread-safe, so it may create more than one 
> TimeIndex instance.
> !image-2019-11-07-17-44-05-357.png!
> When more than one TimeIndex instance is created, the MappedByteBuffer 
> position may be moved to the end. Writing an index entry to this mmap file 
> then causes a java.nio.BufferOverflowException.
>  
> !image-2019-11-07-17-46-53-650.png!
>  
>  
>  





[jira] [Comment Edited] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2022-11-17 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635340#comment-17635340
 ] 

Ahmed Sobeh edited comment on KAFKA-6579 at 11/17/22 12:44 PM:
---

will give this a shot if it's ok [~teamurko] 


was (Author: JIRAUSER295920):
will give this a shoft if it's ok [~teamurko] 

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.
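
The consolidation pattern proposed in the issue can be sketched in miniature. Everything below is hypothetical: `SimpleStore`, its two implementations, and the check classes are illustrative stand-ins rather than Kafka Streams types, and plain methods replace JUnit test methods so that the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical store API, for illustration only; not a Kafka Streams interface.
interface SimpleStore {
    void put(String key, String value);
    String get(String key);
}

final class HashStore implements SimpleStore {
    private final Map<String, String> data = new HashMap<>();
    public void put(String key, String value) { data.put(key, value); }
    public String get(String key) { return data.get(key); }
}

final class SortedStore implements SimpleStore {
    private final Map<String, String> data = new TreeMap<>();
    public void put(String key, String value) { data.put(key, value); }
    public String get(String key) { return data.get(key); }
}

// Shared checks are written once; each implementation only supplies a factory.
abstract class AbstractSimpleStoreCheck {
    abstract SimpleStore createStore();

    void shouldReturnWhatWasPut() {
        SimpleStore store = createStore();
        store.put("a", "1");
        if (!"1".equals(store.get("a"))) {
            throw new AssertionError("expected value 1 for key a");
        }
    }

    void shouldReturnNullForMissingKey() {
        SimpleStore store = createStore();
        if (store.get("missing") != null) {
            throw new AssertionError("expected null for a missing key");
        }
    }

    void runAll() {
        shouldReturnWhatWasPut();
        shouldReturnNullForMissingKey();
    }
}

final class HashStoreCheck extends AbstractSimpleStoreCheck {
    SimpleStore createStore() { return new HashStore(); }
}

final class SortedStoreCheck extends AbstractSimpleStoreCheck {
    SimpleStore createStore() { return new SortedStore(); }
}

final class SharedStoreChecks {
    public static void main(String[] args) {
        new HashStoreCheck().runAll();
        new SortedStoreCheck().runAll();
        System.out.println("all shared checks passed");
    }
}
```

Adding a check to the abstract class then covers every implementation at once, which is the benefit the issue attributes to `AbstractKeyValueStoreTest`.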





[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2022-11-17 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635340#comment-17635340
 ] 

Ahmed Sobeh commented on KAFKA-6579:


will give this a shoft if it's ok [~teamurko] 

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.





[jira] [Assigned] (KAFKA-10409) Refactor Kafka Streams RocksDb iterators

2022-11-17 Thread Ahmed Sobeh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Sobeh reassigned KAFKA-10409:
---

Assignee: (was: Ahmed Sobeh)

> Refactor Kafka Streams RocksDb iterators 
> -
>
> Key: KAFKA-10409
> URL: https://issues.apache.org/jira/browse/KAFKA-10409
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Minor
>  Labels: newbie
>
> From [https://github.com/apache/kafka/pull/9137#discussion_r470345513] :
> [~ableegoldman] : 
> > Kind of unrelated, but WDYT about renaming {{RocksDBDualCFIterator}} to 
> > {{RocksDBDualCFAllIterator}} or something on the side? I feel like these 
> > iterators could be cleaned up a bit in general to be more understandable – 
> > for example, it's weird that we do the {{iterator#seek}}-ing in the actual 
> > {{all()}} method but for range queries we do the seeking inside the 
> > iterator constructor.
> and [https://github.com/apache/kafka/pull/9137#discussion_r470361726] :
> > Personally I found the {{RocksDBDualCFIterator}} logic a bit difficult to 
> > follow even before the reverse iteration, so it would be nice to have some 
> > tests specifically covering reverse iterators over multi-column-family 
> > timestamped stores





[jira] (KAFKA-10409) Refactor Kafka Streams RocksDb iterators

2022-11-17 Thread Ahmed Sobeh (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10409 ]


Ahmed Sobeh deleted comment on KAFKA-10409:
-

was (Author: JIRAUSER295920):
Will pick this up and take a look if it's ok with everyone watching

> Refactor Kafka Streams RocksDb iterators 
> -
>
> Key: KAFKA-10409
> URL: https://issues.apache.org/jira/browse/KAFKA-10409
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Ahmed Sobeh
>Priority: Minor
>  Labels: newbie
>
> From [https://github.com/apache/kafka/pull/9137#discussion_r470345513] :
> [~ableegoldman] : 
> > Kind of unrelated, but WDYT about renaming {{RocksDBDualCFIterator}} to 
> > {{RocksDBDualCFAllIterator}} or something on the side? I feel like these 
> > iterators could be cleaned up a bit in general to be more understandable – 
> > for example, it's weird that we do the {{iterator#seek}}-ing in the actual 
> > {{all()}} method but for range queries we do the seeking inside the 
> > iterator constructor.
> and [https://github.com/apache/kafka/pull/9137#discussion_r470361726] :
> > Personally I found the {{RocksDBDualCFIterator}} logic a bit difficult to 
> > follow even before the reverse iteration, so it would be nice to have some 
> > tests specifically covering reverse iterators over multi-column-family 
> > timestamped stores





[jira] [Assigned] (KAFKA-10409) Refactor Kafka Streams RocksDb iterators

2022-11-17 Thread Ahmed Sobeh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Sobeh reassigned KAFKA-10409:
---

Assignee: Ahmed Sobeh

> Refactor Kafka Streams RocksDb iterators 
> -
>
> Key: KAFKA-10409
> URL: https://issues.apache.org/jira/browse/KAFKA-10409
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Ahmed Sobeh
>Priority: Minor
>  Labels: newbie
>
> From [https://github.com/apache/kafka/pull/9137#discussion_r470345513] :
> [~ableegoldman] : 
> > Kind of unrelated, but WDYT about renaming {{RocksDBDualCFIterator}} to 
> > {{RocksDBDualCFAllIterator}} or something on the side? I feel like these 
> > iterators could be cleaned up a bit in general to be more understandable – 
> > for example, it's weird that we do the {{iterator#seek}}-ing in the actual 
> > {{all()}} method but for range queries we do the seeking inside the 
> > iterator constructor.
> and [https://github.com/apache/kafka/pull/9137#discussion_r470361726] :
> > Personally I found the {{RocksDBDualCFIterator}} logic a bit difficult to 
> > follow even before the reverse iteration, so it would be nice to have some 
> > tests specifically covering reverse iterators over multi-column-family 
> > timestamped stores





[jira] [Commented] (KAFKA-10409) Refactor Kafka Streams RocksDb iterators

2022-11-17 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635338#comment-17635338
 ] 

Ahmed Sobeh commented on KAFKA-10409:
-

Will pick this up and take a look if it's ok with everyone watching

> Refactor Kafka Streams RocksDb iterators 
> -
>
> Key: KAFKA-10409
> URL: https://issues.apache.org/jira/browse/KAFKA-10409
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Minor
>  Labels: newbie
>
> From [https://github.com/apache/kafka/pull/9137#discussion_r470345513] :
> [~ableegoldman] : 
> > Kind of unrelated, but WDYT about renaming {{RocksDBDualCFIterator}} to 
> > {{RocksDBDualCFAllIterator}} or something on the side? I feel like these 
> > iterators could be cleaned up a bit in general to be more understandable – 
> > for example, it's weird that we do the {{iterator#seek}}-ing in the actual 
> > {{all()}} method but for range queries we do the seeking inside the 
> > iterator constructor.
> and [https://github.com/apache/kafka/pull/9137#discussion_r470361726] :
> > Personally I found the {{RocksDBDualCFIterator}} logic a bit difficult to 
> > follow even before the reverse iteration, so it would be nice to have some 
> > tests specifically covering reverse iterators over multi-column-family 
> > timestamped stores





[GitHub] [kafka] vamossagar12 commented on pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks

2022-11-17 Thread GitBox


vamossagar12 commented on PR #12802:
URL: https://github.com/apache/kafka/pull/12802#issuecomment-1318504959

   @gharris1727, I believe this ticket was created by you. Can you take a look 
at this PR?





[GitHub] [kafka] vamossagar12 closed pull request #12826: Using Timer class to track expiry in IncrementalCooperativeAssignor

2022-11-17 Thread GitBox


vamossagar12 closed pull request #12826: Using Timer class to track expiry in 
IncrementalCooperativeAssignor
URL: https://github.com/apache/kafka/pull/12826





[GitHub] [kafka] vamossagar12 commented on pull request #12826: Using Timer class to track expiry in IncrementalCooperativeAssignor

2022-11-17 Thread GitBox


vamossagar12 commented on PR #12826:
URL: https://github.com/apache/kafka/pull/12826#issuecomment-1318417587

   @gharris1727 Thanks for the confirmation! Let me close this PR. 
   
   BTW: `we can leave this long as-is for a little longer.` this was nice :) 





[GitHub] [kafka] patrik-marton commented on a diff in pull request #12846: KAFKA-14293: Basic Auth filter should set the SecurityContext after a…

2022-11-17 Thread GitBox


patrik-marton commented on code in PR #12846:
URL: https://github.com/apache/kafka/pull/12846#discussion_r1024994387


##
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
 ));
 }
 }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                return requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";

Review Comment:
   Since these are only used inside the constructor, I moved them inside the 
method where they cannot be static. Using the SecurityContext.BASIC_AUTH would 
make sense, thanks.






[GitHub] [kafka] patrik-marton commented on a diff in pull request #12846: KAFKA-14293: Basic Auth filter should set the SecurityContext after a…

2022-11-17 Thread GitBox


patrik-marton commented on code in PR #12846:
URL: https://github.com/apache/kafka/pull/12846#discussion_r1024987213


##
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
 ));
 }
 }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                return requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";
+
+            initializeEmptyCredentials();
+
+            if (authorizationHeader == null) {
+                log.trace("No credentials were provided with the request");
+                return;
+            }
+
+            int spaceIndex = authorizationHeader.indexOf(space);
+            if (spaceIndex <= 0) {
+                log.trace("Request credentials were malformed; no space present in value for authorization header");
+                return;
+            }
+
+            String method = authorizationHeader.substring(0, spaceIndex);
+            if (!authType.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, authType);
+                return;
+            }
+
+            authorizationHeader = authorizationHeader.substring(spaceIndex + 1);
+            authorizationHeader = new String(Base64.getDecoder().decode(authorizationHeader),
+                    StandardCharsets.UTF_8);
+            int i = authorizationHeader.indexOf(colon);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon present between username and password");
+                return;
+            }
+
+            this.username = authorizationHeader.substring(0, i);
+            this.password = authorizationHeader.substring(i + 1);
+        }
+
+        private void initializeEmptyCredentials() {
+            this.username = "";
+            this.password = "";
+        }

Review Comment:
   I modified it to avoid null checks where the `BasicAuthenticationCredential` 
is used. I know the `BasicAuthCallBackHandler` would be fine with null, but 
setting the user in the security context would require an additional null 
check. (Theoretically it cannot be null there after a successful login, but I 
felt it is safer to have a default value.) What do you think?
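
To make the trade-off concrete, here is a standalone sketch modelled on the constructor in the diff. `ParsedBasicCredential` is a hypothetical name, logging is omitted, and the handling of invalid Base64 input is an assumption, since the diff does not show that case. Every malformed header yields empty strings rather than null, so callers need no null check.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Standalone sketch modelled on the constructor in the diff; the class name is hypothetical.
final class ParsedBasicCredential {
    final String username;
    final String password;

    private ParsedBasicCredential(String username, String password) {
        this.username = username;
        this.password = password;
    }

    static ParsedBasicCredential parse(String authorizationHeader) {
        // Default value returned for every malformed or missing header.
        ParsedBasicCredential empty = new ParsedBasicCredential("", "");
        if (authorizationHeader == null) {
            return empty;
        }
        int spaceIndex = authorizationHeader.indexOf(' ');
        if (spaceIndex <= 0) {
            return empty;
        }
        String method = authorizationHeader.substring(0, spaceIndex);
        if (!"basic".equalsIgnoreCase(method)) {
            return empty;
        }
        String decoded;
        try {
            byte[] raw = Base64.getDecoder().decode(authorizationHeader.substring(spaceIndex + 1));
            decoded = new String(raw, StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            // Assumption: invalid Base64 is treated like any other malformed header.
            return empty;
        }
        int colonIndex = decoded.indexOf(':');
        if (colonIndex <= 0) {
            return empty;
        }
        return new ParsedBasicCredential(
                decoded.substring(0, colonIndex), decoded.substring(colonIndex + 1));
    }

    public static void main(String[] args) {
        String header = "Basic " + Base64.getEncoder()
                .encodeToString("user:secret".getBytes(StandardCharsets.UTF_8));
        ParsedBasicCredential parsed = parse(header);
        System.out.println(parsed.username + " / " + parsed.password);
        System.out.println("empty on null header: " + parse(null).username.isEmpty());
    }
}
```

With this shape, code that builds a principal from the username can call `parsed.username` directly, at the cost of no longer distinguishing "no credentials" from "empty username".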






[GitHub] [kafka] mimaison merged pull request #12839: KAFKA-14346: Replace static mocking of WorkerConfig::lookupKafkaClusterId

2022-11-17 Thread GitBox


mimaison merged PR #12839:
URL: https://github.com/apache/kafka/pull/12839





[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1318257181

   Rebased the PR. Ready for second round.

