[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-02 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585800691



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1813,17 +1813,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
 producer.beginTransaction()
 producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
 
+def assertListTransactionResult(
+  expectedTransactionalIds: Set[String]
+): Unit = {
+  val listTransactionsRequest = new ListTransactionsRequest.Builder(new 
ListTransactionsRequestData()).build()
+  val listTransactionsResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
+  assertEquals(Errors.NONE, 
Errors.forCode(listTransactionsResponse.data.errorCode))
+  assertEquals(expectedTransactionalIds, 
listTransactionsResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+}
+
 // First verify that we can list the transaction
-val listTransactionsRequest = new ListTransactionsRequest.Builder(new 
ListTransactionsRequestData()).build()
-val authorizedResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, 
Errors.forCode(authorizedResponse.data.errorCode))
-assertEquals(Set(transactionalId), 
authorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = 
Set(transactionalId))
 
 // Now revoke authorization and verify that the transaction is no longer 
listable
 removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)
-val unauthorizedResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, 
Errors.forCode(unauthorizedResponse.data.errorCode))
-assertEquals(Set(), 
unauthorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = Set())
+
+// The minimum permission needed is `Describe`
+addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)

Review comment:
   Ugh, yes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-02 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585654890



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3303,6 +3304,28 @@ class KafkaApis(val requestChannel: RequestChannel,
   new 
DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
+  def handleListTransactionsRequest(request: RequestChannel.Request): Unit = {
+val listTransactionsRequest = request.body[ListTransactionsRequest]
+val filteredProducerIds = 
listTransactionsRequest.data.producerIdFilter.asScala.map(Long.unbox).toSet
+val filteredStates = 
listTransactionsRequest.data.statesFilter.asScala.toSet
+val response = txnCoordinator.handleListTransactions(filteredProducerIds, 
filteredStates)
+
+// The response should contain only transactionalIds that the principal
+// has `Describe` permission to access.
+if (response.transactionStates != null) {
+  val transactionStateIter = response.transactionStates.iterator()
+  while (transactionStateIter.hasNext) {

Review comment:
   We have not generally taken the view that cluster permission implies 
other permissions as far as I know. For example, cluster permission is not 
checked in the `Metadata` API to be able to list all topics. 
   
   We do have the following check in the `ListGroups` API, but the code notes 
that this was for compatibility.
   ```scala
   if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, 
CLUSTER_NAME))
 // With describe cluster access all groups are returned. We keep this 
alternative for backward compatibility.
 requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
   createResponse(requestThrottleMs, groups, error))
   else {
 val filteredGroups = groups.filter(group => 
authHelper.authorize(request.context, DESCRIBE, GROUP, group.groupId))
 requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
   createResponse(requestThrottleMs, filteredGroups, error))
   }
   ```
   It's worth keeping in mind that the `ListTransactions` API should be 
seldom-used. I don't think we need to get too crazy with optimizations. We can 
always broaden the allowed ACLs in the future.
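
   The per-resource filtering described above can be sketched in isolation. 
This is a minimal illustration, not the broker code: 
`AuthorizedListingSketch`, `retainDescribable`, and the `canDescribe` 
predicate are hypothetical stand-ins for the `Describe` check on each 
transactionalId, and no cluster-level shortcut is applied.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class AuthorizedListingSketch {
    /**
     * Removes, in place, every transactionalId for which the hypothetical
     * canDescribe check fails. Each id is checked individually.
     */
    static void retainDescribable(List<String> transactionalIds, Predicate<String> canDescribe) {
        Iterator<String> iter = transactionalIds.iterator();
        while (iter.hasNext()) {
            if (!canDescribe.test(iter.next())) {
                iter.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>(List.of("txn-a", "txn-b", "txn-c"));
        // Assume the principal holds Describe on everything except "txn-b".
        retainDescribable(ids, id -> !id.equals("txn-b"));
        System.out.println(ids); // prints [txn-a, txn-c]
    }
}
```

   The cost is one authorization check per listed transaction, which seems 
acceptable for an API that is expected to be called rarely.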









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-02 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585622686



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,63 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): ListTransactionsResponseData = {
+inReadLock(stateLock) {
+  val response = new ListTransactionsResponseData()
+  if (loadingPartitions.nonEmpty) {
+response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code)
+  } else {
+val filterStates = mutable.Set.empty[TransactionState]
+val unknownStates = new java.util.ArrayList[String]
+filterStateNames.foreach { stateName =>
+  TransactionState.fromName(stateName) match {
+case Some(state) => filterStates += state
+case None => unknownStates.add(stateName)

Review comment:
   Yeah, I agree. It is simpler. I guess I don't have a strong reason to 
make this field nullable.









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585128257



##
File path: 
clients/src/main/resources/common/message/ListTransactionsResponse.json
##
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "response",
+  "name": "ListTransactionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+  { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+"about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+  { "name": "ErrorCode", "type": "int16", "versions": "0+" },
+  { "name": "TransactionStates", "type": "[]TransactionState", "versions": 
"0+", "fields": [
+{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId" },
+{ "name": "ProducerId", "type": "int64", "versions": "0+", 
"entityType": "producerId" },
+{ "name": "TransactionState", "type": "string", "versions": "0+" }

Review comment:
   I added an `about` tag for `TransactionState`. I think the `entityType` 
tag is enough for the other two fields.









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585123307



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)

Review comment:
   See `listConsumerGroups` for how this was handled in the analogous case 
for consumer groups.









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-26 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r583877084



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)

Review comment:
   It's a reasonable suggestion. Let me give it a try.









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-25 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r583287853



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3303,6 +3304,28 @@ class KafkaApis(val requestChannel: RequestChannel,
   new 
DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
+  def handleListTransactionsRequest(request: RequestChannel.Request): Unit = {
+val listTransactionsRequest = request.body[ListTransactionsRequest]
+val response = new ListTransactionsResponseData()
+
+val filteredProducerIds = 
listTransactionsRequest.data.producerIdFilter.asScala.map(Long.unbox).toSet
+val filteredStates = 
listTransactionsRequest.data.statesFilter.asScala.toSet
+
+txnCoordinator.handleListTransactions(filteredProducerIds, filteredStates) 
match {
+  case Left(error) =>
+response.setErrorCode(error.code)
+  case Right(transactions) =>
+val authorizedTransactions = transactions.filter { state =>
+  authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
state.transactionalId)

Review comment:
   In the case of `DescribeTransactions`, the set of transaction states to 
include in the response is explicitly defined in the request. With 
`ListTransactions`, we only have filters, so the only error state to convey is 
at the top level. Individual transaction states either match the filters or 
they do not.









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582568344



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##
@@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.{immutable, mutable}
 
+
+object TransactionState {
+  val AllStates = Set(
+Empty,
+Ongoing,
+PrepareCommit,
+PrepareAbort,
+CompleteCommit,
+CompleteAbort,
+Dead,
+PrepareEpochFence
+  )
+
+  def fromName(name: String): Option[TransactionState] = {
+name match {
+  case "Empty" => Some(Empty)
+  case "Ongoing" => Some(Ongoing)
+  case "PrepareCommit" => Some(PrepareCommit)
+  case "PrepareAbort" => Some(PrepareAbort)
+  case "CompleteCommit" => Some(CompleteCommit)
+  case "CompleteAbort" => Some(CompleteAbort)
+  case "PrepareEpochFence" => Some(PrepareEpochFence)
+  case "Dead" => Some(Dead)
+  case _ => None
+}
+  }
+
+  def fromId(id: Byte): TransactionState = {

Review comment:
   I left this one as it was since I am not 100% sure whether it has any 
impact on the performance of transaction loading.









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582567640



##
File path: 
clients/src/main/resources/common/message/ListTransactionsRequest.json
##
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "ListTransactionsRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "StatesFilter", "type": "[]string", "versions": "0+",
+  "about": "The transaction states to filter by: if empty, all 
transactions are returned; if non-empty, then only transactions matching one of 
the filtered states will be returned"
+},
+{ "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
+  "about": "The producerIds to filter by: if empty, no transactions will 
be returned; if non-empty, only transactions which match one of the filtered 
producerIds will be returned"

Review comment:
   Yeah, my bad. On a side note, do you think it would be clearer if we 
used `null` to indicate the absence of the filter?
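
   The two conventions in question can be compared side by side. This is 
only a sketch: the class and method names are hypothetical, and it assumes the 
intended meaning of an empty filter is "match everything", as the 
`StatesFilter` description states.

```java
import java.util.Set;

class FilterConventionSketch {
    /** Empty-means-all convention: an empty filter matches every value. */
    static <T> boolean matchesEmptyMeansAll(Set<T> filter, T value) {
        return filter.isEmpty() || filter.contains(value);
    }

    /** Null-means-all alternative: null is "no filter"; an empty filter matches nothing. */
    static <T> boolean matchesNullMeansAll(Set<T> filter, T value) {
        return filter == null || filter.contains(value);
    }

    public static void main(String[] args) {
        Set<Long> empty = Set.of();
        // The same empty filter gives opposite answers under the two conventions.
        System.out.println(matchesEmptyMeansAll(empty, 42L)); // prints true
        System.out.println(matchesNullMeansAll(empty, 42L));  // prints false
        System.out.println(matchesNullMeansAll(null, 42L));   // prints true
    }
}
```

   With `null`, an empty filter becomes a distinct (if not very useful) 
request for nothing. With the empty-means-all convention there is one fewer 
case to handle.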









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582566067



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {

Review comment:
   The problem is that we don't have a way to map from the producerId to 
the partition. 









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582565318



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)

Review comment:
   I could probably be persuaded. Suppose we add a new state in the future. 
If a user tried to query that state on an old broker, no transactions could 
exist in that state, so from that perspective the implementation would return 
an accurate result. On the other hand, I can see how that might be misleading. 
If we decide to return an error code, then I think we'll have to treat the 
state filter a little more carefully from a versioning perspective: a new state 
would probably demand a new version. I think we tried to view the states a 
little more loosely in the `ListGroups` API (which this is modeled after) 
because we considered the state machine more of an internal implementation 
detail. This, by the way, is why we went with the string representation rather 
than numeric ids. It became more of a grey area, though, when the state filter 
was added to the request...
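
   To make the trade-off concrete, here is a small sketch of separating 
recognized state names from unrecognized ones instead of silently dropping the 
latter. The state names are the ones listed in this PR's `TransactionState` 
object. `StateFilterSketch`, `TxnState`, `ParsedFilter`, and `parse` are 
hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class StateFilterSketch {
    /** Hypothetical mirror of the transaction states named in the PR. */
    enum TxnState {
        EMPTY("Empty"), ONGOING("Ongoing"),
        PREPARE_COMMIT("PrepareCommit"), PREPARE_ABORT("PrepareAbort"),
        COMPLETE_COMMIT("CompleteCommit"), COMPLETE_ABORT("CompleteAbort"),
        DEAD("Dead"), PREPARE_EPOCH_FENCE("PrepareEpochFence");

        private final String label;

        TxnState(String label) {
            this.label = label;
        }

        /** Returns the state with the given wire name, or empty if it is unknown. */
        static Optional<TxnState> fromName(String name) {
            for (TxnState state : values()) {
                if (state.label.equals(name)) {
                    return Optional.of(state);
                }
            }
            return Optional.empty();
        }
    }

    /** A requested filter split into recognized states and unrecognized names. */
    static final class ParsedFilter {
        final Set<TxnState> known = EnumSet.noneOf(TxnState.class);
        final List<String> unknown = new ArrayList<>();
    }

    static ParsedFilter parse(List<String> stateNames) {
        ParsedFilter parsed = new ParsedFilter();
        for (String name : stateNames) {
            Optional<TxnState> state = TxnState.fromName(name);
            if (state.isPresent()) {
                parsed.known.add(state.get());
            } else {
                parsed.unknown.add(name); // kept so the caller can decide what to do
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        ParsedFilter parsed = parse(List.of("Ongoing", "SomeFutureState"));
        System.out.println(parsed.known + " " + parsed.unknown);
    }
}
```

   Whether the unknown names then produce an error code or are simply 
reported back to the caller is the open design question; the sketch only keeps 
that information available rather than discarding it.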
   
   









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582558759



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ListTransactionsRequestData;
+import org.apache.kafka.common.message.ListTransactionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ListTransactionsRequest extends AbstractRequest {
+public static class Builder extends 
AbstractRequest.Builder<ListTransactionsRequest> {
+public final ListTransactionsRequestData data;
+
+public Builder(ListTransactionsRequestData data) {
+super(ApiKeys.LIST_TRANSACTIONS);
+this.data = data;
+}
+
+@Override
+public ListTransactionsRequest build(short version) {
+return new ListTransactionsRequest(data, version);
+}
+
+@Override
+public String toString() {
+return data.toString();
+}
+}
+
+private final ListTransactionsRequestData data;
+
+private ListTransactionsRequest(ListTransactionsRequestData data, short 
version) {
+super(ApiKeys.LIST_TRANSACTIONS, version);
+this.data = data;
+}
+
+public ListTransactionsRequestData data() {
+return data;
+}
+
+@Override
+public ListTransactionsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+Errors error = Errors.forException(e);
+ListTransactionsResponseData response = new 
ListTransactionsResponseData()

Review comment:
   Haha, I should really have checked for that after the last PR.




