Re: [PR] [No Review] Kafka-14563 [kafka]

2024-08-19 Thread via GitHub


github-actions[bot] commented on PR #15657:
URL: https://github.com/apache/kafka/pull/15657#issuecomment-2297941327

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-24 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1578348187


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##
@@ -40,6 +40,7 @@
 import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;
 
 public class ProduceRequest extends AbstractRequest {
+private static final short TRANSACTION_V2_MINIMAL_VERSION = 12;

Review Comment:
   nit: should we store the last version before v2 instead, to avoid the -1s 
in the methods? For isTransactionalV2Requested, we can then check that the 
request version is strictly greater than that constant. 
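
   A minimal sketch of this suggestion, assuming a hypothetical constant name 
and helper class that are not part of the PR. The value 11 is derived from the 
diff above, where the first v2 version is 12.

```java
// Hypothetical sketch: keep the last pre-v2 version so callers never write "- 1".
final class ProduceVersionSketch {
    // Assumption: v2 starts at version 12 (per the diff), so 11 is the last pre-v2 version.
    static final short LAST_VERSION_BEFORE_TRANSACTION_V2 = 11;

    // v2 is requested when the version is strictly greater than the constant.
    static boolean isTransactionV2Requested(short version) {
        return version > LAST_VERSION_BEFORE_TRANSACTION_V2;
    }
}
```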



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-24 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1578346148


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
 null;
 this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
 initProducerIdVersion.maxVersion() >= 3;
+
+// TODO(caliu) use feature version.
+ApiVersion produceVersion = nodeApiVersions != null ?
+nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+null;
+this.coordinatorSupportsTransactionV2 = produceVersion != null &&

Review Comment:
   Sorry, I think I used some unclear wording. When I said "checking TV", I 
meant using the nodeApiVersions to check the TV on requests -- this will be in 
ApiVersionsResponseData.finalizedFeatures. We will also want to check 
ApiVersionsResponseData.finalizedFeaturesEpoch when we update the feature 
version. I was thinking the transaction manager could have some sort of central 
mechanism that checks the API versions and updates the TV on the various 
requests. This could be done in a helper. 
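
   One way such a helper could look, as a sketch only. The class and method 
names are hypothetical, and the feature level that enables the new protocol is 
passed in rather than assumed. It takes the finalized feature level and epoch 
as plain values instead of reading them from ApiVersionsResponseData.

```java
// Hypothetical helper: remembers the newest finalized transaction version (TV)
// and ignores updates that carry an older or equal finalized-features epoch.
final class TransactionVersionTracker {
    private final short requiredLevel;     // level at which the new protocol is enabled (caller-supplied)
    private long latestEpoch = -1L;        // epoch of the newest update applied so far
    private short transactionVersion = 0;  // assumption: 0 means "not finalized yet"

    TransactionVersionTracker(short requiredLevel) {
        this.requiredLevel = requiredLevel;
    }

    // Called with the values seen on a response; returns true if the tracked TV changed.
    synchronized boolean maybeUpdate(long finalizedFeaturesEpoch, short finalizedTransactionVersion) {
        if (finalizedFeaturesEpoch <= latestEpoch) {
            return false; // stale or duplicate information, keep the current TV
        }
        latestEpoch = finalizedFeaturesEpoch;
        boolean changed = finalizedTransactionVersion != transactionVersion;
        transactionVersion = finalizedTransactionVersion;
        return changed;
    }

    synchronized boolean isTransactionV2Enabled() {
        return transactionVersion >= requiredLevel;
    }
}
```

   A caller could treat a `true` return value as the signal to abort the 
current transaction and restart with the protocol that matches the new TV.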



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-24 Thread via GitHub


jolshan commented on PR #15657:
URL: https://github.com/apache/kafka/pull/15657#issuecomment-2075537428

   Thanks @CalvinConfluent, I will try to take a look sometime today 👍 


Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-24 Thread via GitHub


CalvinConfluent commented on PR #15657:
URL: https://github.com/apache/kafka/pull/15657#issuecomment-2075521654

   @jolshan Updated the PR:
   1. Reverted the KafkaApis changes. Now, if a produce request arrives with a 
version that supports Transaction V2, the server processes it regardless of 
which transaction version is in use.
   2. The client now uses the correct request version depending on whether 
Transaction V2 is enabled.
   3. Added a produce request unit test and corrected the logic for sending 
produce requests.


Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-24 Thread via GitHub


CalvinConfluent commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1578298683


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -109,7 +109,7 @@ class AddPartitionsToTxnManager(
 .setTransactionalId(transactionalId)
 .setProducerId(producerId)
 .setProducerEpoch(producerEpoch)
-.setVerifyOnly(true)
+.setVerifyOnly(supportedOperation != addPartition)

Review Comment:
   Reverted the KafkaApis changes. Does anywhere else need to use 
supportedOperation?



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-24 Thread via GitHub


CalvinConfluent commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1578296803


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -402,6 +412,30 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
 }
 }
 
+public synchronized void maybeHandlePartitionAdded(TopicPartition topicPartition) {

Review Comment:
   Refactored this part, and removed the previous handling.
   This method is called as a part of the produce callback and if there is any 
error in the produceResponse, it will not be called.
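
   The behaviour described above can be sketched as follows. This is an 
illustration, not the PR's code: the class, the string partition key, and the 
`onProduceResponse` method are hypothetical, and an error code of 0 is assumed 
to mean success.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: a partition is recorded as part of the transaction only
// when the produce response for it carried no error.
final class PartitionTrackerSketch {
    private final Set<String> partitionsInTransaction = new HashSet<>();

    // Stand-in for the produce callback.
    synchronized void onProduceResponse(String topicPartition, short errorCode) {
        if (errorCode != 0) {
            return; // error path: the partition-added handler is never reached
        }
        maybeHandlePartitionAdded(topicPartition);
    }

    private void maybeHandlePartitionAdded(String topicPartition) {
        partitionsInTransaction.add(topicPartition);
    }

    synchronized boolean isPartitionAdded(String topicPartition) {
        return partitionsInTransaction.contains(topicPartition);
    }
}
```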



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-09 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1558104084


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) {
   sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
   CompletableFuture.completedFuture[Unit](())
+} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested) {
+  // If the client requests to use transaction V2 but server side does not support it, return unsupported version.

Review Comment:
   This can cause issues during upgrades where we flip flop the version we are 
using. This is why we decided not to use this approach.



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-09 Thread via GitHub


CalvinConfluent commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1558097863


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) {
   sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
   CompletableFuture.completedFuture[Unit](())
+} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested) {
+  // If the client requests to use transaction V2 but server side does not support it, return unsupported version.

Review Comment:
   > TV is downgraded -- in this case, we can still handle the old request and 
we should do so.
   
   If I understand correctly, we should continue to serve the V2 requests but 
let the downstream code return errors. In the txnOffsetCommitRequest example, 
when the broker is on TV 1 but runs a TV 2-enabled image and receives a 
new-version txnOffsetCommitRequest, it should continue and verify the 
transaction. If the offset partition has not been added to the transaction, we 
should return an error. Also, when the client receives this error, it should 
refresh the API versions.



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-09 Thread via GitHub


CalvinConfluent commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1557881030


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
 null;
 this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
 initProducerIdVersion.maxVersion() >= 3;
+
+// TODO(caliu) use feature version.
+ApiVersion produceVersion = nodeApiVersions != null ?
+nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+null;
+this.coordinatorSupportsTransactionV2 = produceVersion != null &&

Review Comment:
   Good point! Will check.



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556614475


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -109,7 +109,7 @@ class AddPartitionsToTxnManager(
 .setTransactionalId(transactionalId)
 .setProducerId(producerId)
 .setProducerEpoch(producerEpoch)
-.setVerifyOnly(true)
+.setVerifyOnly(supportedOperation != addPartition)

Review Comment:
   nit -- we should also add the supportedOperation in KafkaApis and/or the 
other files based on the request version. 
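
   A rough sketch of deriving the operation from the request version. The enum 
values, the class, and the `firstTransactionV2Version` parameter are 
hypothetical simplifications and do not mirror the actual Scala definitions.

```java
// Hypothetical sketch: map a request version to the operation the broker may perform.
final class SupportedOperationSketch {
    enum SupportedOperation { DEFAULT, ADD_PARTITION }

    // Assumption: requests at or above firstTransactionV2Version may add the
    // partition implicitly; older versions only verify.
    static SupportedOperation fromRequestVersion(short requestVersion, short firstTransactionV2Version) {
        return requestVersion >= firstTransactionV2Version
            ? SupportedOperation.ADD_PARTITION
            : SupportedOperation.DEFAULT;
    }

    // Mirrors the diff above: verify-only unless the operation is add-partition.
    static boolean verifyOnly(SupportedOperation supportedOperation) {
        return supportedOperation != SupportedOperation.ADD_PARTITION;
    }
}
```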



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556609642


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
 null;
 this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
 initProducerIdVersion.maxVersion() >= 3;
+
+// TODO(caliu) use feature version.
+ApiVersion produceVersion = nodeApiVersions != null ?
+nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+null;
+this.coordinatorSupportsTransactionV2 = produceVersion != null &&

Review Comment:
   We should also be checking the TV on the various requests and making sure we 
check the epoch when we update the cluster's latest TV.



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) {
   sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
   CompletableFuture.completedFuture[Unit](())
+} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested) {
+  // If the client requests to use transaction V2 but server side does not support it, return unsupported version.

Review Comment:
   This was not in the design. If we send a request version the server can 
handle, the server should handle it.
   
   I.e., if ApiVersions advertises TV 1, then the server has the code to 
handle V1, and we send the new request version. There are two cases where 
`!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested` 
is true. 
   
   1) TV is downgraded only (no image changes) -- in this case, we can still 
handle the old request and we should do so. 
   2) TV is downgraded + image version is downgraded. In this case, when the 
server receives a v5 request, it will return unsupported version automatically 
since it doesn't recognize this version.
   
   The reason we do this is that there is no way to guarantee that downgrades 
happen immediately, due to the way ApiVersions requests propagate. (The only 
way to ensure it happens immediately is to restart a broker.) Thus, we took 
this strategy:
   
   > The downgrade case is a bit different. When we downgrade TV, it is 
possible to not receive an update communicating this from any broker for a long 
time. We could even start rolling an incompatible image to the cluster. Once we 
do this roll however, the brokers will reconnect and update the TV with the 
newest epoch. As we are checking the TV on every request, we can abort the 
transaction and restart with the new epoch of TV and the old protocol. However, 
in the edge case where we somehow send a request to an older image broker, we 
know that the new protocol is gated by the Produce/TxnOffsetCommit and 
AddPartitionsToTxn versions. If we encounter a broker that is unable to handle 
the protocol, it is also unable to handle the request version. In this case, we 
will return UnsupportedVersionException which is fatal to the client. In most 
cases, we shouldn't hit this scenario.
   
   We also chose this approach so as not to cause flip-flopping during the 
upgrade case. 
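
   The two cases can be sketched as a single decision, purely as an 
illustration. The class, enum, and parameter names are hypothetical and this is 
not how KafkaApis is structured; the point is only that the finalized TV does 
not take part in the decision.

```java
// Hypothetical sketch: the broker gates on the request version it understands,
// not on the finalized transaction version (TV).
final class RequestVersionGateSketch {
    enum Outcome { HANDLE, UNSUPPORTED_VERSION }

    // maxSupportedVersion: highest request version this broker image understands.
    // transactionV2Finalized: whether TV currently enables v2; deliberately ignored.
    static Outcome decide(short requestVersion, short maxSupportedVersion, boolean transactionV2Finalized) {
        if (requestVersion > maxSupportedVersion) {
            // Case 2: the image was downgraded too, so the version is unknown to it.
            return Outcome.UNSUPPORTED_VERSION;
        }
        // Case 1: only TV was downgraded; the code path still exists, so serve the request.
        return Outcome.HANDLE;
    }
}
```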
   



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556584820


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
 null;
 this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
 initProducerIdVersion.maxVersion() >= 3;
+
+// TODO(caliu) use feature version.
+ApiVersion produceVersion = nodeApiVersions != null ?
+nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+null;
+this.coordinatorSupportsTransactionV2 = produceVersion != null &&

Review Comment:
   I think the other thing we are looking for is capping the produce request 
at version 11 and the txn offset commit request at version 4 until v2/feature 
version is enabled. I didn't see that here yet.
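
   A minimal sketch of that capping, assuming hypothetical constant and method 
names. The values 11 and 4 come from this comment; how the broker's maximum 
version and the v2 flag are obtained is left out.

```java
// Hypothetical sketch: cap the request version until transaction v2 is enabled.
final class RequestVersionCapSketch {
    static final short PRODUCE_LAST_PRE_V2 = 11;
    static final short TXN_OFFSET_COMMIT_LAST_PRE_V2 = 4;

    // brokerMaxVersion: highest version the broker advertises for the API.
    // lastPreV2Version: highest version allowed while v2 is disabled.
    static short chooseVersion(short brokerMaxVersion, short lastPreV2Version, boolean transactionV2Enabled) {
        if (transactionV2Enabled) {
            return brokerMaxVersion;
        }
        return (short) Math.min(brokerMaxVersion, lastPreV2Version);
    }
}
```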



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556576980


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -402,6 +412,30 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
 }
 }
 
+public synchronized void maybeHandlePartitionAdded(TopicPartition topicPartition) {

Review Comment:
   For my understanding, this was the previous handling for the add partition 
call, but since add partition is now part of the produce request, we have a 
separate block?
   
   I wonder if we should change some of these error messages to reflect that 
the record was not written either?



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556556192


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -202,7 +202,9 @@ public enum MetadataVersion {
 IBP_3_7_IV4(19, "3.7", "IV4", false),
 
 // Add ELR related supports (KIP-966).
-IBP_3_8_IV0(20, "3.8", "IV0", true);
+IBP_3_8_IV0(20, "3.8", "IV0", true),
+
+IBP_100_1_IV0(100, "100.1", "IV0", false);

Review Comment:
   hehe this is a unique way to have a placeholder for feature version :) 


