[GitHub] [kafka] clolov closed pull request #13068: KAFKA-13999: Add ProducerIdCount metric
clolov closed pull request #13068: KAFKA-13999: Add ProducerIdCount metric URL: https://github.com/apache/kafka/pull/13068 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13068: KAFKA-13999: Add ProducerIdCount metric
clolov commented on PR #13068: URL: https://github.com/apache/kafka/pull/13068#issuecomment-1374723793 Got it, thank you, I am closing this and moving my comments to the other pull request.
[GitHub] [kafka] clolov commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
clolov commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1374723416 Is there a reason we don't change the HashMap to a Concurrent*Map and just call `size()` on it? That would make it very clear that we expect concurrent operations on the map and want them ordered with respect to other operations, and we would not have to rely on people knowing that they must interact with the `producers` map only through additional functions.
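A minimal sketch of the suggestion, assuming a simplified registry keyed by producer ID: `ProducerRegistry` and its `String` state values are hypothetical and are not Kafka's producer state manager. Backing the map with `ConcurrentHashMap` lets a metrics gauge read `size()` directly from another thread without extra accessor methods.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for per-partition producer state (not Kafka's actual class).
class ProducerRegistry {
    // ConcurrentHashMap signals that readers (e.g. a metrics thread) may access
    // the map while writers are updating it.
    private final Map<Long, String> producers = new ConcurrentHashMap<>();

    void put(long producerId, String state) {
        producers.put(producerId, state);
    }

    void remove(long producerId) {
        producers.remove(producerId);
    }

    // A gauge for the producer ID count can simply read size(); under concurrent
    // updates the value is only a point-in-time snapshot.
    Supplier<Integer> producerIdCountGauge() {
        return producers::size;
    }
}
```

One caveat from the `ConcurrentHashMap` Javadoc: aggregate methods such as `size()` are only reliable when the map is not being updated concurrently, which is usually acceptable for a gauge.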
[jira] [Created] (KAFKA-14607) Move Scheduler/KafkaScheduler to server-common
Ismael Juma created KAFKA-14607: --- Summary: Move Scheduler/KafkaScheduler to server-common Key: KAFKA-14607 URL: https://issues.apache.org/jira/browse/KAFKA-14607 Project: Kafka Issue Type: Sub-task Reporter: Ismael Juma Assignee: Ismael Juma Fix For: 3.5.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
ijuma commented on PR #13046: URL: https://github.com/apache/kafka/pull/13046#issuecomment-1374714539 @satishd When you have some cycles, please resolve the conflicts on this PR. I think we can focus on merging this next and then #13040.
[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module
ijuma commented on code in PR #13085: URL: https://github.com/apache/kafka/pull/13085#discussion_r1064083858 ## clients/src/main/java/org/apache/kafka/common/utils/FetchRequestUtils.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +public class FetchRequestUtils { Review Comment: I would just add these methods to the existing `FetchRequest` class. 
## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -2312,26 +2313,26 @@ class ReplicaManagerTest { } private def fetchPartitions( -replicaManager: ReplicaManager, -replicaId: Int, -fetchInfos: Seq[(TopicIdPartition, PartitionData)], -responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, -requestVersion: Short = ApiKeys.FETCH.latestVersion, -maxWaitMs: Long = 0, -minBytes: Int = 1, -maxBytes: Int = 1024 * 1024, -quota: ReplicaQuota = UnboundedQuota, -isolation: FetchIsolation = FetchLogEnd, -clientMetadata: Option[ClientMetadata] = None + replicaManager: ReplicaManager, + replicaId: Int, + fetchInfos: Seq[(TopicIdPartition, PartitionData)], + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, + requestVersion: Short = ApiKeys.FETCH.latestVersion, + maxWaitMs: Long = 0, + minBytes: Int = 1, + maxBytes: Int = 1024 * 1024, + quota: ReplicaQuota = UnboundedQuota, + isolation: FetchIsolation = FetchIsolation.LOG_END, + clientMetadata: Option[ClientMetadata] = None Review Comment: Why this indent change? ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -123,12 +123,12 @@ case class LogReadResult(info: FetchDataInfo, this.info.records, this.divergingEpoch, this.lastStableOffset, -this.info.abortedTransactions, +if (this.info.abortedTransactions.isPresent) Some(this.info.abortedTransactions.get().asScala.toList) else None, Review Comment: The last `toList` results in a copy of the collection, we'd want to avoid that. ## storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Records; + +import java.util.List; +import java.util.Optional; + +public class FetchDataInfo { +private final LogOffsetMetadata fetchOffsetMetadata; +private final Records records; +private final boolean firstEntryIncomplete; +private final Optional> abortedTransactions; + +public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records) { +this(fetchOffsetMetadata, records, false, Optional.empty()); +} + +public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records, + boolean firstEntryIncomplete) { +this(fetchOffsetMetadata, records,
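For context on the `toList` comment: in Scala, `asScala` only wraps the Java list, while `.toList` materializes a new immutable list. The same view-versus-copy distinction is sketched below in Java, assuming an `Optional<List<...>>` field like `abortedTransactions` (the element type is simplified to `String`, and `AbortedTxnConversion` is a hypothetical helper).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper contrasting a copying conversion with a wrapping one.
class AbortedTxnConversion {
    // Copies every element into a new list: O(n) allocation on every call.
    static Optional<List<String>> copying(Optional<List<String>> aborted) {
        return aborted.map(list -> new ArrayList<>(list));
    }

    // Wraps the existing list: O(1), a read-only view over the same backing data.
    static Optional<List<String>> viewing(Optional<List<String>> aborted) {
        return aborted.map(Collections::unmodifiableList);
    }
}
```

The view is cheaper but reflects later changes to the underlying list, so it fits best when the source list is effectively immutable once built.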
[GitHub] [kafka] ijuma commented on pull request #13087: KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest
ijuma commented on PR #13087: URL: https://github.com/apache/kafka/pull/13087#issuecomment-1374711202 @jolshan you'll go ahead and merge this?
[GitHub] [kafka] ijuma commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma commented on PR #13043: URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374705681 @satishd I submitted a follow-up PR that removes the public fields from `ProducerStateEntry`.
[GitHub] [kafka] ijuma opened a new pull request, #13091: MINOR: Remove public mutable fields from ProducerAppendInfo
ijuma opened a new pull request, #13091: URL: https://github.com/apache/kafka/pull/13091 Replace them with accessors and mutators. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
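A minimal sketch of the accessor/mutator pattern the PR description refers to. `ProducerEntrySketch` and its fields are hypothetical, loosely modeled on names visible elsewhere in this thread (such as `producerEpoch()` and `currentTxnFirstOffset`), and are not the actual contents of the PR.

```java
import java.util.OptionalLong;

// Hypothetical class: state is private, and callers go through methods instead of fields.
class ProducerEntrySketch {
    private final long producerId;   // never changes, so it only gets an accessor
    private short producerEpoch;     // mutable, so it gets an accessor and a mutator
    private OptionalLong currentTxnFirstOffset = OptionalLong.empty();

    ProducerEntrySketch(long producerId, short producerEpoch) {
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
    }

    long producerId() {
        return producerId;
    }

    short producerEpoch() {
        return producerEpoch;
    }

    void setProducerEpoch(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    OptionalLong currentTxnFirstOffset() {
        return currentTxnFirstOffset;
    }

    void setCurrentTxnFirstOffset(long offset) {
        this.currentTxnFirstOffset = OptionalLong.of(offset);
    }
}
```

Routing writes through mutators keeps every state change in one place, which makes it easier to add validation or logging later.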
[GitHub] [kafka] ijuma merged pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma merged PR #13043: URL: https://github.com/apache/kafka/pull/13043
[GitHub] [kafka] ijuma commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma commented on PR #13043: URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374704100 JDK 8 build passed, the other builds had unrelated flaky failures.
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077119 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) { try { fencableProducer.beginTransaction(); -fencableProducer.send(new ProducerRecord<>(topic, key, value)); +fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { Review Comment: I'm not sure I follow what benefit we'd get here from handling both the producer callback error and the one thrown by `commitTransaction`. The control flow would be more straightforward if we removed the producer callback and relied on `commitTransaction` to throw any exceptions. The producer's Javadoc itself also suggests that callbacks need not be defined when using the transactional producer, since `commitTransaction` will throw the error from the last failed send in a transaction. > making them more vague to compensate We wouldn't be making it more vague. The message would state that the write to the config topic failed, which is the cause of the failure. Since the exception mapper used by Connect's REST server only writes the [top level exception's message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72) to the response (i.e. nested exceptions aren't surfaced via the REST API response), I think it makes sense to keep the top level exception's message generic and allow users to debug further via the worker logs (where the entire exception chain's stack trace will be visible). Note that I'm suggesting a similar change for the non-EOS enabled case as well, i.e.
don't use the producer error directly [here](https://github.com/apache/kafka/pull/12984/files#diff-c346b7b90fe30ac08b4211375fe36208139a90e40838dd3a7996021a8c4c5b13R1064); instead, wrap it in a `ConnectException` which says that the write to the config topic failed. The reasoning is that a Connect user may not even know that Connect uses a producer under the hood to write certain requests to the config topic for asynchronous processing, so an informative Connect-specific exception message makes more sense than directly throwing the producer exception, which may or may not contain details relevant to a Connect user. > If we're hiding the result from the REST calls, are we not also hiding the error from the herder tick thread? Hm, no: the hiding issue was only for non-EOS enabled workers. As I pointed out above, for workers that have EOS enabled, the REST API does return a `500` response.
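The wrapping suggested above might look roughly like this. `ConfigWriteException` is a hypothetical stand-in for Connect's `ConnectException`, and `restResponseMessage` only mimics the behavior described for the exception mapper (it surfaces the top-level message and nothing else).

```java
// Hypothetical stand-in for org.apache.kafka.connect.errors.ConnectException.
class ConfigWriteException extends RuntimeException {
    ConfigWriteException(String message, Throwable cause) {
        super(message, cause);
    }
}

class ConfigTopicErrors {
    // Wraps a producer failure in an exception whose message makes sense to a Connect
    // user. The producer exception is kept as the cause, so the worker log still
    // shows the full exception chain.
    static ConfigWriteException wrapProducerError(Throwable producerError) {
        return new ConfigWriteException("Failed to write to the config topic", producerError);
    }

    // Mimics a REST exception mapper that only returns the top-level message.
    static String restResponseMessage(Throwable error) {
        return error.getMessage();
    }
}
```

The REST response carries only the generic message, while the detailed cause remains available for debugging through the logs.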
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077085 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: Oh okay, I see what you're saying now. Sorry, I think I misunderstood you earlier. I agree with you: none of the `KafkaConfigBackingStore` use cases seem to require the performance benefit of using async send with callbacks. However, to make it synchronous, I think it might be better to avoid the producer callback altogether and instead call `get()` on the returned future (which throws any exceptions encountered while sending the record). WDYT?
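Below is a sketch of the synchronous alternative. It assumes the send call returns a `Future`, as `KafkaProducer.send(record)` does (it returns a `Future<RecordMetadata>`). `SyncSend.awaitSend` is a hypothetical helper, and the timeout is an added assumption that the comment does not propose.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncSend {
    // Blocks until the send completes and rethrows the send failure, if any.
    static <T> T awaitSend(Future<T> sendFuture, long timeoutMs) {
        try {
            return sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // get() wraps the send failure; unwrap it so callers see the original error
            // (producer errors are typically unchecked KafkaException subclasses).
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out after " + timeoutMs + " ms waiting for the send to complete", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException("Interrupted while waiting for the send to complete", e);
        }
    }
}
```

Compared with a callback, this keeps the error on the calling thread, so the caller can wrap it or propagate it directly.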
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1064066900 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +public final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), +currentEntry.coordinatorEpoch, +currentEntry.lastTimestamp, +currentEntry.currentTxnFirstOffset); +} + +private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { +checkProducerEpoch(producerEpoch, offset); +if (origin == AppendOrigin.CLIENT) { +checkSequence(producerEpoch, firstSeq, offset); +} +} + +private void checkProducerEpoch(short producerEpoch, long offset) { +if (producerEpoch < updatedEntry.producerEpoch()) { +String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + +"which is smaller than the la
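The two validation steps visible in this diff can be modeled roughly as follows: every batch gets an epoch check, and only `AppendOrigin.CLIENT` appends get a sequence check. This is a simplified, hypothetical sketch rather than the code under review. The sequence-continuity rule (including wrap-around at `Integer.MAX_VALUE`), the reset to 0 after an epoch bump, and the use of `IllegalStateException` are all assumptions. The real class imports exceptions such as `InvalidProducerEpochException` and `OutOfOrderSequenceException`.

```java
// Hypothetical mirror of AppendOrigin for this sketch only.
enum AppendOriginSketch { CLIENT, REPLICATION, COORDINATOR }

class AppendValidatorSketch {
    private final AppendOriginSketch origin;
    private short lastEpoch;
    private int lastSeq;

    AppendValidatorSketch(AppendOriginSketch origin, short lastEpoch, int lastSeq) {
        this.origin = origin;
        this.lastEpoch = lastEpoch;
        this.lastSeq = lastSeq;
    }

    // Validates a batch and, only if validation passes, records it as the latest append.
    void append(short producerEpoch, int firstSeq, int lastSeqOfBatch, long offset) {
        maybeValidateDataBatch(producerEpoch, firstSeq, offset);
        lastEpoch = producerEpoch;
        lastSeq = lastSeqOfBatch;
    }

    private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) {
        checkProducerEpoch(producerEpoch, offset);
        if (origin == AppendOriginSketch.CLIENT) {
            checkSequence(producerEpoch, firstSeq, offset);
        }
    }

    private void checkProducerEpoch(short producerEpoch, long offset) {
        if (producerEpoch < lastEpoch) {
            throw new IllegalStateException(String.format(
                "Epoch %d at offset %d is smaller than the last seen epoch %d",
                producerEpoch, offset, lastEpoch));
        }
    }

    private void checkSequence(short producerEpoch, int firstSeq, long offset) {
        // Assumption: after an epoch bump the producer restarts its sequence at 0.
        boolean ok = producerEpoch != lastEpoch ? firstSeq == 0 : inSequence(lastSeq, firstSeq);
        if (!ok) {
            throw new IllegalStateException(String.format(
                "Out of order sequence %d at offset %d (last sequence %d)", firstSeq, offset, lastSeq));
        }
    }

    // Sequence numbers wrap from Integer.MAX_VALUE back to 0; long arithmetic avoids overflow.
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }
}
```

Because `append` only updates state after validation succeeds, a rejected batch leaves the last known epoch and sequence unchanged.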
[GitHub] [kafka] satishd commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on PR #13043: URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374677986 Thanks @ijuma for the review. Addressed them with the latest commit.
[jira] [Created] (KAFKA-14606) Use virtual threads to publish Kafka records
Bart De Neuter created KAFKA-14606: -- Summary: Use virtual threads to publish Kafka records Key: KAFKA-14606 URL: https://issues.apache.org/jira/browse/KAFKA-14606 Project: Kafka Issue Type: Improvement Components: producer Reporter: Bart De Neuter Since JDK 19, virtual threads have been available in the JDK as a preview feature: [https://openjdk.org/jeps/425] Virtual threads make better use of the hardware because each one is a lightweight thread that runs on a carrier thread (an OS thread). When a virtual thread blocks on I/O, its carrier thread is not blocked and can continue doing other work. Currently it doesn't seem to be possible to make `org.apache.kafka.clients.producer.internals.Sender` run on a virtual thread, because it is started in an instance of `org.apache.kafka.common.utils.KafkaThread`. Would it be possible to add an option to use virtual threads for publishing records?
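One possible shape for the requested option is sketched below. It assumes that the producer's I/O loop could be started from a caller-supplied `ThreadFactory` instead of a hard-coded `KafkaThread`. `IoLoopStarter` is hypothetical and is not an existing Kafka API. The default factory creates a named daemon platform thread.

```java
import java.util.concurrent.ThreadFactory;

// Hypothetical helper: the thread type is decided by the injected factory, not hard-coded.
class IoLoopStarter {
    // Creates the thread through the supplied factory and starts the I/O loop on it.
    static Thread start(Runnable ioLoop, ThreadFactory factory) {
        Thread t = factory.newThread(ioLoop);
        t.start();
        return t;
    }

    // Default factory: a named daemon platform thread.
    static ThreadFactory platformDaemonFactory(String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }
}
```

On JDK 21 and later, a caller could pass `Thread.ofVirtual().name("...").factory()` to run the loop on a virtual thread. On JDK 19 and 20, that API requires `--enable-preview`. Whether the sender loop actually benefits from running on a virtual thread is a separate question that this sketch does not address.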
[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1064020403 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +public final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), +currentEntry.coordinatorEpoch, +currentEntry.lastTimestamp, +currentEntry.currentTxnFirstOffset); +} + +private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { +checkProducerEpoch(producerEpoch, offset); +if (origin == AppendOrigin.CLIENT) { +checkSequence(producerEpoch, firstSeq, offset); +} +} + +private void checkProducerEpoch(short producerEpoch, long offset) { +if (producerEpoch < updatedEntry.producerEpoch()) { +String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + +"which is smaller than the last
[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1064020890 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +public final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), +currentEntry.coordinatorEpoch, +currentEntry.lastTimestamp, +currentEntry.currentTxnFirstOffset); +} + +private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { +checkProducerEpoch(producerEpoch, offset); +if (origin == AppendOrigin.CLIENT) { +checkSequence(producerEpoch, firstSeq, offset); +} +} + +private void checkProducerEpoch(short producerEpoch, long offset) { +if (producerEpoch < updatedEntry.producerEpoch()) { +String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + +"which is smaller than the last
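The constructor Javadoc in this diff ties how much validation runs to the append origin. Client appends get full validation. Coordinator appends (offset commits) carry no sequence numbers, so only the producer epoch is checked. Replicated appends are assumed to be validated already.

The Java sketch below models that rule in isolation. The `Origin` enum, the `validate` and `inSequence` methods, and the use of `IllegalStateException` are hypothetical stand-ins, not the real `ProducerAppendInfo`/`AppendOrigin` API. The sequence rule assumes that sequence numbers wrap from `Integer.MAX_VALUE` back to 0, and it only covers batches whose epoch is unchanged.

```java
public class AppendValidationSketch {
    // Hypothetical stand-in for the append origins described in the Javadoc.
    enum Origin { CLIENT, COORDINATOR, REPLICATION }

    // True when nextSeq directly follows lastSeq; assumes sequence numbers
    // wrap from Integer.MAX_VALUE back to 0.
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    // Validates one data batch against the producer's last known state.
    // Throws IllegalStateException when a check fails.
    static void validate(Origin origin, short lastEpoch, int lastSeq,
                         short batchEpoch, int firstSeq) {
        if (origin == Origin.REPLICATION) {
            return; // following the Javadoc: assumed to be validated already
        }
        // Epoch check: applies to CLIENT and COORDINATOR appends.
        if (batchEpoch < lastEpoch) {
            throw new IllegalStateException("Producer epoch " + batchEpoch
                    + " is smaller than the last seen epoch " + lastEpoch);
        }
        // Sequence check: CLIENT appends only, and only for an unchanged epoch
        // (the rules for a bumped epoch are omitted from this sketch).
        if (origin == Origin.CLIENT && batchEpoch == lastEpoch && !inSequence(lastSeq, firstSeq)) {
            throw new IllegalStateException("Sequence " + firstSeq
                    + " does not follow last sequence " + lastSeq);
        }
    }

    public static void main(String[] args) {
        validate(Origin.CLIENT, (short) 3, 41, (short) 3, 42);      // in order: passes
        validate(Origin.COORDINATOR, (short) 3, 41, (short) 3, 99); // no sequence check: passes
        validate(Origin.REPLICATION, (short) 3, 41, (short) 1, 7);  // not validated: passes
        try {
            validate(Origin.CLIENT, (short) 3, 41, (short) 3, 45);  // gap in sequence: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The origin check comes first so that replicated batches skip every other branch. This mirrors the Javadoc's statement that such appends are not validated again.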
[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1064020080 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +private final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * Creates a new instance with the provided parameters. + * + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty() +); Review Comment: Nit: it looks a bit odd to have this in its own line like this. Can we fix the formatting? ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache
[GitHub] [kafka] curie71 opened a new pull request, #13090: KAFKA-14605 Change the log level to info when logIfAllowed is set, warn when logIfDenied is set.
curie71 opened a new pull request, #13090: URL: https://github.com/apache/kafka/pull/13090

KAFKA-14605

StandardAuthorizer logs at INFO level when logIfDenied is set (otherwise, it logs at TRACE), but at DEBUG level when logIfAllowed is set.
Since the audit log is a security log, it should be written at the default verbosity level, not at DEBUG or TRACE, when logIfAllowed is set.
So I think logging at INFO on allow and at WARN on deny is better.

```java
private void logAuditMessage(
        .. ) {
    switch (rule.result()) {
        case ALLOWED:
            if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
                auditLog.debug(..); // info maybe better
            } else if (auditLog.isTraceEnabled()) {
                auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
            }
            return;
        case DENIED:
            if (action.logIfDenied()) {
                auditLog.info(..); // warn maybe better
            } else if (auditLog.isTraceEnabled()) {
                auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
            }
    }
}
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
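The proposal in #13090 comes down to a mapping from the authorization result and the `logIfAllowed`/`logIfDenied` flags to a log level. The Java sketch below isolates that mapping so the current and proposed behavior can be compared side by side.

The names `Result`, `Level`, `currentLevel`, and `proposedLevel` are hypothetical; they are not part of `StandardAuthorizer`. The sketch also ignores whether a given level is actually enabled on the audit logger.

```java
public class AuditLevelSketch {
    // Hypothetical stand-ins for the authorization result and log levels.
    enum Result { ALLOWED, DENIED }
    enum Level { TRACE, DEBUG, INFO, WARN }

    // Level at which the audit message is emitted today, per the PR description.
    static Level currentLevel(Result result, boolean logIfAllowed, boolean logIfDenied) {
        if (result == Result.ALLOWED) {
            return logIfAllowed ? Level.DEBUG : Level.TRACE;
        }
        return logIfDenied ? Level.INFO : Level.TRACE;
    }

    // Level proposed in KAFKA-14605: INFO on allow, WARN on deny.
    static Level proposedLevel(Result result, boolean logIfAllowed, boolean logIfDenied) {
        if (result == Result.ALLOWED) {
            return logIfAllowed ? Level.INFO : Level.TRACE;
        }
        return logIfDenied ? Level.WARN : Level.TRACE;
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            System.out.println(r + ": " + currentLevel(r, true, true)
                    + " -> " + proposedLevel(r, true, true));
        }
    }
}
```

Running `main` prints `ALLOWED: DEBUG -> INFO` and `DENIED: INFO -> WARN`. When the corresponding flag is not set, both versions still fall back to TRACE.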
[jira] [Updated] (KAFKA-14605) Change the log level to info when logIfAllowed is set, warn when logIfDenied is set.
[ https://issues.apache.org/jira/browse/KAFKA-14605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beibei Zhao updated KAFKA-14605:

Summary: Change the log level to info when logIfAllowed is set, warn when logIfDenied is set. (was: Change the log level to warn when logIfAllowed is set.)

> Change the log level to info when logIfAllowed is set, warn when logIfDenied is set.
>
> Key: KAFKA-14605
> URL: https://issues.apache.org/jira/browse/KAFKA-14605
> Project: Kafka
> Issue Type: Improvement
> Reporter: Beibei Zhao
> Priority: Major
>
> StandardAuthorizer logs at INFO level when logIfDenied is set (otherwise, it logs at TRACE), but at DEBUG level when logIfAllowed is set.
> Since the audit log is a security log, it should be written at the default verbosity level, not at DEBUG or TRACE, when logIfAllowed is set.
> So I think logging at INFO on allow and at WARN on deny is better.
> {code:java}
> private void logAuditMessage(
>         .. ) {
>     switch (rule.result()) {
>         case ALLOWED:
>             if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
>                 auditLog.debug(..); // info
>             } else if (auditLog.isTraceEnabled()) {
>                 auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
>             }
>             return;
>         case DENIED:
>             if (action.logIfDenied()) {
>                 auditLog.info(..); // warn
>             } else if (auditLog.isTraceEnabled()) {
>                 auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
>             }
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] omkreddy commented on pull request #13086: KAFKA-14535: Fix flaky EndToEndAuthorization tests which were sensitive to ACL change reordering
omkreddy commented on PR #13086: URL: https://github.com/apache/kafka/pull/13086#issuecomment-1374517980 Test failures are unrelated, merging the PR
[GitHub] [kafka] omkreddy merged pull request #13086: KAFKA-14535: Fix flaky EndToEndAuthorization tests which were sensitive to ACL change reordering
omkreddy merged PR #13086: URL: https://github.com/apache/kafka/pull/13086
[jira] [Created] (KAFKA-14605) Change the log level to warn when logIfAllowed is set.
Beibei Zhao created KAFKA-14605:
---

Summary: Change the log level to warn when logIfAllowed is set.
Key: KAFKA-14605
URL: https://issues.apache.org/jira/browse/KAFKA-14605
Project: Kafka
Issue Type: Improvement
Reporter: Beibei Zhao

StandardAuthorizer logs at INFO level when logIfDenied is set (otherwise, it logs at TRACE), but at DEBUG level when logIfAllowed is set.
Since the audit log is a security log, it should be written at the default verbosity level, not at DEBUG or TRACE, when logIfAllowed is set.
So I think logging at INFO on allow and at WARN on deny is better.
{code:java}
private void logAuditMessage(
        .. ) {
    switch (rule.result()) {
        case ALLOWED:
            if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
                auditLog.debug(..); // info
            } else if (auditLog.isTraceEnabled()) {
                auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
            }
            return;
        case DENIED:
            if (action.logIfDenied()) {
                auditLog.info(..); // warn
            } else if (auditLog.isTraceEnabled()) {
                auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
            }
    }
}
{code}
[GitHub] [kafka] satishd commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on PR #13043: URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374487392 Thanks @ijuma for the review comments. Replied inline and/or addressed with the latest commits.
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1064003231 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +public final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), +currentEntry.coordinatorEpoch, +currentEntry.lastTimestamp, +currentEntry.currentTxnFirstOffset); +} + +private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { +checkProducerEpoch(producerEpoch, offset); +if (origin == AppendOrigin.CLIENT) { +checkSequence(producerEpoch, firstSeq, offset); +} +} + +private void checkProducerEpoch(short producerEpoch, long offset) { +if (producerEpoch < updatedEntry.producerEpoch()) { +String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + +"which is smaller than the la
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063961399 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +public final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), +currentEntry.coordinatorEpoch, +currentEntry.lastTimestamp, +currentEntry.currentTxnFirstOffset); +} + +private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { +checkProducerEpoch(producerEpoch, offset); +if (origin == AppendOrigin.CLIENT) { +checkSequence(producerEpoch, firstSeq, offset); +} +} + +private void checkProducerEpoch(short producerEpoch, long offset) { +if (producerEpoch < updatedEntry.producerEpoch()) { +String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + +"which is smaller than the la
[jira] [Updated] (KAFKA-14604) SASL session expiration time will be overflowed when calculation
[ https://issues.apache.org/jira/browse/KAFKA-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-14604: -- Affects Version/s: (was: 3.3.1) > SASL session expiration time will be overflowed when calculation > > > Key: KAFKA-14604 > URL: https://issues.apache.org/jira/browse/KAFKA-14604 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > When the SASL server or client sets a large expiration time, the timeout value > might overflow and cause the session to time out immediately. > > [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s > the SASL server's timeout calculation, and > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s > the SASL client's timeout calculation. > > Both do something like this: > {code:java} > sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * > sessionLifetimeMs; {code} > So, if the configured or returned sessionLifetimeMs is a large number, > `sessionExpirationTimeNanos` becomes negative after the calculation, > and the session times out on every check. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
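The arithmetic in the issue description can be reproduced in isolation. This is a hypothetical illustration, not the actual fix: `naiveExpirationNanos` copies the quoted expression, while `saturatedExpirationNanos` is an assumed alternative that converts with `TimeUnit.MILLISECONDS.toNanos` (documented by the JDK to return `Long.MAX_VALUE` on positive overflow) and then clamps the addition instead of letting it wrap. With a lifetime of 10^13 ms (roughly 317 years), the product is 10^19 ns, which exceeds `Long.MAX_VALUE` (about 9.22 x 10^18) and wraps to a negative number.

```java
import java.util.concurrent.TimeUnit;

/**
 * Reproduces the overflow described in KAFKA-14604 and shows one hypothetical
 * overflow-safe alternative. Not the actual Kafka fix.
 */
public class SessionExpirySketch {

    /**
     * The expression quoted in the issue. 1000 * 1000 is an int, but multiplying it by the
     * long sessionLifetimeMs happens in long arithmetic, which silently wraps on overflow.
     */
    static long naiveExpirationNanos(long authenticationEndNanos, long sessionLifetimeMs) {
        return authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs;
    }

    /** TimeUnit.toNanos saturates at Long.MAX_VALUE, and the addition is clamped as well. */
    static long saturatedExpirationNanos(long authenticationEndNanos, long sessionLifetimeMs) {
        return saturatedAdd(authenticationEndNanos, TimeUnit.MILLISECONDS.toNanos(sessionLifetimeMs));
    }

    /** Adds two longs, clamping to Long.MIN_VALUE or Long.MAX_VALUE instead of wrapping. */
    static long saturatedAdd(long a, long b) {
        long sum = a + b;
        // Overflow occurred iff a and b share a sign and the sum's sign differs from both.
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    public static void main(String[] args) {
        long authenticationEndNanos = 5_000_000_000L;
        long sessionLifetimeMs = 10_000_000_000_000L; // 10^13 ms, roughly 317 years
        System.out.println("naive:     " + naiveExpirationNanos(authenticationEndNanos, sessionLifetimeMs));
        System.out.println("saturated: " + saturatedExpirationNanos(authenticationEndNanos, sessionLifetimeMs));
    }
}
```

Running `main` prints `naive:     -8446744068709551616` and `saturated: 9223372036854775807`. Saturating keeps a very long lifetime meaning "effectively never expires" rather than "already expired".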
[jira] [Updated] (KAFKA-14604) SASL session expiration time will be overflowed when calculation
[ https://issues.apache.org/jira/browse/KAFKA-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-14604: -- Description: When the SASL server or client sets a large expiration time, the timeout value might overflow and cause the session to time out immediately. [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s the SASL server's timeout calculation, and [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s the SASL client's timeout calculation. Both do something like this: {code:java} sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs; {code} So, if the configured or returned sessionLifetimeMs is a large number, `sessionExpirationTimeNanos` becomes negative after the calculation, and the session times out on every check. was: When the SASL server or client sets a large expiration time, the timeout value might overflow and cause the session to time out immediately. [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s the SASL server's timeout calculation, and [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s the SASL client's timeout calculation. Both do something like this: {code:java} sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs; {code} So, if the configured or returned sessionLifetimeMs is a large number, it becomes negative after the calculation, and the session times out on every check.
> SASL session expiration time will be overflowed when calculation > > > Key: KAFKA-14604 > URL: https://issues.apache.org/jira/browse/KAFKA-14604 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.1 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > When the SASL server or client sets a large expiration time, the timeout value > might overflow and cause the session to time out immediately. > > [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s > the SASL server's timeout calculation, and > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s > the SASL client's timeout calculation. > > Both do something like this: > {code:java} > sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * > sessionLifetimeMs; {code} > So, if the configured or returned sessionLifetimeMs is a large number, > `sessionExpirationTimeNanos` becomes negative after the calculation, > and the session times out on every check. > >
[jira] [Created] (KAFKA-14604) SASL session expiration time will be overflowed when calculation
Luke Chen created KAFKA-14604: - Summary: SASL session expiration time will be overflowed when calculation Key: KAFKA-14604 URL: https://issues.apache.org/jira/browse/KAFKA-14604 Project: Kafka Issue Type: Bug Affects Versions: 3.3.1 Reporter: Luke Chen Assignee: Luke Chen When the SASL server or client sets a large expiration time, the timeout value might overflow and cause the session to time out immediately. [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s the SASL server's timeout calculation, and [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s the SASL client's timeout calculation. Both do something like this: {code:java} sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs; {code} So, if the configured or returned sessionLifetimeMs is a large number, it becomes negative after the calculation, and the session times out on every check.
[jira] [Commented] (KAFKA-14603) Move KafkaMetricsGroup to server-common module.
[ https://issues.apache.org/jira/browse/KAFKA-14603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655673#comment-17655673 ] Satish Duggana commented on KAFKA-14603: [~ivanyu] Assigned to you as you are already working on this with https://github.com/apache/kafka/pull/13067/ Please feel free to reassign if needed. > Move KafkaMetricsGroup to server-common module. > --- > > Key: KAFKA-14603 > URL: https://issues.apache.org/jira/browse/KAFKA-14603 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Satish Duggana >Assignee: Ivan Yurchenko >Priority: Major >
[jira] [Created] (KAFKA-14603) Move KafkaMetricsGroup to server-common module.
Satish Duggana created KAFKA-14603: -- Summary: Move KafkaMetricsGroup to server-common module. Key: KAFKA-14603 URL: https://issues.apache.org/jira/browse/KAFKA-14603 Project: Kafka Issue Type: Sub-task Components: core Reporter: Satish Duggana
[jira] [Assigned] (KAFKA-14603) Move KafkaMetricsGroup to server-common module.
[ https://issues.apache.org/jira/browse/KAFKA-14603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana reassigned KAFKA-14603: -- Assignee: Ivan Yurchenko > Move KafkaMetricsGroup to server-common module. > --- > > Key: KAFKA-14603 > URL: https://issues.apache.org/jira/browse/KAFKA-14603 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Satish Duggana >Assignee: Ivan Yurchenko >Priority: Major >
[GitHub] [kafka] fvaleri commented on pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module
fvaleri commented on PR #13085: URL: https://github.com/apache/kafka/pull/13085#issuecomment-1374423774 Let's wait for CI builds, but it should be good now.
[GitHub] [kafka] satishd commented on pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on PR #13043: URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374420820 Thanks @ijuma for the review comments. Replied inline and/or addressed with the latest commits.
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063978705

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -174,7 +174,8 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-    val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads
+    val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
+    val numThreads = newCleanerConfig.numThreads

Review Comment:
It is not intended to keep this change, which was made while debugging a test.
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063978512

## core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala: ##
@@ -255,15 +259,28 @@ class ProducerStateManagerTest {
     appendData(30L, 31L, secondAppend)
     assertEquals(2, secondAppend.startedTransactions.size)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
+    assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head)
+    assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last)
     stateManager.update(secondAppend)
     stateManager.completeTxn(firstCompletedTxn.get)
     stateManager.completeTxn(secondCompletedTxn.get)
     stateManager.onHighWatermarkUpdated(32L)
     assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
   }
+  def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: java.util.List[TxnMetadata]): Unit = {
+    val expectedIter = expected.iterator()
+    val actualIter = actual.iterator()
+    while(expectedIter.hasNext && actualIter.hasNext) {
+      assertTxnMetadataEquals(expectedIter.next(), actualIter.next())
+    }
+  }
+
+  def assertTxnMetadataEquals(expected: TxnMetadata, actual:TxnMetadata) : Unit = {
+    assertEquals(expected.producerId, actual.producerId)
+    assertEquals(expected.firstOffset, actual.firstOffset)

Review Comment:
I see that `lastOffset` is empty for all the cases. I was not sure whether that is the intended check. Added this check with the updated PR.
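A note on the list overload of `assertTxnMetadataEquals` in the diff above: it walks both iterators only while both still have elements, so a list that is a strict prefix of the other would pass silently. The Java sketch below illustrates a stricter comparison that checks sizes first and also compares `lastOffset`, the field discussed in the review comment. `Txn` is a hypothetical stand-in for `TxnMetadata`, not Kafka's class: `firstOffset` is simplified to a `long`, and `lastOffset` is assumed to be an `OptionalLong`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of a stricter list comparison for transaction metadata.
 * Txn is a stand-in for Kafka's TxnMetadata, not the real class.
 */
public class TxnMetadataAssertions {

    static final class Txn {
        final long producerId;
        final long firstOffset;        // simplified: the real field is a LogOffsetMetadata
        final OptionalLong lastOffset; // assumed: empty while the transaction is ongoing

        Txn(long producerId, long firstOffset, OptionalLong lastOffset) {
            this.producerId = producerId;
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    static void assertTxnEquals(Txn expected, Txn actual) {
        check(expected.producerId == actual.producerId, "producerId", expected.producerId, actual.producerId);
        check(expected.firstOffset == actual.firstOffset, "firstOffset", expected.firstOffset, actual.firstOffset);
        check(expected.lastOffset.equals(actual.lastOffset), "lastOffset", expected.lastOffset, actual.lastOffset);
    }

    static void assertTxnListEquals(List<Txn> expected, List<Txn> actual) {
        // Check sizes first: pairwise iteration alone accepts a list that is a prefix of the other.
        check(expected.size() == actual.size(), "size", expected.size(), actual.size());
        Iterator<Txn> expectedIter = expected.iterator();
        Iterator<Txn> actualIter = actual.iterator();
        while (expectedIter.hasNext()) {
            assertTxnEquals(expectedIter.next(), actualIter.next());
        }
    }

    private static void check(boolean ok, String field, Object expected, Object actual) {
        if (!ok) {
            throw new AssertionError(field + ": expected " + expected + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        Txn first = new Txn(1L, 24L, OptionalLong.empty());
        Txn second = new Txn(1L, 30L, OptionalLong.empty());
        assertTxnListEquals(Arrays.asList(first, second), Arrays.asList(first, second)); // passes
        try {
            assertTxnListEquals(Arrays.asList(first, second), Arrays.asList(first)); // size mismatch
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `size: expected 2 but was 1`; the pairwise-only loop would have accepted those two lists.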
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063978324

## core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala: ##
@@ -255,15 +259,28 @@ class ProducerStateManagerTest {
     appendData(30L, 31L, secondAppend)
     assertEquals(2, secondAppend.startedTransactions.size)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
+    assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head)
+    assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last)
     stateManager.update(secondAppend)
     stateManager.completeTxn(firstCompletedTxn.get)
     stateManager.completeTxn(secondCompletedTxn.get)
     stateManager.onHighWatermarkUpdated(32L)
     assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
   }
+  def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: java.util.List[TxnMetadata]): Unit = {

Review Comment:
aah, that was my bad.