[GitHub] [kafka] clolov closed pull request #13068: KAFKA-13999: Add ProducerIdCount metric

2023-01-07 Thread GitBox


clolov closed pull request #13068: KAFKA-13999: Add ProducerIdCount metric
URL: https://github.com/apache/kafka/pull/13068


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13068: KAFKA-13999: Add ProducerIdCount metric

2023-01-07 Thread GitBox


clolov commented on PR #13068:
URL: https://github.com/apache/kafka/pull/13068#issuecomment-1374723793

   Got it, thank you. I am closing this and moving my comments to the other 
pull request.





[GitHub] [kafka] clolov commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-07 Thread GitBox


clolov commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1374723416

   Is there a reason we don't change the HashMap to a Concurrent*Map and just 
call `size()` on it? That way it becomes very clear that we expect some sort of 
concurrent operation to happen on the map, that we would like it to be ordered 
with respect to other operations, and that we don't have to rely on people 
knowing they need to interact with the `producers` map via additional functions.
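
   As a rough sketch of that suggestion (all names here are hypothetical 
stand-ins for the broker's producer-state map, not the actual code):
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   
   // Hypothetical sketch only: a ConcurrentMap makes the concurrent access
   // explicit and lets a metrics gauge read size() without extra locking.
   class ProducerIdCountSketch {
       private final ConcurrentMap<Long, Object> producers = new ConcurrentHashMap<>();
   
       void track(long producerId, Object state) {
           producers.put(producerId, state); // safe from any thread
       }
   
       // A gauge for the ProducerIdCount metric could simply delegate here.
       int producerIdCount() {
           return producers.size();
       }
   }
   ```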





[jira] [Created] (KAFKA-14607) Move Scheduler/KafkaScheduler to server-common

2023-01-07 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-14607:
---

 Summary: Move Scheduler/KafkaScheduler to server-common
 Key: KAFKA-14607
 URL: https://issues.apache.org/jira/browse/KAFKA-14607
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 3.5.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-01-07 Thread GitBox


ijuma commented on PR #13046:
URL: https://github.com/apache/kafka/pull/13046#issuecomment-1374714539

   @satishd When you have some cycles, please resolve the conflicts on this PR. 
I think we can focus on merging this next and then #13040.





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

2023-01-07 Thread GitBox


ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064083858


##
clients/src/main/java/org/apache/kafka/common/utils/FetchRequestUtils.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+public class FetchRequestUtils {

Review Comment:
   I would just add these methods to the existing `FetchRequest` class.



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2312,26 +2313,26 @@ class ReplicaManagerTest {
   }
 
   private def fetchPartitions(
-replicaManager: ReplicaManager,
-replicaId: Int,
-fetchInfos: Seq[(TopicIdPartition, PartitionData)],
-responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-requestVersion: Short = ApiKeys.FETCH.latestVersion,
-maxWaitMs: Long = 0,
-minBytes: Int = 1,
-maxBytes: Int = 1024 * 1024,
-quota: ReplicaQuota = UnboundedQuota,
-isolation: FetchIsolation = FetchLogEnd,
-clientMetadata: Option[ClientMetadata] = None
+   replicaManager: ReplicaManager,
+   replicaId: Int,
+   fetchInfos: Seq[(TopicIdPartition, PartitionData)],
+   responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
+   requestVersion: Short = ApiKeys.FETCH.latestVersion,
+   maxWaitMs: Long = 0,
+   minBytes: Int = 1,
+   maxBytes: Int = 1024 * 1024,
+   quota: ReplicaQuota = UnboundedQuota,
+   isolation: FetchIsolation = FetchIsolation.LOG_END,
+   clientMetadata: Option[ClientMetadata] = None

Review Comment:
   Why this indent change?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -123,12 +123,12 @@ case class LogReadResult(info: FetchDataInfo,
 this.info.records,
 this.divergingEpoch,
 this.lastStableOffset,
-this.info.abortedTransactions,
+if (this.info.abortedTransactions.isPresent) Some(this.info.abortedTransactions.get().asScala.toList) else None,

Review Comment:
   The last `toList` results in a copy of the collection, we'd want to avoid 
that.



##
storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+
+import java.util.List;
+import java.util.Optional;
+
+public class FetchDataInfo {
+private final LogOffsetMetadata fetchOffsetMetadata;
+private final Records records;
+private final boolean firstEntryIncomplete;
+private final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
+
+public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+ Records records) {
+this(fetchOffsetMetadata, records, false, Optional.empty());
+}
+
+public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+ Records records,
+ boolean firstEntryIncomplete) {
+this(fetchOffsetMetadata, records, firstEntryIncomplete, Optional.empty());
+}

[GitHub] [kafka] ijuma commented on pull request #13087: KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest

2023-01-07 Thread GitBox


ijuma commented on PR #13087:
URL: https://github.com/apache/kafka/pull/13087#issuecomment-1374711202

   @jolshan you'll go ahead and merge this?





[GitHub] [kafka] ijuma commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


ijuma commented on PR #13043:
URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374705681

   @satishd I submitted a follow-up PR that removes the public fields from 
`ProducerStateEntry`.





[GitHub] [kafka] ijuma opened a new pull request, #13091: MINOR: Remove public mutable fields from ProducerAppendInfo

2023-01-07 Thread GitBox


ijuma opened a new pull request, #13091:
URL: https://github.com/apache/kafka/pull/13091

   Replace them with accessors and mutators.
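
   For illustration, a hypothetical before/after of the pattern (field and 
class names are made up, not the actual `ProducerAppendInfo` members):
   
   ```java
   // Hypothetical sketch of replacing a public mutable field with an accessor
   // and a mutator, following the no-"get"-prefix accessor style used in the
   // storage module.
   class AppendInfoSketch {
       // before: public long lastOffset;
       private long lastOffset;
   
       public long lastOffset() {                   // accessor
           return lastOffset;
       }
   
       public void setLastOffset(long lastOffset) { // mutator
           this.lastOffset = lastOffset;
       }
   }
   ```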
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ijuma merged pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


ijuma merged PR #13043:
URL: https://github.com/apache/kafka/pull/13043





[GitHub] [kafka] ijuma commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


ijuma commented on PR #13043:
URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374704100

   JDK 8 build passed; the other builds had unrelated flaky failures.





[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-07 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077119


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
 try {
 fencableProducer.beginTransaction();
-fencableProducer.send(new ProducerRecord<>(topic, key, value));
+fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   I'm not sure I follow what benefit we'd get here by handling both the 
producer callback error and the one thrown by `commitTransaction`. The control 
flow would be more straightforward if we removed the producer callback and just 
relied on `commitTransaction` to throw exceptions, if any. The producer's 
Javadoc itself also suggests that callbacks need not be defined when using the 
transactional producer, since `commitTransaction` will throw the error from the 
last failed send in a transaction.
   
   > making them more vague to compensate
   
   We wouldn't be making it more vague. The message would state that the write 
to the config topic failed which is the cause for failure. Since the exception 
mapper used by Connect's REST server only writes the [top level exception's 
message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72)
 to the response (i.e. nested exceptions aren't surfaced via the REST API 
response), I think it makes sense to keep the top level exception's message 
generic and allow users to debug further via the worker logs (where the entire 
exception chain's stack trace will be visible). Note that I'm suggesting a 
similar change for the non-EOS enabled case as well - i.e. don't use the 
producer error directly 
[here](https://github.com/apache/kafka/pull/12984/files#diff-c346b7b90fe30ac08b4211375fe36208139a90e40838dd3a7996021a8c4c5b13R1064),
 instead wrapping it in a `ConnectException` which says that the write 
to the config topic failed. The reasoning 
here is that since a Connect user may not even know that Connect uses a 
producer under the hood to write certain requests to the config topic for 
asynchronous processing, it would make more sense to have an informative 
Connect specific exception message rather than directly throwing the producer 
exception which may or may not contain enough details to be relevant to a 
Connect user.
   
   >  If we're hiding the result from the REST calls, are we not also hiding 
the error from the herder tick thread?
   
   Hm no, the hiding issue was only for non-EOS enabled workers. Like I've 
pointed out above, for workers that have EOS enabled, the REST API does return 
a `500` response.
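
   For reference, a minimal sketch of the flow being suggested, assuming the 
standard producer and Connect APIs (names are illustrative; a real 
implementation would also abort the transaction and handle producer fencing):
   
   ```java
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.connect.errors.ConnectException;
   
   class FencableWriteSketch {
       // No per-record callback: commitTransaction() throws the error from the
       // last failed send, and we surface a generic top-level message, which is
       // all the REST exception mapper writes to the response anyway.
       static void sendPrivileged(Producer<String, byte[]> fencableProducer,
                                  String topic, String key, byte[] value) {
           try {
               fencableProducer.beginTransaction();
               fencableProducer.send(new ProducerRecord<>(topic, key, value));
               fencableProducer.commitTransaction();
           } catch (Exception e) {
               throw new ConnectException("Failed to write to the config topic", e);
           }
       }
   }
   ```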






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-07 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077085


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config)
 }
 
 private void sendPrivileged(String key, byte[] value) {
+sendPrivileged(key, value, null);
+}
+
+private void sendPrivileged(String key, byte[] value, Callback callback) {
 if (!usesFencableWriter) {
-configLog.send(key, value);

Review Comment:
   Oh okay, I see what you're saying now. Sorry, I think I misunderstood you 
earlier. I agree: none of the `KafkaConfigBackingStore` use cases seem to 
require the performance benefit of async send with callbacks. To make it 
synchronous, though, I think it might be better to avoid the producer callback 
altogether and instead call `get()` on the returned future (which throws any 
exceptions that were encountered while sending the record). WDYT?
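
   As a sketch of that blocking variant (written against the plain producer 
API rather than the actual `KafkaBasedLog` internals):
   
   ```java
   import java.util.concurrent.ExecutionException;
   
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.clients.producer.RecordMetadata;
   
   class SynchronousSendSketch {
       // get() blocks until the send completes and rethrows (wrapped in an
       // ExecutionException) any error encountered while sending the record.
       static RecordMetadata sendAndWait(Producer<String, byte[]> producer,
                                         String topic, String key, byte[] value)
               throws ExecutionException, InterruptedException {
           return producer.send(new ProducerRecord<>(topic, key, value)).get();
       }
   }
   ```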






[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1064066900


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, 
long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d 
in %s is %d, " +
+"which is smaller than the la

[GitHub] [kafka] satishd commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on PR #13043:
URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374677986

   Thanks @ijuma for the review comments. Addressed them with the latest commit.





[jira] [Created] (KAFKA-14606) Use virtual threads to publish Kafka records

2023-01-07 Thread Bart De Neuter (Jira)
Bart De Neuter created KAFKA-14606:
--

 Summary: Use virtual threads to publish Kafka records
 Key: KAFKA-14606
 URL: https://issues.apache.org/jira/browse/KAFKA-14606
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Bart De Neuter


Since JDK 19, virtual threads have been added to the JDK as a preview:

[https://openjdk.org/jeps/425]

Virtual threads allow you to use the hardware optimally: a virtual thread is a 
lightweight thread that runs on a carrier thread (an OS thread). When I/O 
happens, the carrier thread is not blocked and can continue doing other work. 
Currently it doesn't seem to be possible to make 
`org.apache.kafka.clients.producer.internals.Sender` run on a virtual thread.

An instance of `org.apache.kafka.common.utils.KafkaThread` is being started. 

Would it be possible to add an option to use virtual threads for publishing 
records?
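
For illustration, a minimal sketch of starting such a loop on a virtual thread 
(JDK 19/20 need --enable-preview; the Runnable is a hypothetical stand-in for 
the Sender loop):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadSketch {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for the producer's sender loop.
        Runnable senderLoop =
                () -> System.out.println("running on " + Thread.currentThread());

        // A named virtual thread, analogous to how KafkaThread names platform threads.
        Thread sender = Thread.ofVirtual().name("kafka-producer-sender").start(senderLoop);
        sender.join();

        // Or one virtual thread per task via an executor.
        try (ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
            pool.submit(senderLoop);
        }
    }
}
{code}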

 





[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


ijuma commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1064020403


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, 
long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d 
in %s is %d, " +
+"which is smaller than the last

[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


ijuma commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1064020890


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, 
long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d 
in %s is %d, " +
+"which is smaller than the last

[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


ijuma commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1064020080


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+private final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * Creates a new instance with the provided parameters.
+ *
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty()
+);

Review Comment:
   Nit: it looks a bit odd to have this in its own line like this. Can we fix 
the formatting?



##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache 

[GitHub] [kafka] curie71 opened a new pull request, #13090: KAFKA-14605 Change the log level to info when logIfAllowed is set, warn when logIfDenied is set.

2023-01-07 Thread GitBox


curie71 opened a new pull request, #13090:
URL: https://github.com/apache/kafka/pull/13090

   KAFKA-14605: StandardAuthorizer logs at INFO level when logIfDenied is set 
(otherwise we log at TRACE), but at DEBUG level when logIfAllowed is set.
   Since the audit log is a security log, it should be logged at the default 
verbosity level, not DEBUG or TRACE, when logIfAllowed is set.
   So I think it is better to log at INFO on allow and at WARN on deny.
   
   ```java
   private void logAuditMessage(
   .. ) {
   switch (rule.result()) {
   case ALLOWED:
   if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
   auditLog.debug(..); // info maybe better
   } else if (auditLog.isTraceEnabled()) {
   auditLog.trace(buildAuditMessage(principal, 
requestContext, action, rule));
   }
   return;
   
   case DENIED:
   if (action.logIfDenied()) {
   auditLog.info(..); // warn maybe better
   } else if (auditLog.isTraceEnabled()) {
   auditLog.trace(buildAuditMessage(principal, 
requestContext, action, rule));
   }
   }
   }
   ```
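
   Concretely, the proposed version would look like this (a sketch with the 
levels swapped in, keeping the structure above):
   
   ```java
   private void logAuditMessage(
   .. ) {
       switch (rule.result()) {
           case ALLOWED:
               if (action.logIfAllowed() && auditLog.isInfoEnabled()) {
                   auditLog.info(buildAuditMessage(principal, requestContext, action, rule));
               } else if (auditLog.isTraceEnabled()) {
                   auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
               }
               return;
   
           case DENIED:
               if (action.logIfDenied()) {
                   auditLog.warn(buildAuditMessage(principal, requestContext, action, rule));
               } else if (auditLog.isTraceEnabled()) {
                   auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
               }
       }
   }
   ```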
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14605) Change the log level to info when logIfAllowed is set, warn when logIfDenied is set.

2023-01-07 Thread Beibei Zhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beibei Zhao updated KAFKA-14605:

Summary: Change the log level to info when logIfAllowed is set, warn when 
logIfDenied is set.  (was: Change the log level to warn when logIfAllowed is 
set.)

> Change the log level to info when logIfAllowed is set, warn when logIfDenied 
> is set.
> 
>
> Key: KAFKA-14605
> URL: https://issues.apache.org/jira/browse/KAFKA-14605
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Beibei Zhao
>Priority: Major
>
> StandardAuthorizer logs at INFO level when logIfDenied is set (otherwise we 
> log at TRACE), but at DEBUG level when logIfAllowed is set.
> Since the audit log is a security log, it should be logged at the default 
> verbosity level, not DEBUG or TRACE, when logIfAllowed is set.
> So I think it is better to log at INFO on allow and at WARN on deny.
> {code:java}
> private void logAuditMessage(
> .. ) {
> switch (rule.result()) {
> case ALLOWED:
> if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
> auditLog.debug(..); // info
> } else if (auditLog.isTraceEnabled()) {
> auditLog.trace(buildAuditMessage(principal, 
> requestContext, action, rule));
> }
> return;
> case DENIED:
> if (action.logIfDenied()) {
> auditLog.info(..); // warn
> } else if (auditLog.isTraceEnabled()) {
> auditLog.trace(buildAuditMessage(principal, 
> requestContext, action, rule));
> }
> }
> }
> {code}





[GitHub] [kafka] omkreddy commented on pull request #13086: KAFKA-14535: Fix flaky EndToEndAuthorization tests which were sensitive to ACL change reordering

2023-01-07 Thread GitBox


omkreddy commented on PR #13086:
URL: https://github.com/apache/kafka/pull/13086#issuecomment-1374517980

   Test failures are unrelated; merging the PR.





[GitHub] [kafka] omkreddy merged pull request #13086: KAFKA-14535: Fix flaky EndToEndAuthorization tests which were sensitive to ACL change reordering

2023-01-07 Thread GitBox


omkreddy merged PR #13086:
URL: https://github.com/apache/kafka/pull/13086





[jira] [Created] (KAFKA-14605) Change the log level to warn when logIfAllowed is set.

2023-01-07 Thread Beibei Zhao (Jira)
Beibei Zhao created KAFKA-14605:
---

 Summary: Change the log level to warn when logIfAllowed is set.
 Key: KAFKA-14605
 URL: https://issues.apache.org/jira/browse/KAFKA-14605
 Project: Kafka
  Issue Type: Improvement
Reporter: Beibei Zhao


StandardAuthorizer logs at INFO level when logIfDenied is set (otherwise we log 
at TRACE), but at DEBUG level when logIfAllowed is set.
Since the audit log is a security log, it should be logged at the default 
verbosity level, not DEBUG or TRACE, when logIfAllowed is set.
So I think it is better to log at INFO on allow and at WARN on deny.

{code:java}
private void logAuditMessage(
.. ) {
switch (rule.result()) {
case ALLOWED:
if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
auditLog.debug(..); // info
} else if (auditLog.isTraceEnabled()) {
auditLog.trace(buildAuditMessage(principal, requestContext, 
action, rule));
}
return;

case DENIED:
if (action.logIfDenied()) {
auditLog.info(..); // warn
} else if (auditLog.isTraceEnabled()) {
auditLog.trace(buildAuditMessage(principal, requestContext, 
action, rule));
}
}
}
{code}






[GitHub] [kafka] satishd commented on pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on PR #13043:
URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374487392

   Thanks @ijuma for the review comments. Replied inline and/or addressed with 
the latest commits.





[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1064003231


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, 
currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, 
long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d 
in %s is %d, " +
+"which is smaller than the la

[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063961399


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = 
LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, 
currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, 
long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d 
in %s is %d, " +
+"which is smaller than the la

[jira] [Updated] (KAFKA-14604) SASL session expiration time will be overflowed when calculation

2023-01-07 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14604:
--
Affects Version/s: (was: 3.3.1)

> SASL session expiration time will be overflowed when calculation
> 
>
> Key: KAFKA-14604
> URL: https://issues.apache.org/jira/browse/KAFKA-14604
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> When the SASL server or client sets a large expiration time, the timeout 
> value might overflow and cause the session to time out immediately.
>  
> [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s
>  the sasl server timeout's calculation
> [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s
>  the sasl client timeout's calculation
>  
> something like this:
> {code:java}
> sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * 
> sessionLifetimeMs; {code}
> So, if the configured or returned sessionLifetimeMs is a large number, the 
> calculated `sessionExpirationTimeNanos` will be a negative value and cause 
> the session to time out on every check.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14604) SASL session expiration time will be overflowed when calculation

2023-01-07 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14604:
--
Description: 
When the SASL server or client sets a large expiration time, the timeout value 
might overflow and cause the session to time out immediately.

 

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s
 the sasl server timeout's calculation

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s
 the sasl client timeout's calculation

 

something like this:
{code:java}
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * 
sessionLifetimeMs; {code}
So, if the configured or returned sessionLifetimeMs is a large number, the 
calculated `sessionExpirationTimeNanos` will be a negative value and cause the 
session to time out on every check.

 

 

  was:
When the SASL server or client sets a large expiration time, the timeout value 
might overflow and cause the session to time out immediately.

 

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s
 the sasl server timeout's calculation

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s
 the sasl client timeout's calculation

 

something like this:
{code:java}
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * 
sessionLifetimeMs; {code}
So, if the configured or returned sessionLifetimeMs is a large number, the 
calculated value will be negative and cause the session to time out on every 
check.

 


> SASL session expiration time will be overflowed when calculation
> 
>
> Key: KAFKA-14604
> URL: https://issues.apache.org/jira/browse/KAFKA-14604
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> When the SASL server or client sets a large expiration time, the timeout 
> value might overflow and cause the session to time out immediately.
>  
> [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s
>  the sasl server timeout's calculation
> [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s
>  the sasl client timeout's calculation
>  
> something like this:
> {code:java}
> sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * 
> sessionLifetimeMs; {code}
> So, if the configured or returned sessionLifetimeMs is a large number, the 
> calculated `sessionExpirationTimeNanos` will be a negative value and cause 
> the session to time out on every check.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14604) SASL session expiration time will be overflowed when calculation

2023-01-07 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14604:
-

 Summary: SASL session expiration time will be overflowed when 
calculation
 Key: KAFKA-14604
 URL: https://issues.apache.org/jira/browse/KAFKA-14604
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3.1
Reporter: Luke Chen
Assignee: Luke Chen


When the SASL server or client sets a large expiration time, the timeout value 
might overflow and cause the session to time out immediately.

 

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s
 the sasl server timeout's calculation

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s
 the sasl client timeout's calculation

 

something like this:
{code:java}
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * 
sessionLifetimeMs; {code}
So, if the configured or returned sessionLifetimeMs is a large number, the 
calculated value will be negative and cause the session to time out on every 
check.
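
To make the failure mode concrete, here is a small self-contained demo (a 
sketch, not the actual patch) showing how the expression above can wrap to a 
negative value for a very large sessionLifetimeMs, together with one possible 
guard that clamps instead of overflowing:
{code:java}
// Sketch only: the guarded variant is one possible fix, not Kafka's patch.
public class SessionExpirationOverflowDemo {

    // Mirrors the quoted expression: 1_000_000 * sessionLifetimeMs can
    // overflow a long and wrap to a negative value.
    static long unsafeExpirationNanos(long authenticationEndNanos, long sessionLifetimeMs) {
        return authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs;
    }

    // Clamp to Long.MAX_VALUE instead of wrapping.
    static long saturatedExpirationNanos(long authenticationEndNanos, long sessionLifetimeMs) {
        try {
            return Math.addExact(authenticationEndNanos,
                    Math.multiplyExact(1_000_000L, sessionLifetimeMs));
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long now = System.nanoTime();
        // ~292 years in ms: just large enough that 1_000_000 * lifetime overflows.
        long hugeLifetimeMs = Long.MAX_VALUE / 1_000_000 + 1;
        System.out.println(unsafeExpirationNanos(now, hugeLifetimeMs));    // negative
        System.out.println(saturatedExpirationNanos(now, hugeLifetimeMs)); // Long.MAX_VALUE
    }
} {code}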

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14603) Move KafkaMetricsGroup to server-common module.

2023-01-07 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655673#comment-17655673
 ] 

Satish Duggana commented on KAFKA-14603:


[~ivanyu] Assigned to you as you are already working on this with 
https://github.com/apache/kafka/pull/13067/

Please feel free to reassign if needed. 

> Move KafkaMetricsGroup to server-common module.
> ---
>
> Key: KAFKA-14603
> URL: https://issues.apache.org/jira/browse/KAFKA-14603
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Ivan Yurchenko
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14603) Move KafkaMetricsGroup to server-common module.

2023-01-07 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-14603:
--

 Summary: Move KafkaMetricsGroup to server-common module.
 Key: KAFKA-14603
 URL: https://issues.apache.org/jira/browse/KAFKA-14603
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Satish Duggana






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14603) Move KafkaMetricsGroup to server-common module.

2023-01-07 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-14603:
--

Assignee: Ivan Yurchenko

> Move KafkaMetricsGroup to server-common module.
> ---
>
> Key: KAFKA-14603
> URL: https://issues.apache.org/jira/browse/KAFKA-14603
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Ivan Yurchenko
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] fvaleri commented on pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

2023-01-07 Thread GitBox


fvaleri commented on PR #13085:
URL: https://github.com/apache/kafka/pull/13085#issuecomment-1374423774

   Let's wait for CI builds, but it should be good now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on PR #13043:
URL: https://github.com/apache/kafka/pull/13043#issuecomment-1374420820

   Thanks @ijuma for the review comments. Replied inline and/or addressed with 
the latest commits. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063978705


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -174,7 +174,8 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads
+val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
+val numThreads = newCleanerConfig.numThreads

Review Comment:
   This change was made while debugging a test; it is not intended to be kept.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063978512


##
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##
@@ -255,15 +259,28 @@ class ProducerStateManagerTest {
 appendData(30L, 31L, secondAppend)
 
 assertEquals(2, secondAppend.startedTransactions.size)
-assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), 
secondAppend.startedTransactions.head)
-assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), 
secondAppend.startedTransactions.last)
+assertTxnMetadataEquals(new TxnMetadata(producerId, new 
LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head)
+assertTxnMetadataEquals(new TxnMetadata(producerId, new 
LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last)
 stateManager.update(secondAppend)
 stateManager.completeTxn(firstCompletedTxn.get)
 stateManager.completeTxn(secondCompletedTxn.get)
 stateManager.onHighWatermarkUpdated(32L)
 assertEquals(Some(new LogOffsetMetadata(30L)), 
stateManager.firstUnstableOffset)
   }
 
+  def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: 
java.util.List[TxnMetadata]): Unit = {
+val expectedIter = expected.iterator()
+val actualIter = actual.iterator()
+while(expectedIter.hasNext && actualIter.hasNext) {
+  assertTxnMetadataEquals(expectedIter.next(), actualIter.next())
+}
+  }
+
+  def assertTxnMetadataEquals(expected: TxnMetadata, actual:TxnMetadata) : 
Unit = {
+assertEquals(expected.producerId, actual.producerId)
+assertEquals(expected.firstOffset, actual.firstOffset)

Review Comment:
   I see that `lastOffset` is empty in all of these cases, and I was not sure 
whether checking it was intended. Added the check in the updated PR. 
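
One subtlety in the quoted helper: a loop bounded by `hasNext` on both 
iterators passes silently when the two lists differ in length. A defensive 
variant, sketched here in Java with JUnit 5 and assuming the public 
`TxnMetadata` fields used in the diff above:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;

import org.apache.kafka.server.log.internals.TxnMetadata;

// Sketch only: assert sizes first so extra or missing elements fail loudly,
// then compare the fields discussed above (producerId, firstOffset, lastOffset).
final class TxnMetadataAssertions {
    private TxnMetadataAssertions() { }

    static void assertTxnMetadataListEquals(List<TxnMetadata> expected, List<TxnMetadata> actual) {
        assertEquals(expected.size(), actual.size(), "transaction list sizes differ");
        for (int i = 0; i < expected.size(); i++) {
            TxnMetadata e = expected.get(i);
            TxnMetadata a = actual.get(i);
            assertEquals(e.producerId, a.producerId);   // public fields, as in the diff
            assertEquals(e.firstOffset, a.firstOffset);
            assertEquals(e.lastOffset, a.lastOffset);   // per the lastOffset discussion
        }
    }
}
```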



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-07 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063978324


##
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##
@@ -255,15 +259,28 @@ class ProducerStateManagerTest {
 appendData(30L, 31L, secondAppend)
 
 assertEquals(2, secondAppend.startedTransactions.size)
-assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), 
secondAppend.startedTransactions.head)
-assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), 
secondAppend.startedTransactions.last)
+assertTxnMetadataEquals(new TxnMetadata(producerId, new 
LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head)
+assertTxnMetadataEquals(new TxnMetadata(producerId, new 
LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last)
 stateManager.update(secondAppend)
 stateManager.completeTxn(firstCompletedTxn.get)
 stateManager.completeTxn(secondCompletedTxn.get)
 stateManager.onHighWatermarkUpdated(32L)
 assertEquals(Some(new LogOffsetMetadata(30L)), 
stateManager.firstUnstableOffset)
   }
 
+  def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: 
java.util.List[TxnMetadata]): Unit = {

Review Comment:
   aah, that was my bad. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org