[GitHub] [kafka] lamberken commented on a change in pull request #10469: KAFKA-12611: Fix using random payload in ProducerPerformance incorrectly

2021-04-09 Thread GitBox


lamberken commented on a change in pull request #10469:
URL: https://github.com/apache/kafka/pull/10469#discussion_r610999261



##
File path: tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
##
@@ -127,15 +122,22 @@ public static void main(String[] args) throws Exception {
 int currentTransactionSize = 0;
 long transactionStartTime = 0;
 for (long i = 0; i < numRecords; i++) {
+
+if (payloadFilePath != null) {
+payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
+} else if (recordSize != null) {
+payload = new byte[recordSize];

Review comment:
   ok, I think you can test it locally.
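
   For reference, a minimal sketch of the per-record payload selection this hunk introduces (the helper shape and names below are assumptions for illustration; only the branch logic mirrors the quoted diff):

   ```java
   import java.util.List;
   import java.util.Random;

   public final class PayloadSelectionSketch {
       // Sketch only: choose a fresh payload for every record instead of
       // reusing one picked once before the send loop, which is the bug
       // KAFKA-12611 fixes.
       static byte[] nextPayload(Random random,
                                 List<byte[]> payloadByteList,
                                 String payloadFilePath,
                                 Integer recordSize) {
           if (payloadFilePath != null) {
               // random entry from the file-backed payload list, per record
               return payloadByteList.get(random.nextInt(payloadByteList.size()));
           } else if (recordSize != null) {
               // fixed-size payload when --record-size is given
               return new byte[recordSize];
           }
           throw new IllegalArgumentException("either a payload file or a record size is required");
       }
   }
   ```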
   
   








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995466



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
 return false;
 }
 
-private void maybeUpdateOldestSnapshotId() {
-log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot);
+private void maybeUpdateEarliestSnapshotId() {

Review comment:
   I changed the method name.








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995406



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -326,6 +336,14 @@ private void updateListenersProgress(List listenerContexts, long
 }
 }
 
+private Optional> latestSnapshot() {
+return log.latestSnapshotId().flatMap(snapshoId -> {
+return log

Review comment:
   Done.








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995402



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -326,6 +336,14 @@ private void updateListenersProgress(List listenerContexts, long
 }
 }
 
+private Optional> latestSnapshot() {
+return log.latestSnapshotId().flatMap(snapshoId -> {

Review comment:
   Done.








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995361



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch batch = reader.next();
 log.debug("Handle commit of batch with records {} at base 
offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed 
value to be " +
-(this.committed + 1) + ", but instead found " + 
value + " on node " + nodeId);
+if (value != committed + 1) {

Review comment:
   Done. Not sure if it is any better :smile: 








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995276



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch batch = reader.next();
 log.debug("Handle commit of batch with records {} at base 
offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed 
value to be " +
-(this.committed + 1) + ", but instead found " + 
value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but 
instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;
+readEpoch = batch.epoch();
 }
 log.debug("Counter incremented from {} to {}", initialValue, 
committed);
+
+if (lastSnapshotEndOffset + 10 < nextReadOffset) {

Review comment:
   Made it a variable.








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995276



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch batch = reader.next();
 log.debug("Handle commit of batch with records {} at base 
offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed 
value to be " +
-(this.committed + 1) + ", but instead found " + 
value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but 
instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;
+readEpoch = batch.epoch();
 }
 log.debug("Counter incremented from {} to {}", initialValue, 
committed);
+
+if (lastSnapshotEndOffset + 10 < nextReadOffset) {

Review comment:
   Made it configurable.
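
   For reference, a minimal sketch of lifting the magic number `10` into a configurable value (all names below are assumptions for illustration, not the PR's actual code):

   ```java
   public final class SnapshotPolicySketch {
       private final int snapshotDelayInRecords; // replaces the literal 10
       private long lastSnapshotEndOffset = 0;
       private long nextReadOffset = 0;

       // Sketch only: the threshold is injected instead of hard-coded.
       public SnapshotPolicySketch(int snapshotDelayInRecords) {
           this.snapshotDelayInRecords = snapshotDelayInRecords;
       }

       // True once enough records have been read past the last snapshot.
       public boolean shouldGenerateSnapshot() {
           return lastSnapshotEndOffset + snapshotDelayInRecords < nextReadOffset;
       }
   }
   ```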








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995124



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -355,24 +373,29 @@ private void fireHandleResign(int epoch) {
 }
 
 @Override
-public void initialize() throws IOException {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
+public void initialize() {
+try {
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
-&& quorum.remoteVoters().isEmpty()
-&& !quorum.isLeader()
-&& !quorum.isCandidate()) {
-transitionToCandidate(currentTimeMs);
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
+&& quorum.remoteVoters().isEmpty()
+&& !quorum.isLeader()

Review comment:
   Done.








[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995053



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
##
@@ -163,48 +85,46 @@ public OptionalLong lastOffset() {
 
 @Override
 public void close() {
-isClosed = true;
+if (!isClosed) {
+isClosed = true;
 
-if (allocatedBuffer != null) {
-bufferSupplier.release(allocatedBuffer);
+iterator.close();
+closeListener.onClose(this);
 }
-
-closeListener.onClose(this);
 }
 
-public T readRecord(Readable input) {
-// Read size of body in bytes
-input.readVarint();
-
-// Read unused attributes
-input.readByte();
-
-long timestampDelta = input.readVarlong();
-if (timestampDelta != 0) {
-throw new IllegalArgumentException();
-}
-
-// Read offset delta
-input.readVarint();
-
-int keySize = input.readVarint();
-if (keySize != -1) {
-throw new IllegalArgumentException("Unexpected key size " + 
keySize);
-}
+public static  RecordsBatchReader of(
+long baseOffset,
+Records records,
+RecordSerde serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize,
+CloseListener> closeListener
+) {
+return new RecordsBatchReader<>(
+baseOffset,
+new SerdeRecordsIterator<>(records, serde, bufferSupplier, maxBatchSize),
+closeListener
+);
+}
 
-int valueSize = input.readVarint();
-if (valueSize < 0) {
-throw new IllegalArgumentException();
+private void checkIfClosed() {

Review comment:
   Done.








[GitHub] [kafka] jsancio commented on pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#issuecomment-817076435


   @mumrah @hachikuji @dengziming thanks for the feedback. I believe I have 
addressed all of your suggestions. Let me know if I missed anything.






[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-09 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610990019



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   Was your question rhetorical? Should we use Optional.empty?
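
   For concreteness, a minimal sketch of the `Optional.empty()` alternative being asked about (the class shape below is an illustration, not the PR's actual `CompletedBatch`):

   ```java
   import java.util.List;
   import java.util.Optional;

   // Sketch only: modelling "no records" for a control batch explicitly with
   // Optional.empty() instead of a nullable reference, so callers cannot
   // forget the null check.
   final class CompletedBatchSketch<T> {
       final long baseOffset;
       final Optional<List<T>> records; // Optional.empty() for control batches

       CompletedBatchSketch(long baseOffset, Optional<List<T>> records) {
           this.baseOffset = baseOffset;
           this.records = records;
       }
   }
   ```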








[GitHub] [kafka] hachikuji merged pull request #10510: KAFKA-12607: Test case for resigned state vote granting

2021-04-09 Thread GitBox


hachikuji merged pull request #10510:
URL: https://github.com/apache/kafka/pull/10510


   






[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-04-09 Thread GitBox


dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-817051364


   @rajinisivaram @jolshan, we have now started merging PRs for 3.0, so it's 
time to take a look at this again, and also at #9684, since part of it has 
been reverted. Thank you.






[GitHub] [kafka] junrao commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader

2021-04-09 Thread GitBox


junrao commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r610968190



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -255,19 +261,21 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
 @threadsafe
 class Log(@volatile private var _dir: File,
   @volatile var config: LogConfig,
+  val segments: LogSegments,
   @volatile var logStartOffset: Long,
   @volatile var recoveryPoint: Long,
+  @volatile var nextOffsetMetadata: LogOffsetMetadata,
   scheduler: Scheduler,
   brokerTopicStats: BrokerTopicStats,
   val time: Time,
   val maxProducerIdExpirationMs: Int,
   val producerIdExpirationCheckIntervalMs: Int,
   val topicPartition: TopicPartition,
+  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
-  private val hadCleanShutdown: Boolean = true,
   @volatile var topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+  val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {

Review comment:
   Is the default value needed? 

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2586,11 +1999,15 @@ object Log {
 logDirFailureChannel: LogDirFailureChannel,
 lastShutdownClean: Boolean = true,
 topicId: Option[Uuid],
-keepPartitionMetadataFile: Boolean): Log = {
+keepPartitionMetadataFile: Boolean = true): Log = {
+// create the log directory if it doesn't exist
+Files.createDirectories(dir.toPath)
 val topicPartition = Log.parseTopicPartitionName(dir)
-val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
-new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
-  producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean, topicId, keepPartitionMetadataFile)
+val logLoader = new LogLoader(dir, topicPartition, config, scheduler, time, logDirFailureChannel)

Review comment:
   Do we need LogLoader to be a class? Another option is to have LogLoader 
as an Object with a single public method `load()`.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2586,11 +1999,15 @@ object Log {
 logDirFailureChannel: LogDirFailureChannel,
 lastShutdownClean: Boolean = true,
 topicId: Option[Uuid],
-keepPartitionMetadataFile: Boolean): Log = {
+keepPartitionMetadataFile: Boolean = true): Log = {

Review comment:
   Is the default value needed? If so, it seems many tests are not taking 
advantage of this.

##
File path: core/src/test/scala/unit/kafka/log/LogTestUtils.scala
##
@@ -37,4 +47,246 @@ object LogTestUtils {
 
 new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, 
time)
   }
+
+  def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
+  segmentBytes: Int = Defaults.SegmentSize,
+  retentionMs: Long = Defaults.RetentionMs,
+  retentionBytes: Long = Defaults.RetentionSize,
+  segmentJitterMs: Long = Defaults.SegmentJitterMs,
+  cleanupPolicy: String = Defaults.CleanupPolicy,
+  maxMessageBytes: Int = Defaults.MaxMessageSize,
+  indexIntervalBytes: Int = Defaults.IndexInterval,
+  segmentIndexBytes: Int = Defaults.MaxIndexSize,
+  messageFormatVersion: String = Defaults.MessageFormatVersion,
+  fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = {
+val logProps = new Properties()
+
+logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
+logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
+logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long)
+logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long)
+logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long)
+logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer)
+logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
+logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
+logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion)
+logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long)
+LogConfig(logProps)
+  }
+
+  def createLog(dir: F

[GitHub] [kafka] tang7526 closed pull request #10515: MINOR: Remove deprecated checksum method

2021-04-09 Thread GitBox


tang7526 closed pull request #10515:
URL: https://github.com/apache/kafka/pull/10515


   






[GitHub] [kafka] ableegoldman commented on pull request #10411: KAFKA-7606: Remove deprecated options from StreamsResetter

2021-04-09 Thread GitBox


ableegoldman commented on pull request #10411:
URL: https://github.com/apache/kafka/pull/10411#issuecomment-817046460


   Unrelated test failure in 
`InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup()` 
(filed https://issues.apache.org/jira/browse/KAFKA-12650)






[jira] [Commented] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318404#comment-17318404
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12650:


This was from 
InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup() btw.

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10411/12/testReport/junit/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/shouldOnlyRetryNotSuccessfulFuturesDuringSetup/

> NPE in InternalTopicManager#cleanUpCreatedTopics
> 
>
> Key: KAFKA-12650
> URL: https://issues.apache.org/jira/browse/KAFKA-12650
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180)
> {code}





[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610966015



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List classNames;
+private List classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   Nice, thanks for the update. Looks good








[jira] [Updated] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12650:
---
Fix Version/s: 3.0.0

> NPE in InternalTopicManager#cleanUpCreatedTopics
> 
>
> Key: KAFKA-12650
> URL: https://issues.apache.org/jira/browse/KAFKA-12650
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.0.0
>
>
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180)
> {code}





[jira] [Updated] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12650:
---
Priority: Blocker  (was: Major)

> NPE in InternalTopicManager#cleanUpCreatedTopics
> 
>
> Key: KAFKA-12650
> URL: https://issues.apache.org/jira/browse/KAFKA-12650
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180)
> {code}





[jira] [Commented] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318403#comment-17318403
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12650:


cc [~cadonna] can you take a look at this? 

> NPE in InternalTopicManager#cleanUpCreatedTopics
> 
>
> Key: KAFKA-12650
> URL: https://issues.apache.org/jira/browse/KAFKA-12650
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180)
> {code}





[jira] [Created] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12650:
--

 Summary: NPE in InternalTopicManager#cleanUpCreatedTopics
 Key: KAFKA-12650
 URL: https://issues.apache.org/jira/browse/KAFKA-12650
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


{code:java}
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675)
at 
org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755)
at 
org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652)
at 
org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599)
at 
org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180)
{code}






[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610965340



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List classNames;
+private List classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   Yes, this is awkward. I fixed it with some code refactoring and also fixed 
the comment in upgrade.html. PTAL, and thank you for your patience.








[GitHub] [kafka] showuon edited a comment on pull request #10409: KAFKA-9295: increase heartbeat interval to avoid unneeded rebalance

2021-04-09 Thread GitBox


showuon edited a comment on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-817043654


   @ableegoldman, thanks for your comment. I know what your concern is, and it 
makes sense to merge the other improvements first. I'll update the PR. Thank you.






[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: increase heartbeat interval to avoid unneeded rebalance

2021-04-09 Thread GitBox


showuon commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-817043654


   @ableegoldman, thanks for your comment. I know what your concern is, and it 
makes sense to merge the other improvements first. I'll update the PR. Thank you.






[jira] [Assigned] (KAFKA-12649) Expose cache resizer for dynamic memory allocation

2021-04-09 Thread Ben Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chen reassigned KAFKA-12649:


Assignee: Ben Chen

> Expose cache resizer for dynamic memory allocation
> --
>
> Key: KAFKA-12649
> URL: https://issues.apache.org/jira/browse/KAFKA-12649
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Major
>  Labels: needs-kip
>
> When we added the add/removeStreamThread() APIs to Streams, we implemented a 
> cache resizer to adjust the allocated cache memory per thread accordingly. We 
> could expose that to users as a public API to let them dynamically 
> increase/decrease the amount of memory for the Streams cache (ie 
> cache.max.bytes.buffering).





[jira] [Created] (KAFKA-12649) Expose cache resizer for dynamic memory allocation

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12649:
--

 Summary: Expose cache resizer for dynamic memory allocation
 Key: KAFKA-12649
 URL: https://issues.apache.org/jira/browse/KAFKA-12649
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman


When we added the add/removeStreamThread() APIs to Streams, we implemented a 
cache resizer to adjust the allocated cache memory per thread accordingly. We 
could expose that to users as a public API to let them dynamically 
increase/decrease the amount of memory for the Streams cache (ie 
cache.max.bytes.buffering).
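
Purely as illustration, such an API might be used like the sketch below; note that {{resizeCache}} is an invented name (KafkaStreams has no such method today), and the real shape would be settled in the KIP:

{code:java}
// Hypothetical API: the method name and signature are invented here to
// illustrate the proposal; topology and props are assumed to exist.
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// Shrink the total cache (cache.max.bytes.buffering) to 16 MB at runtime,
// redistributed evenly across the live StreamThreads.
streams.resizeCache(16 * 1024 * 1024L);
{code}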





[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-09 Thread GitBox


hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r610962235



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class DescribeTransactionsResult {
+private final Map> futures;
+
+DescribeTransactionsResult(Map> futures) {
+this.futures = futures;
+}
+
+public KafkaFuture transactionalIdResult(String transactionalId) {

Review comment:
   I agree the name is not great, but I was torn on an alternative. I went 
with `description`. Let me know what you think.
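
   For context, a hedged sketch of how the renamed accessor reads at a call site (checked exceptions from `get()` are elided and the bootstrap address is a placeholder):

   ```java
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.DescribeTransactionsResult;
   import org.apache.kafka.clients.admin.TransactionDescription;

   // Sketch: describe a single transactional id and print its state.
   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092"); // placeholder
   try (Admin admin = Admin.create(props)) {
       DescribeTransactionsResult result =
           admin.describeTransactions(Collections.singleton("my-txn-id"));
       TransactionDescription description = result.description("my-txn-id").get();
       System.out.println(description.state());
   }
   ```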








[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610959238



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List classNames;
+private List classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   It's super awkward, obviously, but since this is what happens when we 
process the configs in the real code we should try to replicate that in the test








[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610959058



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List classNames;
+private List classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   Ah, yeah, you'd need to do something more like what actually happens in 
the KafkaConsumer/`getAssignorInstances` code. eg
   ```
   @Test
   @SuppressWarnings("unchecked")
   public void shouldInstantiateAssignorClass() {
   Object classTypes = Collections.singletonList(StickyAssignor.class);
   List assignors = getAssignorInstances((List) classTypes, Collections.emptyMap());
   assertTrue(assignors.get(0) instanceof StickyAssignor);
   }
   ```

##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+

[GitHub] [kafka] ableegoldman commented on pull request #10409: KAFKA-9295: increase heartbeat interval to avoid unneeded rebalance

2021-04-09 Thread GitBox


ableegoldman commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-817035420


   Thanks @showuon . I guess my question is really this: why is it that this 
test, and no others, is frequently flaky due to dropping out on the session 
interval? There's nothing really "special" about it, ie it uses a single 
StreamThread and the default hb/session interval. It's hard to believe that 
Jenkins is so bad that a single consumer can't get a single heartbeat to the 
brokers in 10s. Maybe there is a bug or other slowdown in the networking layer, 
I don't know -- my point was really that dropping out on the heartbeat should 
not be expected, and if that's really causing tests to fail then it seems like 
a valid problem to investigate further, and not necessarily just expected
flakiness (even if it's not what this particular test was _supposed_ to be
testing). Does that make sense?
   
   I do see that it appears to drop out on hb expiration, but that only occurs 
twice so I wonder if that's really causing the flakiness or whether it should 
be able to recover from this. What if we just merge the other improvements in 
this PR to start with, and see how much that helps?






[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-09 Thread GitBox


hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r610956477



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler {
+private final LogContext logContext;
+private final Logger log;
+private final Set keys;
+
+public DescribeTransactionsHandler(
+Collection transactionalIds,
+LogContext logContext
+) {
+this.keys = buildKeySet(transactionalIds);
+this.log = logContext.logger(DescribeTransactionsHandler.class);
+this.logContext = logContext;
+}
+
+private static Set buildKeySet(Collection transactionalIds) {
+return transactionalIds.stream()
+.map(DescribeTransactionsHandler::asCoordinatorKey)
+.collect(Collectors.toSet());
+}
+
+@Override
+public String apiName() {
+return "describeTransactions";
+}
+
+@Override
+public Keys initializeKeys() {
+return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+}
+
+@Override
+public DescribeTransactionsRequest.Builder buildRequest(
+Integer brokerId,
+Set keys
+) {
+DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+List transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+request.setTransactionalIds(transactionalIds);
+return new DescribeTransactionsRequest.Builder(request);
+}
+
+@Override
+public ApiResult handleResponse(
+Integer brokerId,
+Set keys,
+AbstractResponse abstractResponse
+) {
+DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+Map completed = new HashMap<>();
+Map failed = new HashMap<>();
+List unmapped = new ArrayList<>();
+
+for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId());
+Errors error = Errors.forCode(transactionState.errorCode());
+
+if (error != Errors.NONE) {
+handleError(transactionalIdKey, error, failed, unmapped);
+continue;
+}
+
+OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+OptionalLong.empty() :
+OptionalLong.of(transactionState.transactionStartTimeMs());
+
+completed.put(transactionalIdKey, new TransactionDescription(
+brokerId,
+TransactionState.parse(transactionState.transactionState()),
+transactionState.producerId(),
+transactionState.producerEpoch(),
+   

[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610955990



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List classNames;
+private List classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   It seems that `getAssignorInstances()` can only accept `List<String>` as the first parameter, so this could not work. 😳
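   If only `List<String>` is accepted, callers holding class literals could map them to names first. A minimal sketch (not part of this PR; assumes the test's `classTypes` field holds class literals):
   
   ```
   // Requires java.util.stream.Collectors in addition to the existing imports.
   List<String> names = classTypes.stream()
       .map(Class::getName)
       .collect(Collectors.toList());
   List<ConsumerPartitionAssignor> assignors =
       getAssignorInstances(names, Collections.emptyMap());
   ```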




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610955955



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}

Review comment:
   Yeah, got it. I thought about it some more. This is currently only used 
for the metadata partitions. If the controller or broker cannot read the 
metadata partition the

[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610954191



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List<String> classNames;
+private List<Class<?>> classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   Ah, I was suggesting to just replicate the `shouldInstantiateAssignor` 
and `shouldInstantiateListOfAssignors` tests exactly, but with the `classTypes` 
being eg `StickyAssignor.class` instead of `StickyAssignor.class.getName()`. 
For example
   
   ```
   classNames = Collections.singletonList(StickyAssignor.class);
   List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
   assertTrue(assignors.get(0) instanceof StickyAssignor);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610952440



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -418,48 +414,49 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch 
snapshotId) {
 public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
 @Override
-public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-if (logStartOffset() > logStartSnapshotId.offset ||
-highWatermark.offset < logStartSnapshotId.offset) {
+public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+if (logStartOffset() > snapshotId.offset ||
+highWatermark.offset < snapshotId.offset) {
 
 throw new OffsetOutOfRangeException(
 String.format(
 "New log start (%s) is less than start offset (%s) or is 
greater than the high watermark (%s)",

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610951347



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}
+} else {
+buffer = bufferSupplier.get(Math.min(maxBatchSize, 
records.sizeInBytes()));
+allocatedBuffer = Optional.of(buffer);
+

[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-817024578


   @mjsax thanks! Rebased with the hotfix. Now should build as expected.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610949543



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator implements Iterator>, 
AutoCloseable {

Review comment:
   By the way Kafka already has 
`org.apache.kafka.common.record.RecordBatchIterator`.
   
   To me, the name of the iterator should represent what it iterates over, not what it generates. In this case it is iterating over `Records` (`MemoryRecords` and `FileRecords`). When we implement @mumrah's suggestion of having separate implementations for each, we will have `MemoryRecordsIterator` and `FileRecordsIterator`.
   
   I'll change the name to `RecordsIterator`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610949055



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List<String> classNames;
+private List<Class<?>> classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   `assignors` is not visible in `KafkaConsumer`; we would need to add a `getAssignors()` to `KafkaConsumer` to test this. Maybe the other tests are enough to verify that `getAssignorInstances` operates correctly. How about removing these 2 test cases?
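   For reference, a hypothetical package-private accessor on `KafkaConsumer` (not in this PR; the `assignors` field name is an assumption) could look like:
   
   ```
   // Expose the configured assignors so tests in the same package can verify them.
   List<ConsumerPartitionAssignor> assignors() {
       return Collections.unmodifiableList(this.assignors);
   }
   ```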




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10489: MINOR Moved tiered storage API classes from clients module to a new storage-api module.

2021-04-09 Thread GitBox


satishd commented on pull request #10489:
URL: https://github.com/apache/kafka/pull/10489#issuecomment-817022234


   Thanks @ijuma for bringing this up. 
   
   We discussed earlier having `api` as the suffix for the API-related classes. The new package name in the `storage:api` module becomes `org.apache.kafka.server.log.remote.storage.api`. 
   cc @junrao 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12648) Experiment with resilient isomorphic topologies

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12648:
---
Description: 
We're not ready to make this a public feature yet, but I want to start 
experimenting with some ways to make Streams applications more resilient in the 
face of isomorphic topological changes (eg adding/removing/reordering 
subtopologies).

If this turns out to be stable and useful, we can circle back on doing a KIP to 
bring this feature into the public API

  was:We're not ready to make this a public feature yet, but I want to start 
experimenting with some ways to make Streams applications more resilient in the 
face of isomorphic topological changes (eg adding/removing/reordering 
subtopologies)


> Experiment with resilient isomorphic topologies
> ---
>
> Key: KAFKA-12648
> URL: https://issues.apache.org/jira/browse/KAFKA-12648
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>
> We're not ready to make this a public feature yet, but I want to start 
> experimenting with some ways to make Streams applications more resilient in 
> the face of isomorphic topological changes (eg adding/removing/reordering 
> subtopologies).
> If this turns out to be stable and useful, we can circle back on doing a KIP 
> to bring this feature into the public API



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown

2021-04-09 Thread GitBox


kkonstantine commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610945519



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -185,8 +185,16 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted", this, t);
-throw t;
+if (!stopping && !cancelled) {

Review comment:
   Should we just write this as: 
   ```
   if (cancelled) {
   } else if(stopping) {
   } else {
   }
   ```
   ?
   Should be equivalent but a bit easier to follow. 
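   For illustration, a fleshed-out version of that structure (the log messages here are placeholders, not this PR's exact wording) might be:
   
   ```
   } catch (Throwable t) {
       if (cancelled) {
           // Failure after cancellation is expected; don't alarm the operator.
           log.warn("{} Task threw an uncaught and unrecoverable exception during cancellation", this, t);
       } else if (stopping) {
           // Failure during an orderly stop is likewise suppressed to WARN.
           log.warn("{} Task threw an uncaught and unrecoverable exception during shutdown", this, t);
       } else {
           log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
           throw t;
       }
   }
   ```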




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10510: KAFKA-12607: Test case for resigned state vote granting

2021-04-09 Thread GitBox


dengziming commented on a change in pull request #10510:
URL: https://github.com/apache/kafka/pull/10510#discussion_r610945674



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -137,6 +137,65 @@ public void 
testRejectVotesFromSameEpochAfterResigningCandidacy() throws Excepti
 context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), false);
 }
 
+@Test
+public void testGrantVotesFromHigherEpochAfterResigningLeadership() throws 
Exception {

Review comment:
   Thank you for your reminder, done. PTAL again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12648) Experiment with resilient isomorphic topologies

2021-04-09 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12648:
--

 Summary: Experiment with resilient isomorphic topologies
 Key: KAFKA-12648
 URL: https://issues.apache.org/jira/browse/KAFKA-12648
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman


We're not ready to make this a public feature yet, but I want to start 
experimenting with some ways to make Streams applications more resilient in the 
face of isomorphic topological changes (eg adding/removing/reordering 
subtopologies)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-09 Thread GitBox


satishd commented on pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#issuecomment-817018916


   Thanks @junrao for the comment, addressed with the commit 
[317be7a](https://github.com/apache/kafka/pull/10218/commits/317be7abe50264b6ed09bb85f36eacdc6b09599f).
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-09 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r610944855



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach the terminal state, viz. DELETE_SEGMENT_FINISHED. That means, any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicates it is not yet copied successfully. So, 
these segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * 
+ * Segment in this state indicates it is successfully copied and it is 
available for reads. So, these segments
+ * will be accessible for reads. But this should be available for any cleanup 
activity like deleting segments by the
+ * caller of this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * Segment in this state indicates it is getting deleted. That means, it is 
not available for reads. But it should be
+ * available for any cleanup activity like deleting segments by the caller of 
this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * Segment in this state indicates it is already deleted. That means, it is not available for any activity including
+ * reads or cleanup activity. This cache will clear entries containing this state.
+ * 
+ * 
+ *
+ * 
+ *  The below table summarizes whether a segment with the respective state is available for the given methods.
+ * 
+ * +---------------------------------+----------------------+-----------------------+------------------------+-------------------------+
+ * |  Method / SegmentState          | COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED |
+ * |---------------------------------+----------------------+-----------------------+------------------------+-------------------------|
+ * | remoteLogSegmentMetadata        |          No          |          Yes          |           No           |           No            |
+ * | (int leaderEpoch, long offset)  |                      |                       |                        |                         |
+ * |---------------------------------+----------------------+-----------------------+------------------------+-------------------------|
+ * | listRemoteLogSegments           |          Yes         |          Yes          |          Yes           |           No            |
+ * | (int leaderEpoch)               |  

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610944539



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader<Integer> reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch<Integer> batch = reader.next();
 log.debug("Handle commit of batch with records {} at base 
offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed 
value to be " +
-(this.committed + 1) + ", but instead found " + 
value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but 
instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;
+readEpoch = batch.epoch();
 }
 log.debug("Counter incremented from {} to {}", initialValue, 
committed);
+
+if (lastSnapshotEndOffset + 10 < nextReadOffset) {

Review comment:
   Hmm. We should only generate snapshots at batch boundaries. There is no guarantee that the `lastOffset` of a batch is a multiple of some number, since batches can contain any number of records.
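   To make that concrete, a sketch of snapshotting only at batch boundaries (`SNAPSHOT_INTERVAL`, `applyRecords` and `generateSnapshot` are assumed helpers, not this PR's code):
   
   ```
   while (reader.hasNext()) {
       BatchReader.Batch<Integer> batch = reader.next();
       applyRecords(batch.records());
       // lastOffset() + 1 is always a batch boundary, no matter how many
       // records the batch happens to contain.
       long nextOffset = batch.lastOffset() + 1;
       if (nextOffset >= lastSnapshotEndOffset + SNAPSHOT_INTERVAL) {
           generateSnapshot(nextOffset, batch.epoch());
           lastSnapshotEndOffset = nextOffset;
       }
   }
   ```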




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610944206



##
File path: docs/upgrade.html
##
@@ -49,8 +49,10 @@ Notable changes in 3
 were removed. These methods were not intended to be public API and 
there is no replacement.
 The NoOffsetForPartitionException.partition() method 
was removed. Please use partitions()
 instead.
-The Scala kafka.common.MessageFormatter was removed. 
Plese use the Java org.apache.kafka.common.MessageFormatter.
+The Scala kafka.common.MessageFormatter was removed. 
Please use the Java org.apache.kafka.common.MessageFormatter.
 The MessageFormatter.init(Properties) method was 
removed. Please use configure(Map) instead.
+The deprecated 
org.apache.kafka.clients.consumer.internals.PartitionAssignor and 
related classes have been removed. Please use

Review comment:
   ```suggestion
   The deprecated 
org.apache.kafka.clients.consumer.internals.PartitionAssignor 
class has been removed. Please use
   ```
   
   See my reply on the original comment 🙂 

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List<String> classNames;
+private List<Class<?>> classTypes;
+
+@Test
+public void shouldInstantiateAssignor() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldInstantiateListOfAssignors() {
+classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromClassType() {

Review comment:
   I think it would make sense to style this test (and 
`shouldInstantiateFromListOfClassTypes` below) more like 
`shouldInstantiateAssignors` now, ie where we actually validate the assignors 
that are returned (eg `assertTrue(assignors.get(0) instanceof 
StickyAssignor)`). Previously this test was just making sure that the adaptor 
would work and we wouldn't throw an exception when constructing the consumer; 
that's why it's like this 

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain

[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610943138



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
##
@@ -254,4 +258,41 @@ public static RebalanceProtocol forId(byte id) {
 }
 }
 
+/**
+ * Get a list of configured instances of {@link 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
+ * based on the class names/types specified by {@link 
org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
+ */
+public static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> assignorClasses, Map<String, Object> configs) {

Review comment:
   nit: this should be package private I think




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610942845



##
File path: docs/upgrade.html
##
@@ -49,8 +49,9 @@ Notable changes in 3
 were removed. These methods were not intended to be public API and 
there is no replacement.
 The NoOffsetForPartitionException.partition() method 
was removed. Please use partitions()
 instead.
-The Scala kafka.common.MessageFormatter was removed. 
Plese use the Java org.apache.kafka.common.MessageFormatter.
+The Scala kafka.common.MessageFormatter was removed. 
Please use the Java org.apache.kafka.common.MessageFormatter.
 The MessageFormatter.init(Properties) method was 
removed. Please use configure(Map) instead.
+The PartitionAssignor classes have been removed. 
Please use ConsumerPartitionAssignor instead.

Review comment:
   I changed it to the singular `class` intentionally since the 
`PartitionAssignorAdaptor` was never a public API, the only thing that was 
exposed to users -- and therefore the only thing they care or need to know 
about -- was the `PartitionAssignor`. Does that make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610942230



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader<Integer> reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch<Integer> batch = reader.next();
 log.debug("Handle commit of batch with records {} at base 
offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed 
value to be " +
-(this.committed + 1) + ", but instead found " + 
value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but 
instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;

Review comment:
   Yep. Not sure what I was thinking when I added those fields.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-09 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r610940434



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * This class represents the in-memory state of segments associated with a 
leader epoch. This includes the mapping of offset to
+ * segment ids and unreferenced segments which are not mapped to any offset 
but they exist in remote storage.
+ * 
+ * This is used by {@link RemoteLogMetadataCache} to track the segments for 
each leader epoch.
+ */
+class RemoteLogLeaderEpochState {
+
+// It contains offset to segment ids mapping with the segment state as COPY_SEGMENT_FINISHED.
+private final NavigableMap<Long, RemoteLogSegmentId> offsetToId = new ConcurrentSkipListMap<>();
+
+/**
+ * It represents unreferenced segments for this leader epoch. It contains 
the segments still in COPY_SEGMENT_STARTED
+ * and DELETE_SEGMENT_STARTED state or these have been replaced by callers 
with other segments having the same
+ * start offset for the leader epoch. These will be returned by {@link 
RemoteLogMetadataCache#listAllRemoteLogSegments()}
+ * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int 
leaderEpoch)} so that callers can clean them up if
+ * they still exist. These will be cleaned from the cache once they reach 
DELETE_SEGMENT_FINISHED state.
+ */
+private final Set<RemoteLogSegmentId> unreferencedSegmentIds = ConcurrentHashMap.newKeySet();
+
+// It represents the highest log offset of the segments that were updated 
with updateHighestLogOffset.
+private volatile Long highestLogOffset;
+
+/**
+ * Returns all the segments associated with this leader epoch sorted by 
start offset in ascending order.
+ *
+ * @param idToSegmentMetadata mapping of id to segment metadata. This will 
be used to get RemoteLogSegmentMetadata
+ *for an id to be used for sorting.
+ * @return
+ */
+Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments(Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata) {
+// Return all the segments including unreferenced metadata.
+int size = offsetToId.size() + unreferencedSegmentIds.size();
+if (size == 0) {
+return Collections.emptyIterator();
+}
+
+ArrayList<RemoteLogSegmentMetadata> metadataList = new ArrayList<>(size);
+for (RemoteLogSegmentId id : offsetToId.values()) {
+metadataList.add(idToSegmentMetadata.get(id));
+}
+
+if (!unreferencedSegmentIds.isEmpty()) {
+for (RemoteLogSegmentId id : unreferencedSegmentIds) {
+metadataList.add(idToSegmentMetadata.get(id));
+}
+
+// sort only when unreferenced entries exist as they are already 
sorted in offsetToId.
+
metadataList.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset));
+}
+
+return metadataList.iterator();
+}
+
+void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
+   Long leaderEpochEndOffset) {
+// Add the segment epochs mapping as the segment is copied 
successfully.
+RemoteLogSegmentId oldEntry = offsetToId.put(startOffset, 
remoteLogSegmentId);
+
+// Remove the metadata from unreferenced entries as it is successfully 
copied and added to the offset mapping.
+unreferencedSegmentIds.remove(remoteLogSegmentId);
+
+// Add the old entry to unreferenced entries as the mapping is removed 
for the old entry.
+if (oldEntry

[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-09 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r610940157



##
File path: 
storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class InmemoryRemoteStorageManagerTest {
+private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteStorageManagerTest.class);
+
+private static final TopicPartition TP = new TopicPartition("foo", 1);
+private static final File DIR = TestUtils.tempDirectory("inmem-rsm-");
+private static final Random RANDOM = new Random();
+
+@Test
+public void testCopyLogSegment() throws Exception {
+InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
+RemoteLogSegmentMetadata segmentMetadata = 
createRemoteLogSegmentMetadata();
+LogSegmentData logSegmentData = createLogSegmentData();
+// Copy all the segment data.
+rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
+
+// Check that the segment data exists in in-memory RSM.
+boolean containsSegment = 
rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForSegment(segmentMetadata));
+Assertions.assertTrue(containsSegment);
+
+// Check that the indexes exist in in-memory RSM.
+for (RemoteStorageManager.IndexType indexType : 
RemoteStorageManager.IndexType.values()) {
+boolean containsIndex = 
rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForIndex(segmentMetadata,
 indexType));
+Assertions.assertTrue(containsIndex);
+}
+}
+
+private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
+TopicIdPartition topicPartition = new 
TopicIdPartition(Uuid.randomUuid(), TP);
+RemoteLogSegmentId id = new RemoteLogSegmentId(topicPartition, 
Uuid.randomUuid());
+return new RemoteLogSegmentMetadata(id, 100L, 200L, 
System.currentTimeMillis(), 0,
+System.currentTimeMillis(), 100, Collections.singletonMap(1, 
100L));
+}
+
+@Test
+public void testFetchLogSegmentIndexes() throws Exception {
+InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
+RemoteLogSegmentMetadata segmentMetadata = 
createRemoteLogSegmentMetadata();
+int segSize = 100;
+LogSegmentData logSegmentData = createLogSegmentData(segSize);
+
+// Copy the segment
+rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
+
+// Check segment data exists for the copied segment.
+try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 
0)) {
+checkContentSame(segmentStream, logSegmentData.logSegment());
+}
+
+HashMap<RemoteStorageManager.IndexType, Path> expectedIndexToPaths = new HashMap<>();
+expectedIndexToPaths.put(RemoteStorageManager.IndexType.OFFSET, 
logSegmentData.offsetIndex());
+expectedIndexToPaths.put(RemoteStorageManager.IndexType.TIMESTAMP, 
logSegmentData.timeIndex());
+expectedIndexToPaths.put(RemoteStorageManager.IndexType.TRANSACTION, 
logSegmentData.txnIndex());
+
expectedIndexToPaths.put(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 
logSegmentData.producerSnapshotIndex());
+
+// Check all segment indexes exist for the copied segment.
+for (Map.Entry<RemoteStorageManager.IndexType, Path> entry : expectedIndexToPaths.entrySet()) {
+RemoteStor

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610939585



##
File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java
##
@@ -37,17 +36,21 @@
  *
  * @throws IOException for any IO error while reading the size

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610939450



##
File path: 
raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
##
@@ -124,6 +125,11 @@ public void handleCommit(BatchReader 
reader) {
 }
 }
 
+@Override
+public void handleSnapshot(SnapshotReader 
reader) {
+reader.close();

Review comment:
   In the future, yes. Simply ignoring the snapshot is okay for now for the following reasons:
   1. We are getting rid of this shim in https://github.com/apache/kafka/pull/10497
   2. None of the kraft listeners generate snapshots, so this should never be called
   3. We will implement snapshot loading for the controller and broker as part of this jira: https://issues.apache.org/jira/browse/KAFKA-12466; a rough sketch of such a listener follows below
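
   This sketch assumes the iterator-style SnapshotReader from this PR; resetState and replay are hypothetical state-rebuilding hooks:

   @Override
   public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
       try {
           // Drop state derived from the log, then rebuild it from the
           // snapshot before consuming further committed batches.
           resetState();                            // hypothetical
           while (reader.hasNext()) {
               for (ApiMessageAndVersion message : reader.next().records()) {
                   replay(message);                 // hypothetical
               }
           }
       } finally {
           reader.close();
       }
   }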




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12647) Implement loading snapshot in the broker

2021-04-09 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12647:
--

 Summary: Implement loading snapshot in the broker
 Key: KAFKA-12647
 URL: https://issues.apache.org/jira/browse/KAFKA-12647
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12646) Implement loading snapshot in the controller

2021-04-09 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12646:
--

 Summary: Implement loading snapshot in the controller
 Key: KAFKA-12646
 URL: https://issues.apache.org/jira/browse/KAFKA-12646
 Project: Kafka
  Issue Type: Sub-task
  Components: controller
Reporter: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610934045



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer %s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}
+} else {
buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes()));
+allocatedBuffer = Optional.of(buffer);
+

[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-09 Thread GitBox


dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610932197



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List<String> classNames;
+private List<Class<?>> classTypes;
+
+@Test
+public void shouldInstantiateNewAssignors() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnNonAssignor() {
+classNames = Collections.singletonList(String.class.getName());
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+classNames = Collections.singletonList("Non-existent assignor");
+assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+}
+
+@Test
+public void shouldInstantiateFromListOfOldAndNewClassTypes() {

Review comment:
   I changed its name to `shouldInstantiateFromListOfClassTypes` and added a similar `shouldInstantiateFromClassType` to test the "new" assignor, WDYT. A rough sketch is below.
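
   The sketch (routing class literals through ConsumerConfig is an assumption based on the imports above):

   @Test
   public void shouldInstantiateFromClassType() {
       List<String> classTypes = initConsumerConfigWithClassTypes(
           Collections.singletonList(StickyAssignor.class))
           .getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
       List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classTypes, Collections.emptyMap());
       assertTrue(assignors.get(0) instanceof StickyAssignor);
   }

   // Hypothetical helper: let ConsumerConfig validate the class literals the
   // same way user-supplied config values would be validated.
   private ConsumerConfig initConsumerConfigWithClassTypes(List<Object> classTypes) {
       Properties props = new Properties();
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes);
       return new ConsumerConfig(props);
   }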

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+private List<String> classNames;
+private List<Class<?>> classTypes;
+
+@Test
+public void shouldInstantiateNewAssignors() {
+classNames = Collections.singletonList(StickyAssignor.class.getName());
+List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+assertTrue(assignors.get(0) instanceof StickyAssignor);

Review comment:
   Add `shouldInstantiateListOfAssignors`.

##
File path: docs/upgrade.html
##
@@ -49,8 +49,9 @@ Notable changes in 3
 were removed. These methods were not intended to be public API and 
there is no replacement.
 The NoOffsetForPar

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r61098



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {

Review comment:
   I think most of the code will be the same, but I can try to have two different implementations. Do you mind if I do that in a future PR? A rough sketch of the split is below.
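
   One possible shape of the split (hypothetical class names; only the byte-sourcing differs, so the batch-decoding loop stays shared):

   interface RawChunkSource {
       Optional<MemoryRecords> poll() throws IOException;   // empty when drained
   }

   final class MemoryChunkSource implements RawChunkSource {
       private MemoryRecords records;
       MemoryChunkSource(MemoryRecords records) { this.records = records; }
       @Override public Optional<MemoryRecords> poll() {
           // Hand out the whole in-memory buffer exactly once.
           MemoryRecords next = records;
           records = null;
           return Optional.ofNullable(next);
       }
   }

   final class FileChunkSource implements RawChunkSource {
       private final FileRecords records;
       private final ByteBuffer buffer;
       private int position = 0;
       FileChunkSource(FileRecords records, int chunkSize) {
           this.records = records;
           this.buffer = ByteBuffer.allocate(Math.min(chunkSize, records.sizeInBytes()));
       }
       @Override public Optional<MemoryRecords> poll() throws IOException {
           if (position >= records.sizeInBytes())
               return Optional.empty();
           buffer.clear();
           // readInto copies bytes at a relative position and flips the buffer.
           records.readInto(buffer, position);
           position += buffer.limit();
           // Unlike the compact() logic above, this simplified version does not
           // handle a record batch that straddles two chunks.
           return Optional.of(MemoryRecords.readableRecords(buffer.slice()));
       }
   }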




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610933083



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {

Review comment:
   Yes. Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


mjsax commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-816995678


   Hotfix merged. Retriggered Jenkins for this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10517: HOTFIX: delete removed WindowedStore.put() method

2021-04-09 Thread GitBox


mjsax merged pull request #10517:
URL: https://github.com/apache/kafka/pull/10517


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on pull request #10429: KAFKA-10847: Add new RocksDBPrefixRangeIterator class

2021-04-09 Thread GitBox


spena commented on pull request #10429:
URL: https://github.com/apache/kafka/pull/10429#issuecomment-816987077


   Thanks @cadonna for the feedback. I decided not to use the prefix iterator. I will call the `all()` method, which returns the records ordered by time, and then delete those whose window has closed. It gives the behavior we want; a rough sketch of that scan is below. The PR is here if you want to join the feedback party: https://github.com/apache/kafka/pull/10462
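
   The scan, roughly (hypothetical method and field names; it relies on the store iterating in timestamp order so the loop can stop at the first still-open window):

   private void emitExpiredNonJoinedRecords(
           final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>> store,
           final long maxStreamTime) {
       try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue<V1, V2>> it = store.all()) {
           while (it.hasNext()) {
               final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue<V1, V2>> entry = it.next();
               // Entries come back oldest first, so the first non-expired
               // window means everything after it is still open.
               if (entry.key.window().end() + joinGraceMs >= maxStreamTime)
                   break;
               forwardNonJoined(entry.key.key(), entry.value);   // hypothetical: join with null
               store.put(entry.key.key(), null, entry.key.window().start());
           }
       }
   }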


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena closed pull request #10429: KAFKA-10847: Add new RocksDBPrefixRangeIterator class

2021-04-09 Thread GitBox


spena closed pull request #10429:
URL: https://github.com/apache/kafka/pull/10429


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


mjsax commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-816981220


   Not your fault... `trunk` is broken: 
https://github.com/apache/kafka/pull/10517


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #10482: KAFKA-12499: add transaction timeout verification

2021-04-09 Thread GitBox


abbccdda commented on a change in pull request #10482:
URL: https://github.com/apache/kafka/pull/10482#discussion_r610912167



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -1014,6 +1015,23 @@ protected StreamsConfig(final Map props,
 if (props.containsKey(RETRIES_CONFIG)) {
 log.warn("Configuration parameter `{}` is deprecated and will be removed in 3.0.0 release.", RETRIES_CONFIG);
 }
+
+if (eosEnabled) {
+verifyEOSTransactionTimeoutCompatibility();
+}
+}
+
+private void verifyEOSTransactionTimeoutCompatibility() {
+final long commitInterval = (long) originals().getOrDefault(COMMIT_INTERVAL_MS_CONFIG, EOS_DEFAULT_COMMIT_INTERVAL_MS);

Review comment:
   Yea, I think so.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-09 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r610910922



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -601,6 +601,9 @@ private[log] object LogCleanerManager extends Logging {
   // the active segment is always uncleanable
   Option(log.activeSegment.baseOffset),
 
+  // we do not want to clean past the high watermark
+  Option(log.highWatermark),

Review comment:
   I switched this to `lastStableOffset` replacing unstable offset above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-09 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610911015



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1859,15 +1819,15 @@ private void appendBatch(
 offsetAndEpoch.offset + 1, Integer.MAX_VALUE);
 
 future.whenComplete((commitTimeMs, exception) -> {
-int numRecords = batch.records.size();
+int numRecords = batch.records.get().size();

Review comment:
   Good point, I think it may just be ignored somehow. I wrapped this in `if (batch.records.isPresent())` for good measure; a sketch is below.
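
   The completion handler then becomes roughly (a sketch of the guard, with hypothetical logging):

   future.whenComplete((commitTimeMs, exception) -> {
       // The records may have been released once the batch was committed,
       // so only dereference them when still present.
       if (batch.records.isPresent()) {
           int numRecords = batch.records.get().size();
           if (exception != null) {
               logger.debug("Failed to commit {} records", numRecords, exception);
           } else {
               logger.debug("Committed {} records at {}", numRecords, commitTimeMs);
           }
       }
   });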




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-09 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r610910922



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -601,6 +601,9 @@ private[log] object LogCleanerManager extends Logging {
   // the active segment is always uncleanable
   Option(log.activeSegment.baseOffset),
 
+  // we do not want to clean past the high watermark
+  Option(log.highWatermark),

Review comment:
   I switched this to `lastStableOffset`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-09 Thread GitBox


hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r610908966



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -601,6 +601,9 @@ private[log] object LogCleanerManager extends Logging {
   // the active segment is always uncleanable
   Option(log.activeSegment.baseOffset),
 
+  // we do not want to clean past the high watermark
+  Option(log.highWatermark),

Review comment:
   In practice, we do not allow the log start offset to get ahead of the 
high watermark, so I am not sure that is a real issue. Still, I cannot think of 
a strong reason to allow cleaning above the high watermark, so maybe it is 
simpler to prevent it.
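
   The intuition, as a small sketch (hypothetical Java; the real code is Scala): the first uncleanable offset is simply the minimum of the candidate upper bounds, so adding the last stable offset as one more candidate caps cleaning at that point.

   static long firstUncleanableOffset(Optional<Long> retentionBound,
                                      long activeSegmentBaseOffset,
                                      long lastStableOffset,
                                      long logEndOffset) {
       return Stream.of(retentionBound,
                        Optional.of(activeSegmentBaseOffset),   // never clean the active segment
                        Optional.of(lastStableOffset))          // never clean past unstable data
                    .filter(Optional::isPresent)
                    .mapToLong(Optional::get)
                    .min()
                    .orElse(logEndOffset);
   }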




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #10517: HOTFIX: delete removed WindowedStore.put() method

2021-04-09 Thread GitBox


mjsax opened a new pull request #10517:
URL: https://github.com/apache/kafka/pull/10517


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-04-09 Thread GitBox


jolshan edited a comment on pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#issuecomment-814408153


   ~currently blocked on https://github.com/apache/kafka/pull/10492 
   (Need to add topic IDs to the metadata topic for fetching)~
   
   No longer blocked


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610904483



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -247,6 +278,32 @@ public void 
shouldDetermineInternalTopicBasedOnTopicName1() {
 
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
 }
 
+@Test
+public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
+final long[] beginningAndEndOffsets = {0L, 5L, 10L, 20L};

Review comment:
   Fair enough :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610893784



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -161,7 +181,7 @@ public void process(final K key, final V1 value) {
 //
 // the condition below allows us to process the late 
record without the need
 // to hold it in the temporary outer store
-if (timeTo < maxStreamTime) {
+if (internalOuterJoinFixDisabled || timeTo < 
maxStreamTime) {

Review comment:
   Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-816957853


   updated the PR and rebased with trunk. Build seems to be failing on trunk 
due to RocksDBTimeOrderedWindowStore.java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891690



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer non-joined store now to prevent
+// further processing
+final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+store.put(otherJoinKey, null, otherRecordTimestamp);
+}
+});
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in both sides of the join.
+// Having access to the time observed in the other join side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to the outer-join store
+//  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to the outer-join store
+//  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and T2 should be emitted, but
+// because T2 was late, it is not fetched by the window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+//
+// the condition below allows us to process the late record without the need
+// to hold it in the temporary outer store
+if (timeTo < maxStreamTime) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(thisJoin, key),
+makeValueOrOtherValue(thisJoin, value),
+inputRecordTimestamp));
+}
+}
+
+outerJoinWindowStore.ifPresent(store -> {
+// only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+// if the current record is late, then there is no need to check for expired records
+if (inputRecordTimestamp == maxStreamTime) {
+maybeEmitOuterExpiryRecords(store, maxStreamTime);
+}
+});
+}
+}
+
+private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+return thisJoin
+? ValueOrOtherValue.makeValue(value)
+: ValueOrOtherValue.makeOtherValue(value);
+}
+
+@SuppressWarnings("unchecked")
+private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>> store, final long maxStreamTime) {
+try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue<V1, V2>> it = store.all()) {
+while (it.hasNext()) {
+final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue<V1, V2>> e = it.next();
+
+// Skip next records if the oldest record has not expired yet
+if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+break;
+}
+
+final K key = e.key.key().getKey();
+
+// Emit the record by joining with a null value. But the order varies depending whether
+// this join is using a reverse joiner or not. Also wheth

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891590



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -92,6 +113,10 @@ public void process(final K key, final V1 value) {
 return;
 }
 
+// maxObservedStreamTime is updated and shared between left and right sides, so we can
+// process a non-join record immediately if it is late
+final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));

Review comment:
   Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891253



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -38,20 +45,32 @@
 private final String otherWindowName;
 private final long joinBeforeMs;
 private final long joinAfterMs;
+private final long joinGraceMs;
 
 private final ValueJoinerWithKey joiner;
 private final boolean outer;
+private final Optional<String> outerJoinWindowName;
+private final AtomicLong maxObservedStreamTime;
+private final boolean thisJoin;

Review comment:
   Are you suggesting using two bool variables?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891116



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +132,40 @@
final ProcessorGraphNode otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";
+final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
+
+outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+}
+
+// Time shared between joins to keep track of the maximum stream time

Review comment:
   Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610890964



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+        final JoinWindows windows,
+        final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+persistentTimeOrderedWindowStore(
+storeName + "-store",
+Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+Duration.ofMillis(windows.size())
+),
+new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),

Review comment:
   Are you talking about the ValueOrOtherValueSerde -> 
LeftOrRightValueSerde? If so, then Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610890414



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if 
the key is
+ * part of the left join (true) or right join (false). This class is only 
useful when a state
+ * store needs to be shared between left and right processors, and each 
processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+private final K key;
+private final boolean thisJoin;

Review comment:
   I changed it to leftJoin. But it seems you suggested adding two bool variables, one for left and another for right?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if 
the key is
+ * part of the left join (true) or right join (false). This class is only 
useful when a state
+ * store needs to be shared between left and right processors, and each 
processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+private final K key;
+private final boolean thisJoin;
+
+private KeyAndJoinSide(final boolean thisJoin, final K key) {
+this.key = Objects.requireNonNull(key, "key is null");
+this.thisJoin = thisJoin;
+}
+
+/**
+ * Create a new {@link KeyAndJoinSide} instance if the provided {@code key} is not {@code null}.
+ *
+ * @param thisJoin True if the key is part of the left topic (referenced as thisJoin in {@code KStreamImplJoin})
+ * @param key  the key
+ * @param <K>  the type of the key
+ * @return a new {@link KeyAndJoinSide} instance if the provided {@code key} is not {@code null}
+ */
+public static <K> KeyAndJoinSide<K> make(final boolean thisJoin, final K key) {
+return new KeyAndJoinSide<>(thisJoin, key);
+}
+
+public boolean isThisJoin() {
+return thisJoin;
+}
+
+public K getKey() {
+return key;
+}
+
+@Override
+public String toString() {
+return "<" + thisJoin + "," + key + ">";

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License 

[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610879225



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -76,6 +78,35 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 assertEquals(3, records.count());
 }
 
+@Test
+public void testResetOffsetToSpecificOffsetWhenAfterEndOffset() {
+final long[] beginningOffsets = {0L, 5L, 10L, 20L};
+final long[] endOffsets = {0L, 5L, 10L, 20L};
+// Test on multiple beginning and end offset combinations
+for (int beginningOffsetIndex = 0; beginningOffsetIndex < beginningOffsets.length; beginningOffsetIndex++) {

Review comment:
   Sure. Will simplify then




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610877439



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -247,6 +278,32 @@ public void 
shouldDetermineInternalTopicBasedOnTopicName1() {
 
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
 }
 
+@Test
+public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
+final long[] beginningAndEndOffsets = {0L, 5L, 10L, 20L};

Review comment:
   On this one, it seems to me that only one value is sufficient. As the test name `testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset` describes, the partition is empty - thus beginning offset == end offset. I will change it to have a beginning offset equal to 5 anyway. A sketch of the guard this test exercises is below.
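
   The guard, roughly (hypothetical variable names, per the KAFKA-9527 fix description): when offsetsForTimes() has no entry for a partition, fall back to the end offset instead of dereferencing null.

   final Map<TopicPartition, OffsetAndTimestamp> byTimestamp =
       client.offsetsForTimes(topicPartitionsAndTimestamps);
   for (final TopicPartition tp : inputTopicPartitions) {
       final OffsetAndTimestamp offsetAndTime = byTimestamp.get(tp);
       if (offsetAndTime != null) {
           client.seek(tp, offsetAndTime.offset());
       } else {
           // Empty partition, or no record at/after the requested datetime:
           // reset to the latest offset rather than throwing an NPE.
           client.seekToEnd(Collections.singletonList(tp));
       }
   }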




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #10381: KAFKA-8410: Migrating KStream Stateless operators to new Processor API

2021-04-09 Thread GitBox


vvcephei merged pull request #10381:
URL: https://github.com/apache/kafka/pull/10381


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10301: KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance

2021-04-09 Thread GitBox


mjsax commented on pull request #10301:
URL: https://github.com/apache/kafka/pull/10301#issuecomment-816928393


   Thanks for the PR @showuon! -- Hope we get this test finally stable!
   
   Merged to `trunk` and cherry-picked to `2.8` and `2.7` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Issue Comment Deleted] (KAFKA-9988) Connect incorrectly logs that task has failed when one takes too long to shutdown

2021-04-09 Thread Kalpesh Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kalpesh Patel updated KAFKA-9988:
-
Comment: was deleted

(was: https://github.com/apache/kafka/pull/10503/)

> Connect incorrectly logs that task has failed when one takes too long to 
> shutdown
> -
>
> Key: KAFKA-9988
> URL: https://issues.apache.org/jira/browse/KAFKA-9988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, 
> 2.5.1
>Reporter: Sanjana Kaundinya
>Assignee: Kalpesh Patel
>Priority: Major
>  Labels: newbie
>
> If the OffsetStorageReader is closed while the task is trying to shutdown, 
> and the task is trying to access the offsets from the OffsetStorageReader, 
> then we see the following in the logs.
> {code:java}
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
> closed while attempting to read offsets. This is likely because the task was 
> been scheduled to stop but has taken longer than the graceful shutdown period 
> to do so.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> ... 14 more
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> This is a bit misleading, because the task is already on its way of being 
> shutdown, and doesn't actually need manual intervention to be restarted. We 
> can see that as later on in the logs we see that it throws another 
> unrecoverable exception.
> {code:java}
> [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> If we know a task is on its way of shutting down, we should not throw a 
> ConnectException and instead log a warning so that we don't log false 
> negatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown

2021-04-09 Thread GitBox


C0urante commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610857776



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -185,8 +185,12 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
-throw t;
+if (!stopping && !cancelled) {

Review comment:
   Looks good to me!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown

2021-04-09 Thread GitBox


kpatelatwork commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610857516



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -185,8 +185,12 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
-throw t;
+if (!stopping && !cancelled) {

Review comment:
   @C0urante please check now, I reworded the message and added a new case for cancelled vs stopped after our morning discussion; the rough shape is below.
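
   A sketch of the reworked catch block (the exact log wording is per the PR discussion, not reproduced here):

   } catch (Throwable t) {
       if (cancelled) {
           log.warn("{} Task threw an exception during shutdown after being cancelled", this, t);
       } else if (stopping) {
           log.warn("{} Task threw an exception while stopping; it was already scheduled to stop", this, t);
       } else {
           log.error("{} Task threw an uncaught and unrecoverable exception. "
               + "Task is being killed and will not recover until manually restarted", this, t);
           throw t;
       }
   }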




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10301: KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance

2021-04-09 Thread GitBox


mjsax merged pull request #10301:
URL: https://github.com/apache/kafka/pull/10301


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication

2021-04-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318218#comment-17318218
 ] 

Matthias J. Sax commented on KAFKA-12566:
-

[https://github.com/apache/kafka/pull/10301/checks?check_run_id=2284331932] 

> Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
> -
>
> Key: KAFKA-12566
> URL: https://issues.apache.org/jira/browse/KAFKA-12566
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets not translated downstream to primary cluster. ==> expected: <true> 
> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289)
> {code}
> {{LOGs}}
> {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>  at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote}
> and
> {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] 
> Graceful stop of task MirrorHeartbeatConnector-0 failed. 
> (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) 
> org.apache.kafka.common.KafkaException: Producer is closed forcefully. at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>  at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) 
> at java.lang.Thread.run(Thread.java:748) [2021-03-26 03:30:42,248] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out 
> while waiting for producer to flush outstanding 1 messages 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote}
>  
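
For context on the failure mode above: waitForCondition is the standard 
poll-until-timeout test idiom. A simplified, self-contained stand-in (not the 
actual TestUtils implementation) behaves like this:

    import java.util.function.BooleanSupplier;

    // Simplified stand-in for the poll-until-timeout idiom; not the real TestUtils.
    public final class WaitFor {
        public static void waitForCondition(final BooleanSupplier condition,
                                            final long timeoutMs,
                                            final String message) throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() > deadline) {
                    // This is the point at which a flaky test reports
                    // "Condition not met within timeout ...".
                    throw new AssertionError(message);
                }
                Thread.sleep(100L); // poll interval; the real helper retries similarly
            }
        }
    }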



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #10423: MINOR website quickstart, fix typo

2021-04-09 Thread GitBox


mjsax commented on pull request #10423:
URL: https://github.com/apache/kafka/pull/10423#issuecomment-816904214


   Thanks @Alee4738! Merged both PRs. (Cherry-picked this one also to `2.8` 
branch.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10423: MINOR website quickstart, fix typo

2021-04-09 Thread GitBox


mjsax merged pull request #10423:
URL: https://github.com/apache/kafka/pull/10423


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-09 Thread GitBox


mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610849890



##
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -76,6 +78,35 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 assertEquals(3, records.count());
 }
 
+@Test
+public void testResetOffsetToSpecificOffsetWhenAfterEndOffset() {
+final long[] beginningOffsets = {0L, 5L, 10L, 20L};
+final long[] endOffsets = {0L, 5L, 10L, 20L};
+// Test on multiple beginning and end offset combinations
+for (int beginningOffsetIndex = 0; beginningOffsetIndex < beginningOffsets.length; beginningOffsetIndex++) {

Review comment:
   I actually think that a single combination, beginning=5 and end=10, should 
be sufficient.

##
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -247,6 +278,32 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
 
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
 }
 
+@Test
+public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
+final long[] beginningAndEndOffsets = {0L, 5L, 10L, 20L};

Review comment:
   To ensure that we really reset to the end, beginning and end should be set 
to different values; otherwise, we cannot distinguish the two cases. Thus, 
similar to the test above, testing a single combination, beginning=5 and 
end=10, seems sufficient?
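
A quick sketch of why distinct values matter, using the standard MockConsumer 
test helper (the topic name and setup below are placeholders, not the actual 
test code): with beginning=5 and end=10, a reset to the end is observably 
different from a reset to the beginning.

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class DistinctOffsetsSketch {
        public static void main(final String[] args) {
            final MockConsumer<byte[], byte[]> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            final TopicPartition tp = new TopicPartition("inputTopic", 0); // placeholder topic
            consumer.assign(Collections.singletonList(tp));

            // Distinct beginning and end offsets, per the suggestion above.
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
            consumer.updateEndOffsets(Collections.singletonMap(tp, 10L));

            consumer.seekToEnd(Collections.singletonList(tp));
            // Prints 10: proof the consumer ended up at the end offset. If beginning
            // and end were equal, position() could not tell the two resets apart.
            System.out.println("position after seekToEnd: " + consumer.position(tp));
        }
    }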




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12449) Remove deprecated WindowStore#put

2021-04-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12449:

Parent: KAFKA-12419
Issue Type: Sub-task  (was: Task)

> Remove deprecated WindowStore#put
> -
>
> Key: KAFKA-12449
> URL: https://issues.apache.org/jira/browse/KAFKA-12449
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
> Fix For: 3.0.0
>
>
> Related to KIP-474: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545] 
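>
> For anyone who hits this removal while upgrading, a hedged migration sketch
> (the surviving three-argument put is per KIP-474; the store wiring here is
> assumed, not taken from the ticket):
>
>     import org.apache.kafka.streams.state.WindowStore;
>
>     // Sketch only: the three-argument put with an explicit window start.
>     final class WindowStorePutMigration {
>         static void store(final WindowStore<String, Long> windowStore,
>                           final String key,
>                           final Long value,
>                           final long windowStartTimestamp) {
>             // Removed variant (implicitly used the current record's timestamp):
>             //     windowStore.put(key, value);
>             windowStore.put(key, value, windowStartTimestamp);
>         }
>     }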



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12449) Remove deprecated WindowStore#put

2021-04-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-12449.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Remove deprecated WindowStore#put
> -
>
> Key: KAFKA-12449
> URL: https://issues.apache.org/jira/browse/KAFKA-12449
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
> Fix For: 3.0.0
>
>
> Related to KIP-474: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #10293: KAFKA-12449: Remove deprecated WindowStore#put

2021-04-09 Thread GitBox


mjsax commented on pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#issuecomment-816888464


   Thanks @jeqo! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10293: KAFKA-12449: Remove deprecated WindowStore#put

2021-04-09 Thread GitBox


mjsax merged pull request #10293:
URL: https://github.com/apache/kafka/pull/10293


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



