[GitHub] [kafka] dongjinleekr commented on a change in pull request #10826: KAFKA-7632: Support Compression Level

2021-06-16 Thread GitBox


dongjinleekr commented on a change in pull request #10826:
URL: https://github.com/apache/kafka/pull/10826#discussion_r652405099



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class CompressionConfig {
+    private final CompressionType type;
+    private final Integer level;
+
+    public static CompressionConfig none() {
+        return of(CompressionType.NONE);
+    }
+
+    public static CompressionConfig of(final CompressionType type) {
+        return of(Objects.requireNonNull(type), null);
+    }
+
+    public static CompressionConfig of(final CompressionType type, final Integer level) {
+        return new CompressionConfig(Objects.requireNonNull(type), level);
+    }
+
+    private CompressionConfig(final CompressionType type, final Integer level) {
+        this.type = type;
+
+        if (level != null && !type.isValidLevel(level.intValue())) {
+            throw new IllegalArgumentException("Illegal level " + level + " for compression codec " + type.name);

Review comment:
   Let me have a look.
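   For context on what a compression "level" controls (the value the `isValidLevel` check above validates), here is a minimal, self-contained sketch using plain `java.util.zip` rather than the `CompressionConfig` API from this PR; the class name and payload string are illustrative. Higher levels trade CPU for a better compression ratio:
   ```
   import java.io.ByteArrayOutputStream;
   import java.util.zip.Deflater;
   import java.util.zip.DeflaterOutputStream;

   public class CompressionLevelDemo {
       public static void main(String[] args) throws Exception {
           byte[] input = "some highly repetitive payload ".repeat(1000).getBytes();
           // Deflater levels range from 1 (BEST_SPEED) to 9 (BEST_COMPRESSION); -1 means the library default.
           for (int level : new int[] {Deflater.BEST_SPEED, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION}) {
               Deflater deflater = new Deflater(level);
               ByteArrayOutputStream out = new ByteArrayOutputStream();
               try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
                   dos.write(input);
               }
               System.out.printf("level=%d: %d -> %d bytes%n", level, input.length, out.size());
           }
       }
   }
   ```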




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12951) Infinite loop while restoring a GlobalKTable

2021-06-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12951:

Affects Version/s: 2.8.0
   2.7.1

> Infinite loop while restoring a GlobalKTable
> 
>
> Key: KAFKA-12951
> URL: https://issues.apache.org/jira/browse/KAFKA-12951
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Damien Gasparina
>Assignee: Matthias J. Sax
>Priority: Major
>
> We encountered an issue a few times in some of our Kafka Streams applications.
>  After an unexpected restart of our applications, some instances have not 
> been able to resume operating.
> They got stuck while trying to restore the state store of a GlobalKTable. The 
> only way to resume operating was to manually delete their `state.dir`.
> We observed the following timeline:
>  * After the restart of the Kafka Streams application, it tries to restore 
> its GlobalKTable
>  * It seeks to the last checkpoint available on the state.dir: 382 
> ([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
>  * The watermark ({{endOffset}} results) returned the offset 383 
> {code:java}
> handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
> -1{code}
>  * We enter the loop: 
> [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
>  * Then we invoked the {{poll()}}, but the poll returns nothing, so we enter: 
> [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
>  and we crash (x)
> {code:java}
> Global task did not make progress to restore state within 30 ms.{code}
>  * The POD restarts, and we encounter the same issue until we manually delete 
> the {{state.dir}}
>  
> Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see:
>  * {{Offset 381}} - Last business message received
>  * {{Offset 382}} - Txn COMMIT (last message)
> I think the real culprit is that the checkpoint is {{383}} instead of being 
> {{382}}. For information, this is a compacted topic, and just before the 
> outage, we encountered some ISR shrinking and leader changes.
> While experimenting with the API, it seems that the {{consumer.position()}} 
> call is a bit tricky: after a {{seek()}} but before a {{poll()}}, 
> {{position()}} actually returns the seek position, while after the {{poll()}} 
> call, even if no data is returned, {{position()}} returns the LSO. I 
> put together an example at 
> [https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb] .
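A minimal standalone consumer sketch of the {{position()}} behaviour described above (not the Streams restore code; the topic name, bootstrap server and the 382/383 offsets are illustrative, taken from the report): under {{read_committed}}, {{endOffsets()}} returns the LSO, and after a {{poll()}} that returns no data the fetch position can still advance past the last data record to the transaction marker's offset.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionAfterPollDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        TopicPartition tp = new TopicPartition("global-topic", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 382L); // the checkpointed offset from the report

            long beforePoll = consumer.position(tp); // 382: the seek position
            consumer.poll(Duration.ofMillis(500));   // returns nothing if offset 382 holds a transaction marker
            long afterPoll = consumer.position(tp);  // can now be 383, i.e. the LSO
            long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);

            System.out.printf("before poll=%d, after poll=%d, endOffset=%d%n",
                beforePoll, afterPoll, endOffset);
        }
    }
}
{code}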



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12951) Infinite loop while restoring a GlobalKTable

2021-06-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12951:

Description: 
We encountered an issue a few time in some of our Kafka Streams application.
 After an unexpected restart of our applications, some instances have not been 
able to resume operating.

They got stuck while trying to restore the state store of a GlobalKTable. The 
only way to resume operating was to manually delete their `state.dir`.

We observed the following timeline:
 * After the restart of the Kafka Streams application, it tries to restore its 
GlobalKTable
 * It seeks to the last checkpoint available on the state.dir: 382 
([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
 * The watermark ({{endOffset}} results) returned the offset 383 
{code:java}
handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
-1{code}

 * We enter the loop: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
 * Then we invoked the {{poll()}}, but the poll returns nothing, so we enter: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
 and we crash (x)
{code:java}
Global task did not make progress to restore state within 30 ms.{code}

 * The POD restart, and we encounter the same issue until we manually delete 
the {{state.dir}}

 

Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see:
 * {{Offset 381}} - Last business message received
 * {{Offset 382}} - Txn COMMIT (last message)

I think the real culprit is that the checkpoint is {{383}} instead of being 
{{382}}. For information, the global topic is an transactional topic.

While experimenting with the API, it seems that the {{consumer.position()}} 
call is a bit tricky, after a {{seek()}} and a {{poll()}}, it seems that the 
{{position()}} is actually returning the seek position. After the {{poll()}} 
call, even if no data is returned, the {{position()}} is returning the LSO. I 
did an example on 
[https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb] .

  was:
We encountered an issue a few time in some of our Kafka Streams application.
 After an unexpected restart of our applications, some instances have not been 
able to resume operating.

They got stuck while trying to restore the state store of a GlobalKTable. The 
only way to resume operating was to manually delete their `state.dir`.

We observed the following timeline:
 * After the restart of the Kafka Streams application, it tries to restore its 
GlobalKTable
 * It seeks to the last checkpoint available on the state.dir: 382 
([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
 * The watermark ({{endOffset}} results) returned the offset 383 
{code:java}
handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
-1{code}

 * We enter the loop: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
 * Then we invoked the {{poll()}}, but the poll returns nothing, so we enter: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
 and we crash (x)
{code:java}
Global task did not make progress to restore state within 30 ms.{code}

 * The POD restart, and we encounter the same issue until we manually delete 
the {{state.dir}}

 

Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see:
 * {{Offset 381}} - Last business message received
 * {{Offset 382}} - Txn COMMIT (last message)

I think the real culprit is that the checkpoint is {{383}} instead of being 
{{382}}. For information, this is a compacted topic, and just before the 
outage, we encountered some ISR shrinking and leader changes.

While experimenting with the API, it seems that the {{consumer.position()}} 
call is a bit tricky, after a {{seek()}} and a {{poll()}}, it seems that the 
{{position()}} is actually returning the seek position. After the {{poll()}} 
call, even if no data is returned, the {{position()}} is returning the LSO. I 
did an example on 
[https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb] .


> Infinite loop while restoring a GlobalKTable
> 
>
> Key: KAFKA-12951
> URL: https://issues.apache.org/jira/browse/KAFKA-12951
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Damien Gasparina

[jira] [Updated] (KAFKA-12951) Infinite loop while restoring a GlobalKTable

2021-06-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12951:

Description: 
We encountered an issue a few time in some of our Kafka Streams application.
 After an unexpected restart of our applications, some instances have not been 
able to resume operating.

They got stuck while trying to restore the state store of a GlobalKTable. The 
only way to resume operating was to manually delete their `state.dir`.

We observed the following timeline:
 * After the restart of the Kafka Streams application, it tries to restore its 
GlobalKTable
 * It seeks to the last checkpoint available on the state.dir: 382 
([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
 * The watermark ({{endOffset}} results) returned the offset 383 
{code:java}
handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
-1{code}

 * We enter the loop: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
 * Then we invoked the {{poll()}}, but the poll returns nothing, so we enter: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
 and we crash (x)
{code:java}
Global task did not make progress to restore state within 30 ms.{code}

 * The POD restart, and we encounter the same issue until we manually delete 
the {{state.dir}}

 

Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see:
 * {{Offset 381}} - Last business message received
 * {{Offset 382}} - Txn COMMIT (last message)

I think the real culprit is that the checkpoint is {{383}} instead of being 
{{382}}. For information, the global topic is a *transactional topic*.

While experimenting with the API, it seems that the {{consumer.position()}} 
call is a bit tricky, after a {{seek()}} and a {{poll()}}, it seems that the 
{{position()}} is actually returning the seek position. After the {{poll()}} 
call, even if no data is returned, the {{position()}} is returning the LSO. I 
did an example on 
[https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb] .

  was:
We encountered an issue a few time in some of our Kafka Streams application.
 After an unexpected restart of our applications, some instances have not been 
able to resume operating.

They got stuck while trying to restore the state store of a GlobalKTable. The 
only way to resume operating was to manually delete their `state.dir`.

We observed the following timeline:
 * After the restart of the Kafka Streams application, it tries to restore its 
GlobalKTable
 * It seeks to the last checkpoint available on the state.dir: 382 
([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
 * The watermark ({{endOffset}} results) returned the offset 383 
{code:java}
handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
-1{code}

 * We enter the loop: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
 * Then we invoked the {{poll()}}, but the poll returns nothing, so we enter: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
 and we crash (x)
{code:java}
Global task did not make progress to restore state within 30 ms.{code}

 * The POD restart, and we encounter the same issue until we manually delete 
the {{state.dir}}

 

Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see:
 * {{Offset 381}} - Last business message received
 * {{Offset 382}} - Txn COMMIT (last message)

I think the real culprit is that the checkpoint is {{383}} instead of being 
{{382}}. For information, the global topic is an transactional topic.

While experimenting with the API, it seems that the {{consumer.position()}} 
call is a bit tricky, after a {{seek()}} and a {{poll()}}, it seems that the 
{{position()}} is actually returning the seek position. After the {{poll()}} 
call, even if no data is returned, the {{position()}} is returning the LSO. I 
did an example on 
[https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb] .


> Infinite loop while restoring a GlobalKTable
> 
>
> Key: KAFKA-12951
> URL: https://issues.apache.org/jira/browse/KAFKA-12951
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Damien Gasparina
>Assignee: Matthias J. Sax
>Priority: Maj

[jira] [Created] (KAFKA-12954) Add Support for Scala 3 in 4.0.0

2021-06-16 Thread Josep Prat (Jira)
Josep Prat created KAFKA-12954:
--

 Summary: Add Support for Scala 3 in 4.0.0
 Key: KAFKA-12954
 URL: https://issues.apache.org/jira/browse/KAFKA-12954
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: Josep Prat
 Fix For: 4.0.0


This is a follow-up task from 
https://issues.apache.org/jira/browse/KAFKA-12895, in which Scala 2.12 support 
will be dropped.

It would be good to add support for Scala 3 at the same time.
Initially it would be enough to make the code compile with Scala 3 so we 
can generate the proper Scala 3 artifacts; this might be achieved with the 
proper compiler flags and an occasional rewrite.
Follow-up tasks could be created to migrate to more idiomatic Scala 3 code 
if desired.

If I understand it correctly, this would need a KIP, as we are modifying the 
public interfaces (new artifacts). If this is the case, let me know and I will 
write it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12954) Add Support for Scala 3 in 4.0.0

2021-06-16 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat reassigned KAFKA-12954:
--

Assignee: Josep Prat

> Add Support for Scala 3 in 4.0.0
> 
>
> Key: KAFKA-12954
> URL: https://issues.apache.org/jira/browse/KAFKA-12954
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Josep Prat
>Assignee: Josep Prat
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> This is a follow up task from 
> https://issues.apache.org/jira/browse/KAFKA-12895, in which Scala 2.12 
> support will be dropped.
> It would be good to, at the same time, add support for Scala 3.
> Initially it would be enough to only make the code compile with Scala 3 so we 
> can generate the proper Scala 3 artifacts, this might be achieved with the 
> proper compiler flags and an occasional rewrite.
> Follow up tasks could be created to migrate to a more idiomatic Scala 3 
> writing if desired.
> If I understand it correctly, this would need a KIP as we are modifying the 
> public interfaces (new artifacts). If this is the case, let me know  and I 
> will write it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12955) Fix LogLoader to pass materialized list of segments for deletion

2021-06-16 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-12955:


 Summary: Fix LogLoader to pass materialized list of segments for 
deletion
 Key: KAFKA-12955
 URL: https://issues.apache.org/jira/browse/KAFKA-12955
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kowshik Prakasam


Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force 
materialization of the {{segmentsToDelete}} iterable, to make sure the results 
of the iteration remain valid and deterministic. We should also pass only the 
materialized view to the logic that deletes the segments.
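Not the LogLoader code itself, but a generic Java sketch of the hazard and the fix (the class and segment names are illustrative): a lazy view over a mutable collection is re-evaluated every time it is iterated, so an async task that receives the view can see a different set of elements than the caller intended, while a materialized copy is pinned at the point of creation.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class MaterializeBeforeAsyncDemo {
    public static void main(String[] args) throws Exception {
        List<String> segments = new ArrayList<>(List.of("seg-0", "seg-10", "seg-20"));

        // A lazy view: which elements it yields depends on 'segments' at iteration time.
        Iterable<String> lazyToDelete = () -> segments.stream()
                .filter(s -> !s.equals("seg-20"))   // pretend seg-20 must be kept
                .iterator();

        // Materialized snapshot: fixed here, regardless of later mutations.
        List<String> materialized = segments.stream()
                .filter(s -> !s.equals("seg-20"))
                .collect(Collectors.toList());

        segments.add("seg-30"); // mutation that happens before the async task runs

        CompletableFuture.runAsync(() -> {
            lazyToDelete.forEach(s -> System.out.println("lazy view would delete " + s)); // includes seg-30
            materialized.forEach(s -> System.out.println("materialized deletes " + s));   // does not
        }).get();
    }
}
{code}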



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12955) Fix LogLoader to pass materialized list of segments for deletion

2021-06-16 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-12955:
-
Description: Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should 
force materialization of the {{segmentsToDelete}} iterable, to make sure the 
results of the iteration remain valid and deterministic. We should also pass 
only the materialized view to the logic that deletes the segments. Otherwise, 
we could end up deleting the wrong segments asynchronously.  (was: Within 
{{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force materialization 
of the {{segmentsToDelete}} iterable, to make sure the results of the iteration 
remain valid and deterministic. We should also pass only the materialized view 
to the logic that deletes the segments.)

> Fix LogLoader to pass materialized list of segments for deletion
> 
>
> Key: KAFKA-12955
> URL: https://issues.apache.org/jira/browse/KAFKA-12955
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Critical
>
> Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force 
> materialization of the {{segmentsToDelete}} iterable, to make sure the 
> results of the iteration remain valid and deterministic. We should also pass 
> only the materialized view to the logic that deletes the segments. Otherwise, 
> we could end up deleting the wrong segments asynchronously.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik opened a new pull request #10888: KAFKA-12955: Fix LogLoader to pass materialized view of segments for deletion

2021-06-16 Thread GitBox


kowshik opened a new pull request #10888:
URL: https://github.com/apache/kafka/pull/10888


   Within `LogLoader.removeAndDeleteSegmentsAsync()`, we should force 
materialization of the `segmentsToDelete` iterable, to make sure the results of 
the iteration remain valid and deterministic. We should also pass only the 
materialized view to the logic that deletes the segments.
   
   **Tests:**
   Added the missing unit test coverage to 
`LogLoaderTest.testLogEndLessThanStartAfterReopen()`.
   Previously the test was in the `LogTest` suite, but it has now been moved to 
the `LogLoaderTest` suite, where it belongs.
   This test fails without the fix, but passes with this fix:
   ```
   @Test
   def testLogEndLessThanStartAfterReopen(): Unit = {
     ...
     ...
     ...
     // Validate that the remaining segment matches our expectations
     val onlySegment = log.segments.firstSegment.get
     assertEquals(startOffset, onlySegment.baseOffset)
     assertTrue(onlySegment.log.file().exists())
     assertTrue(onlySegment.lazyOffsetIndex.file.exists())
     assertTrue(onlySegment.lazyTimeIndex.file.exists())
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12955) Fix LogLoader to pass materialized view of segments for deletion

2021-06-16 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-12955:
-
Summary: Fix LogLoader to pass materialized view of segments for deletion  
(was: Fix LogLoader to pass materialized list of segments for deletion)

> Fix LogLoader to pass materialized view of segments for deletion
> 
>
> Key: KAFKA-12955
> URL: https://issues.apache.org/jira/browse/KAFKA-12955
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Critical
>
> Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force 
> materialization of the {{segmentsToDelete}} iterable, to make sure the 
> results of the iteration remain valid and deterministic. We should also pass 
> only the materialized view to the logic that deletes the segments. Otherwise, 
> we could end up deleting the wrong segments asynchronously.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on pull request #10888: KAFKA-12955: Fix LogLoader to pass materialized view of segments for deletion

2021-06-16 Thread GitBox


kowshik commented on pull request #10888:
URL: https://github.com/apache/kafka/pull/10888#issuecomment-862144734


   cc @junrao @dhruvilshah3 @ccding  for review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-06-16 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-862166415


   gentle nudge @cadonna @vvcephei @ableegoldman


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tang7526 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance

2021-06-16 Thread GitBox


tang7526 commented on a change in pull request #10588:
URL: https://github.com/apache/kafka/pull/10588#discussion_r652487057



##
File path: 
tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class ProducerPerformanceTest {
+
+    @Mock
+    KafkaProducer<byte[], byte[]> producerMock;
+
+    @Spy
+    ProducerPerformance producerPerformanceSpy;
+
+    private File createTempFile(String contents) throws IOException {
+        File file = File.createTempFile("ProducerPerformanceTest", ".tmp");
+        file.deleteOnExit();
+        Files.write(file.toPath(), contents.getBytes());
+        return file;
+    }
+
+    @Test
+    public void testReadPayloadFile() throws Exception {
+        File payloadFile = createTempFile("Hello\nKafka");
+        String payloadFilePath = payloadFile.getAbsolutePath();
+        String payloadDelimiter = "\n";
+
+        List<byte[]> payloadByteList = ProducerPerformance.readPayloadFile(payloadFilePath, payloadDelimiter);
+
+        assertEquals(2, payloadByteList.size());
+        assertEquals("Hello", new String(payloadByteList.get(0)));
+        assertEquals("Kafka", new String(payloadByteList.get(1)));
+    }
+
+    @Test
+    public void testReadProps() throws Exception {
+
+        List<String> producerProps = Collections.singletonList("bootstrap.servers=localhost:9000");
+        String producerConfig = createTempFile("acks=1").getAbsolutePath();
+        String transactionalId = "1234";
+        boolean transactionsEnabled = true;
+
+        Properties prop = ProducerPerformance.readProps(producerProps, producerConfig, transactionalId, transactionsEnabled);
+
+        assertNotNull(prop);
+        assertEquals(5, prop.size());
+    }
+
+    @Test
+    public void testNumberOfCallsForSendAndClose() throws IOException {
+
+        doReturn(null).when(producerMock).send(any(), any());
+        doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+
+        String[] args = new String[] {"--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", "--producer-props", "bootstrap.servers=localhost:9000"};
+        producerPerformanceSpy.start(args);
+        verify(producerMock, times(5)).send(any(), any());
+        verify(producerMock, times(1)).close();
+    }
+
+    @Test
+    public void testUnexpectedArg() {
+
+        String[] args = new String[] {"--test", "test", "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", "--producer-props", "bootstrap.servers=localhost:9000"};
+        ArgumentParser parser = ProducerPerformance.argParser();
+        ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
+        assertEquals("unrecognized arguments: '--test'", thrown.getMessage());
+    }
+
+    @Test
+    public void testGenerateRandomPayloadByPayloadFile() {
+
+        Integer recordSize = null;
+        String inputString = "Hell

[GitHub] [kafka] tang7526 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance

2021-06-16 Thread GitBox


tang7526 commented on a change in pull request #10588:
URL: https://github.com/apache/kafka/pull/10588#discussion_r652487133



##
File path: tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
##
@@ -190,8 +159,66 @@ public static void main(String[] args) throws Exception {
 
 }
 
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
+        Random random) {
+        if (!payloadByteList.isEmpty()) {
+            payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
+        } else if (recordSize != null) {
+            for (int j = 0; j < payload.length; ++j)
+                payload[j] = (byte) (random.nextInt(26) + 65);
+        } else {
+            throw new IllegalArgumentException("no payload File Path or record Size provided");
+        }
+        return payload;
+    }
+
+    static Properties readProps(List<String> producerProps, String producerConfig, String transactionalId,
+        boolean transactionsEnabled) throws IOException {
+        Properties props = new Properties();
+        if (producerConfig != null) {
+            props.putAll(Utils.loadProps(producerConfig));
+        }
+        if (producerProps != null)
+            for (String prop : producerProps) {
+                String[] pieces = prop.split("=");
+                if (pieces.length != 2)
+                    throw new IllegalArgumentException("Invalid property: " + prop);
+                props.put(pieces[0], pieces[1]);
+            }
+
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        if (transactionsEnabled)

Review comment:
   Done.

##
File path: 
tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class ProducerPerformanceTest {
+
+    @Mock
+    KafkaProducer<byte[], byte[]> producerMock;
+
+    @Spy
+    ProducerPerformance producerPerformanceSpy;
+
+    private File createTempFile(String contents) throws IOException {
+        File file = File.createTempFile("ProducerPerformanceTest", ".tmp");
+        file.deleteOnExit();
+        Files.write(file.toPath(), contents.getBytes());
+        return file;
+    }
+
+    @Test
+    public void testReadPayloadFile() throws Exception {
+        File payloadFile = createTempFile("Hello\nKafka");
+        String payloadFilePath = payloadFile.getAbsolutePath();
+        String payloadDelimiter = "\n";
+
+        List<byte[]> payloadByteList = ProducerPerformance.readPayloadFile(payloadFilePath, payloadDelimiter);
+
+        assertEquals(2, payloadByteList.size());
+        assertEquals("Hello", new String(payloadByteList.get(0)));
+        assertEquals("Kafka", new String(payloadByteL

[jira] [Assigned] (KAFKA-12883) Adress KIP-100 type constraints now that Java 7 support is dropped

2021-06-16 Thread Nicolas Guignard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Guignard reassigned KAFKA-12883:


Assignee: (was: Nicolas Guignard)

> Adress KIP-100 type constraints now that Java 7 support is dropped
> --
>
> Key: KAFKA-12883
> URL: https://issues.apache.org/jira/browse/KAFKA-12883
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Priority: Minor
>  Labels: StarterProject, newbie++
>
> As part of [KIP-100 rejected 
> alternatives|https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API#KIP100RelaxTypeconstraintsinKafkaStreamsAPI-RejectedAlternatives],
>  we suggested a more correct alternative to the type constraints for some of 
> the {{KStream}} methods.
> Unfortunately at the time, there was a Java 7 compiler behavior that 
> prevented us from using those type constraints, so we had to relax them in 
> order to preserve backwards compatibility.
> As part of the KIP it was mentioned that:
> ??Once we drop support for 1.7 we can always decide to switch to approach 2. 
> without breaking source compatibility, by making a proposal similar to this 
> KIP.??
> Since Java 7 support has been dropped a while ago, it would be a good time to 
> revisit this and possibly switch to the alternative type constraints. The 
> change should be source compatible, although the streams APIs have 
> significantly evolved since, so there might be some additional investigation 
> required to ensure that is still the case and also covers the Scala Streams 
> APIs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12883) Adress KIP-100 type constraints now that Java 7 support is dropped

2021-06-16 Thread Nicolas Guignard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Guignard reassigned KAFKA-12883:


Assignee: Nicolas Guignard

> Adress KIP-100 type constraints now that Java 7 support is dropped
> --
>
> Key: KAFKA-12883
> URL: https://issues.apache.org/jira/browse/KAFKA-12883
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Nicolas Guignard
>Priority: Minor
>  Labels: StarterProject, newbie++
>
> As part of [KIP-100 rejected 
> alternatives|https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API#KIP100RelaxTypeconstraintsinKafkaStreamsAPI-RejectedAlternatives],
>  we suggested a more correct alternative to the type constraints for some of 
> the {{KStream}} methods.
> Unfortunately at the time, there was a Java 7 compiler behavior that 
> prevented us from using those type constraints, so we had to relax them in 
> order to preserve backwards compatibility.
> As part of the KIP it was mentioned that:
> ??Once we drop support for 1.7 we can always decide to switch to approach 2. 
> without breaking source compatibility, by making a proposal similar to this 
> KIP.??
> Since Java 7 support has been dropped a while ago, it would be a good time to 
> revisit this and possibly switch to the alternative type constraints. The 
> change should be source compatible, although the streams APIs have 
> significantly evolved since, so there might be some additional investigation 
> required to ensure that is still the case and also covers the Scala Streams 
> APIs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12954) Add Support for Scala 3 in 4.0.0

2021-06-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364303#comment-17364303
 ] 

Ismael Juma commented on KAFKA-12954:
-

Thanks for the ticket. One thing to evaluate is the cost of supporting Scala 3 
and Scala 2.13 at the same time. If the code changes required are minimal, it 
makes sense. However, if they are more extensive, we should also evaluate 
whether we'd rather go to Java instead.

> Add Support for Scala 3 in 4.0.0
> 
>
> Key: KAFKA-12954
> URL: https://issues.apache.org/jira/browse/KAFKA-12954
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Josep Prat
>Assignee: Josep Prat
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> This is a follow up task from 
> https://issues.apache.org/jira/browse/KAFKA-12895, in which Scala 2.12 
> support will be dropped.
> It would be good to, at the same time, add support for Scala 3.
> Initially it would be enough to only make the code compile with Scala 3 so we 
> can generate the proper Scala 3 artifacts, this might be achieved with the 
> proper compiler flags and an occasional rewrite.
> Follow up tasks could be created to migrate to a more idiomatic Scala 3 
> writing if desired.
> If I understand it correctly, this would need a KIP as we are modifying the 
> public interfaces (new artifacts). If this is the case, let me know  and I 
> will write it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12956) Validate the snapshot id when the state machine freeze a snapshot

2021-06-16 Thread Haoran Xuan (Jira)
Haoran Xuan created KAFKA-12956:
---

 Summary: Validate the snapshot id when the state machine freeze a 
snapshot
 Key: KAFKA-12956
 URL: https://issues.apache.org/jira/browse/KAFKA-12956
 Project: Kafka
  Issue Type: Sub-task
Reporter: Haoran Xuan


This is similar to KAFKA-10800, in this PR, optionally validate the snapshot id 
when `onSnapshotFrozen` is being called. The validation logic will be 
implemented in KAFKA-10800, and this Jira is supposed to directly call that 
logic.

Currently, the `onSnapshotFrozen` can be called by `KafkaRaftClient` and 
`SnapshotWriter`. Do not validate if it is called by `KafkaRaftClient` when 
it's downloading snapshot from leader, do validate if it is called by  
`SnapshotWriter` which implies generating a specific snapshot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12954) Add Support for Scala 3 in 4.0.0

2021-06-16 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364310#comment-17364310
 ] 

Josep Prat commented on KAFKA-12954:


Absolutely. According to the Scala Center, most code that compiles for 
Scala 2.13 also compiles for Scala 3. So I expect this to be mostly a matter of 
setting the right flags and the occasional automatic rewrite that the Scala 
Center provides.

[~ijuma], am I right in assuming it needs a KIP?

I will try it on my fork to validate my assumption that this is a low-impact change.

> Add Support for Scala 3 in 4.0.0
> 
>
> Key: KAFKA-12954
> URL: https://issues.apache.org/jira/browse/KAFKA-12954
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Josep Prat
>Assignee: Josep Prat
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> This is a follow up task from 
> https://issues.apache.org/jira/browse/KAFKA-12895, in which Scala 2.12 
> support will be dropped.
> It would be good to, at the same time, add support for Scala 3.
> Initially it would be enough to only make the code compile with Scala 3 so we 
> can generate the proper Scala 3 artifacts, this might be achieved with the 
> proper compiler flags and an occasional rewrite.
> Follow up tasks could be created to migrate to a more idiomatic Scala 3 
> writing if desired.
> If I understand it correctly, this would need a KIP as we are modifying the 
> public interfaces (new artifacts). If this is the case, let me know  and I 
> will write it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12678) Flaky Test CustomQuotaCallbackTest.testCustomQuotaCallback

2021-06-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364360#comment-17364360
 ] 

Matthias J. Sax commented on KAFKA-12678:
-

Failed again: 
[https://github.com/apache/kafka/pull/10861/checks?check_run_id=2836970689] 

> Flaky Test CustomQuotaCallbackTest.testCustomQuotaCallback
> --
>
> Key: KAFKA-12678
> URL: https://issues.apache.org/jira/browse/KAFKA-12678
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/10548/checks?check_run_id=2363286324
> {quote} {{org.opentest4j.AssertionFailedError: Topic [group1_largeTopic] 
> metadata not propagated after 6 ms
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:117)
>   at 
> kafka.utils.TestUtils$.waitForAllPartitionsMetadata(TestUtils.scala:851)
>   at kafka.utils.TestUtils$.createTopic(TestUtils.scala:410)
>   at kafka.utils.TestUtils$.createTopic(TestUtils.scala:383)
>   at 
> kafka.api.CustomQuotaCallbackTest.createTopic(CustomQuotaCallbackTest.scala:180)
>   at 
> kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:135)}}
> {{}}{quote}
> {{STDOUT}}
> {quote}{{}}
>  {{[2021-04-16 13ː59ː37,961] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/kafka14612759777396794548.tmp'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn:1094)
> [2021-04-16 13ː59ː37,962] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient:74)
> [2021-04-16 13ː59ː38,367] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/kafka14612759777396794548.tmp'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn:1094)
> [2021-04-16 13ː59ː38,367] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient:74)
> [2021-04-16 13ː59ː38,514] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/kafka14612759777396794548.tmp'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn:1094)
> [2021-04-16 13ː59ː38,514] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient:74)
> [2021-04-16 13ː59ː38,530] WARN No meta.properties file under dir 
> /tmp/kafka-3506547187885001632/meta.properties 
> (kafka.server.BrokerMetadataCheckpoint:70)
> [2021-04-16 13ː59ː38,838] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/kafka14612759777396794548.tmp'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn:1094)
> [2021-04-16 13ː59ː38,838] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient:74)
> [2021-04-16 13ː59ː38,848] WARN No meta.properties file under dir 
> /tmp/kafka-15288707564978049709/meta.properties 
> (kafka.server.BrokerMetadataCheckpoint:70)
> [2021-04-16 14ː00ː16,029] WARN [RequestSendThread controllerId=0] Controller 
> 0 epoch 1 fails to send request (type=LeaderAndIsRequest, controllerId=0, 
> controllerEpoch=1, brokerEpoch=35, 
> partitionStates=[LeaderAndIsrPartitionState(topicName='group1_largeTopic', 
> partitionIndex=6, controllerEpoch=1, leader=0, leaderEpoch=0, isr=[0], 
> zkVersion=0, replicas=[0], addingReplicas=[], removingReplicas=[], 
> isNew=true), LeaderAndIsrPartitionState(topicName='group1_largeTopic', 
> partitionIndex=72, controllerEpoch=1, leader=0, leaderEpoch=0, isr=[0], 
> zkVersion=0, replicas=[0], addingReplicas=[], removingReplicas=[], 
> isNew=true), LeaderAndIsrPartitionState(topicName='group1_largeTopic', 
> partitionIndex=39, controllerEpoch=1, leader=0, leaderEpoch=0, isr=[0], 
> zkVersion=0, replicas=[0], addingReplicas=[], removingReplicas=[], 
> isNew=true), LeaderAndIsrPartitionState(topicName='group1_largeTopic', 
> partitionIndex=14, controllerEpoch=1, leader=0, leaderEpoch=0, isr=[0], 
> zkVersion=0, replicas=[0], addingReplicas=[], removi

[jira] [Commented] (KAFKA-12629) Failing Test: RaftClusterTest

2021-06-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364362#comment-17364362
 ] 

Matthias J. Sax commented on KAFKA-12629:
-

Failed two more times

- [https://github.com/apache/kafka/pull/10861/checks?check_run_id=2836660125]

- [https://github.com/apache/kafka/pull/10861/checks?check_run_id=2836599063] 

> Failing Test: RaftClusterTest
> -
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12957) Refactor Streams Logical Plan Generation

2021-06-16 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12957:
-

 Summary: Refactor Streams Logical Plan Generation
 Key: KAFKA-12957
 URL: https://issues.apache.org/jira/browse/KAFKA-12957
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


There is a general issue with Streams logical plan -> physical plan generation: 
the physical processor nodes are generated at the parsing phase rather than at 
the logical plan compilation phase. The former stage is agnostic to any 
user configuration, while only the latter stage has access to it; hence we 
should not generate physical processor nodes during the parsing phase (i.e. any 
code related to StreamsBuilder), but defer them to the logical plan phase (i.e. 
XXNode.writeToTopology). Today this causes several issues: many physical 
processor instantiations require access to the configs, so we have to defer 
them to the `init` procedure of the node, and this workaround is scattered 
across many places, from logical nodes to physical processors.

This would be a big refactoring of Streams' logical plan generation, but I 
think it would be worth getting this into a cleaner state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-06-16 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364422#comment-17364422
 ] 

Sagar Rao commented on KAFKA-8295:
--

Yeah, that makes sense. I haven't looked at the count() APIs, so I can't comment at 
this moment. I will go through the implementation and also assess its feasibility.

Another option I was thinking of was to expose a counter-based state store, 
which users could use as a counter for their keys. This would be agnostic of 
any time window. I guess currently, even to achieve that, users have to do 
a read-modify-write.

That said, this may not be an issue, as all records for the same key end up 
on the same stream processor thread, so technically multiple threads won't be 
accessing the same key and causing race conditions.

Do you think a counter would be useful in state stores?
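For reference, a minimal sketch of the read-free counter update discussed here, assuming a local RocksDB instance opened directly through RocksJava (the key and database path are illustrative; the {{uint64add}} name selects RocksDB's built-in C++ counter merge operator):
{code:java}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksDbCounterDemo {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                .setCreateIfMissing(true)
                .setMergeOperatorName("uint64add");                              // built-in counter merge operator
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-counter-demo")) {  // illustrative path

            byte[] key = "some-count-key".getBytes();
            byte[] one = ByteBuffer.allocate(Long.BYTES)
                    .order(ByteOrder.LITTLE_ENDIAN)   // the operator expects 8-byte little-endian values
                    .putLong(1L)
                    .array();

            // Each merge() is a blind "+= 1": no read-modify-write on the caller side.
            db.merge(key, one);
            db.merge(key, one);
            db.merge(key, one);

            long count = ByteBuffer.wrap(db.get(key)).order(ByteOrder.LITTLE_ENDIAN).getLong();
            System.out.println("count = " + count); // 3
        }
    }
}
{code}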

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator – otherwise we incur too much cost crossing the jni for a net 
> performance benefit)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12241) Partition offline when ISR shrinks to leader and LogDir goes offline

2021-06-16 Thread Jack Foy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364493#comment-17364493
 ] 

Jack Foy commented on KAFKA-12241:
--

In my opinion this is a cleaner fix than the one proposed in 
https://issues.apache.org/jira/browse/KAFKA-3861 for the same problem.

> Partition offline when ISR shrinks to leader and LogDir goes offline
> 
>
> Key: KAFKA-12241
> URL: https://issues.apache.org/jira/browse/KAFKA-12241
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.2
>Reporter: Noa Resare
>Priority: Major
>
> This is a long standing issue that we haven't previously tracked in a JIRA. 
> We experience this maybe once per month on average and we see the following 
> sequence of events:
>  # A broker shrinks ISR to just itself for a partition. However, the 
> followers are at highWatermark:{{ [Partition PARTITION broker=601] Shrinking 
> ISR from 1501,601,1201,1801 to 601. Leader: (highWatermark: 432385279, 
> endOffset: 432385280). Out of sync replicas: (brokerId: 1501, endOffset: 
> 432385279) (brokerId: 1201, endOffset: 432385279) (brokerId: 1801, endOffset: 
> 432385279).}}
>  # Around this time (in the case I have in front of me, 20ms earlier 
> according to the logging subsystem) LogDirFailureChannel captures an Error 
> while appending records to PARTITION due to a readonly filesystem.
>  # ~20 ms after the ISR shrink, LogDirFailureHandler offlines the partition: 
> Logs for partitions LIST_OF_PARTITIONS are offline and logs for future 
> partitions are offline due to failure on log directory /kafka/d6/data 
>  # ~50ms later the controller marks the replicas as offline from 601: 
> message: [Controller id=901] Mark replicas LIST_OF_PARTITIONS on broker 601 
> as offline 
>  # ~2ms later the controller offlines the partition: [Controller id=901 
> epoch=4] Changed partition PARTITION state from OnlinePartition to 
> OfflinePartition 
> To resolve this someone needs to manually enable unclean leader election, 
> which is obviously not ideal. Since the leader knows that all the followers 
> that are removed from ISR is at highWatermark, maybe it could convey that to 
> the controller in the LeaderAndIsr response so that the controller could pick 
> a new leader without having to resort to unclean leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3861) Shrunk ISR before leader crash makes the partition unavailable

2021-06-16 Thread Jack Foy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364495#comment-17364495
 ] 

Jack Foy commented on KAFKA-3861:
-

https://issues.apache.org/jira/browse/KAFKA-12241 is a duplicate of this issue, 
but I think its proposed fix is cleaner.

> Shrunk ISR before leader crash makes the partition unavailable
> --
>
> Key: KAFKA-3861
> URL: https://issues.apache.org/jira/browse/KAFKA-3861
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>Priority: Major
>
> We observed a case that the leader experienced a crash and lost its in-memory 
> data and latest HW offsets. Normally Kafka should be safe and be able to make 
> progress with a single node failure. However a few seconds before the crash 
> the leader shrunk its ISR to itself, which is safe since min-in-sync-replicas 
> is 2 and replication factor is 3 thus the troubled leader cannot accept new 
> produce messages. After the crash however the controller could not name any 
> of the of the followers as the new leader since as far as the controller 
> knows they are not in ISR and could potentially be behind the last leader. 
> Note that unclean-leader-election is disabled in this cluster since the 
> cluster requires a very high degree of durability and cannot tolerate data 
> loss.
> The impact could get worse if the admin brings up the crashed broker in an 
> attempt to make such partitions available again; this would take down even 
> more brokers as the followers panic when they find their offset larger than 
> HW offset in the leader:
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not 
> disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election 
> configuration for a topic changes while a replica is down. Otherwise,
>   // we should never encounter this situation since a non-ISR leader 
> cannot be elected if disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals, 
> AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
> ConfigType.Topic, 
> topicAndPartition.topic)).uncleanLeaderElectionEnable) {
> // Log a fatal error and shutdown the broker to ensure that data loss 
> does not unexpectedly occur.
> fatal("Halting because log truncation is not allowed for topic 
> %s,".format(topicAndPartition.topic) +
>   " Current leader %d's latest offset %d is less than replica %d's 
> latest offset %d"
>   .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, 
> replica.logEndOffset.messageOffset))
> Runtime.getRuntime.halt(1)
>   }
> {code}
> One hackish solution would be for the admin to investigate the logs, determine 
> that unclean leader election would be safe in this particular case, temporarily 
> enable it (while the crashed node is down) until new leaders are selected for 
> the affected partitions, wait until the topics' LEO advances far enough, and 
> then bring up the crashed node again. This manual process is however slow and 
> error-prone, and the cluster suffers partial unavailability in the meanwhile.
> We are thinking of having the controller make an exception for this case: if 
> the ISR size is less than min-in-sync-replicas and the new leader would be -1, 
> the controller does an RPC to all the replicas to inquire about their latest 
> offsets, and if all the replicas respond, it chooses the one with the largest 
> offset as the leader as well as the new ISR. Note that the controller cannot do 
> that if any of the non-leader replicas does not respond, since the responding 
> replicas might not have been in the last ISR and hence could be behind the 
> others (and the controller cannot know that since it does not keep track of 
> previous ISRs).
> The pro is that Kafka stays safely available when such cases occur, without any 
> admin intervention. The con, however, is that having the controller talk to 
> brokers inside the leader election function would break the existing pattern in 
> the source code, as currently the leader is elected locally without requiring 
> any additional RPC.
> Thoughts?
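As a rough illustration of the fallback election described above (the class and method names below are made up and not part of Kafka), the controller-side check could look like this: it only picks a leader when every assigned replica has reported an offset, mirroring the "all replicas must respond" requirement.

{code:java}
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical sketch only: elect the replica with the largest reported offset,
// but abort if any replica failed to respond (a silent replica might be ahead).
public final class OfflinePartitionFallbackElector {

    public static Optional<Integer> electByLargestOffset(Map<Integer, OptionalLong> latestOffsetByReplica) {
        boolean allResponded = latestOffsetByReplica.values().stream().allMatch(OptionalLong::isPresent);
        if (!allResponded) {
            return Optional.empty();   // cannot safely pick a leader
        }
        return latestOffsetByReplica.entrySet().stream()
                .max(Comparator.comparingLong((Map.Entry<Integer, OptionalLong> e) -> e.getValue().getAsLong()))
                .map(Map.Entry::getKey); // new leader and the seed of the new ISR
    }
}
{code}

For example, with reported offsets {1: 100, 2: 120} broker 2 would be chosen; if any entry were empty, the election would be aborted and the partition kept offline.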



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12401) Flaky Test FeatureCommandTest#testUpgradeAllFeaturesSuccess

2021-06-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364507#comment-17364507
 ] 

Matthias J. Sax commented on KAFKA-12401:
-

kafka.admin.FeatureCommandTest.testDescribeFeaturesSuccess()
{quote}org.opentest4j.AssertionFailedError: expected:  but was: {quote}
Bunch of exceptions in the log:
{quote}2021-06-16 19:13:39,839] WARN [RequestSendThread controllerId=2] 
Controller 2's connection to broker localhost:33561 (id: 1 rack: null) was 
unsuccessful (kafka.controller.RequestSendThread:72) java.io.IOException: 
Connection to localhost:33561 (id: 1 rack: null) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:291)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:245) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){quote}

> Flaky Test FeatureCommandTest#testUpgradeAllFeaturesSuccess
> ---
>
> Key: KAFKA-12401
> URL: https://issues.apache.org/jira/browse/KAFKA-12401
> Project: Kafka
>  Issue Type: Test
>  Components: admin, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}kafka.admin.UpdateFeaturesException: 2 feature updates failed! at 
> kafka.admin.FeatureApis.maybeApplyFeatureUpdates(FeatureCommand.scala:289) at 
> kafka.admin.FeatureApis.upgradeAllFeatures(FeatureCommand.scala:191) at 
> kafka.admin.FeatureCommandTest.$anonfun$testUpgradeAllFeaturesSuccess$3(FeatureCommandTest.scala:134){quote}
> STDOUT
> {quote}[Add] Feature: feature_1 ExistingFinalizedMaxVersion: - 
> NewFinalizedMaxVersion: 3 Result: OK [Add] Feature: feature_2 
> ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 5 Result: OK [Add] 
> Feature: feature_1 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 3 
> Result: OK [Add] Feature: feature_2 ExistingFinalizedMaxVersion: - 
> NewFinalizedMaxVersion: 5 Result: OK{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12770) Gradle build: allow the CheckStyle version to be specified via parameter

2021-06-16 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dejan Stojadinović reassigned KAFKA-12770:
--

Assignee: Ismael Juma  (was: Dejan Stojadinović)

> Gradle build: allow the CheckStyle version to be specified via parameter
> 
>
> Key: KAFKA-12770
> URL: https://issues.apache.org/jira/browse/KAFKA-12770
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Ismael Juma
>Priority: Minor
>
>   ^*(i) Prologue*: 
> [https://github.com/apache/kafka/pull/10656#issuecomment-836074067]^
> *(on) Rationale:* if we implement this, the CheckStyle team ([~romani] and 
> others) can add the Kafka project to their regression suite: 
> [https://github.com/apache/kafka/pull/10656#issuecomment-835809154] 
> *Related links:*
>  * [https://github.com/apache/kafka/blob/2.8.0/Jenkinsfile#L28]
>  * [https://github.com/apache/kafka#common-build-options]
>  * 
> [https://docs.gradle.org/7.0.1/dsl/org.gradle.api.plugins.quality.CheckstyleExtension.html#org.gradle.api.plugins.quality.CheckstyleExtension:toolVersion]
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12955) Fix LogLoader to pass materialized view of segments for deletion

2021-06-16 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12955.
-
Fix Version/s: 3.0.0
 Assignee: Kowshik Prakasam
   Resolution: Fixed

Merged the PR to trunk.

> Fix LogLoader to pass materialized view of segments for deletion
> 
>
> Key: KAFKA-12955
> URL: https://issues.apache.org/jira/browse/KAFKA-12955
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Critical
> Fix For: 3.0.0
>
>
> Within {{LogLoader.removeAndDeleteSegmentsAsync()}}, we should force 
> materialization of the {{segmentsToDelete}} iterable, to make sure the 
> results of the iteration remain valid and deterministic. We should also pass 
> only the materialized view to the logic that deletes the segments. Otherwise, 
> we could end up deleting the wrong segments asynchronously.
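As a generic illustration of the idea (this is not the actual LogLoader code, and the names below are invented), forcing materialization before scheduling the asynchronous deletion could look like:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Invented names, sketch only: snapshot the lazily-evaluated view into a concrete list
// before handing it to the async job, so later changes to the underlying segment map
// cannot alter which segments get deleted.
final class AsyncSegmentDeleter {
    static void removeAndDeleteAsync(Iterable<String> segmentsToDelete, Executor scheduler) {
        List<String> materialized = new ArrayList<>();
        segmentsToDelete.forEach(materialized::add);   // force materialization now
        scheduler.execute(() -> materialized.forEach(AsyncSegmentDeleter::deleteSegment));
    }

    private static void deleteSegment(String segmentName) {
        System.out.println("deleting " + segmentName); // placeholder for the real file deletion
    }
}
{code}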



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12958) Add simulation invariant for leadership and snapshot

2021-06-16 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12958:
--

 Summary: Add simulation invariant for leadership and snapshot
 Key: KAFKA-12958
 URL: https://issues.apache.org/jira/browse/KAFKA-12958
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio


During the simulation we should add an invariant that notified leaders are 
never asked to load snapshots. The state machine always sees the following 
sequence of callback calls:

Leaders see:
...
handleLeaderChange: the state machine is notified of leadership
handleSnapshot: never called

Non-leaders see:
...
handleLeaderChange: the state machine is notified that it is not the leader
handleSnapshot: called 0 or more times
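
A rough sketch of how such an invariant could be tracked in the simulation (the names below are invented and not part of the raft test harness):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Invented names, sketch only: remember which nodes currently believe they are the leader
// and fail the simulation if one of them is ever asked to load a snapshot.
final class LeaderSnapshotInvariant {
    private final Map<Integer, Boolean> leaderByNode = new HashMap<>();

    void onHandleLeaderChange(int nodeId, boolean isLeader) {
        leaderByNode.put(nodeId, isLeader);
    }

    void onHandleSnapshot(int nodeId) {
        if (leaderByNode.getOrDefault(nodeId, false)) {
            throw new AssertionError("Invariant violated: leader " + nodeId + " was asked to load a snapshot");
        }
    }
}
{code}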



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early

2021-06-16 Thread Juan C. Gonzalez-Zurita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364613#comment-17364613
 ] 

Juan C. Gonzalez-Zurita commented on KAFKA-12718:
-

Attempting to fix the conflict at the moment. I find it odd that the compile 
step fails here, as my local branch has no issues whatsoever. The test cases for 
streams all succeeded. Since I don't know how to access the stack trace from the 
crashes in Jenkins, I'm not sure what to do. Any advice? [~mjsax]

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Juan C. Gonzalez-Zurita
>Priority: Critical
>  Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an 
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session gap, a session window should only be closed at 
> {{window-end + gap}}, and to incorporate the grace period, the close time 
> should be pushed out further to {{window-end + gap + grace}}.
> However, atm we compute the window close time as {{window-end + grace}}, 
> omitting the {{gap}} parameter.
> Because the default grace period is 24h, most users might not notice this 
> issue. Even if they set a grace period explicitly (eg, when using suppress()), 
> they would most likely set a grace period larger than the gap time and not hit 
> the issue (or maybe only realize it when inspecting the behavior closely).
> However, if a user wants to disable the grace period and sets it to zero (or 
> any other value smaller than the gap time), sessions might be closed too early 
> and users might notice.
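A small worked example of the two close-time computations (the numbers are illustrative only and not taken from the Streams code):

{code:java}
public final class SessionCloseTimeExample {
    public static void main(String[] args) {
        long windowEndMs = 1_000_000L;
        long gapMs = 5 * 60 * 1000L; // 5-minute session gap
        long graceMs = 0L;           // grace period disabled

        long currentCloseMs = windowEndMs + graceMs;          // what is computed atm: 1_000_000
        long intendedCloseMs = windowEndMs + gapMs + graceMs; // what the ticket argues for: 1_300_000

        // A record at t = 1_100_000 is still within the gap and should extend the session,
        // but under the current computation the window is already considered closed.
        long recordTs = 1_100_000L;
        System.out.println("dropped under current rule:  " + (recordTs > currentCloseMs));   // true
        System.out.println("dropped under intended rule: " + (recordTs > intendedCloseMs));  // false
    }
}
{code}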



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12718) SessionWindows are closed too early

2021-06-16 Thread Juan C. Gonzalez-Zurita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364613#comment-17364613
 ] 

Juan C. Gonzalez-Zurita edited comment on KAFKA-12718 at 6/17/21, 12:43 AM:


Attempting to fix the conflict at the moment. I find it odd that some of the 
compile steps fail here, as my local branch has no issues whatsoever. The test 
cases for streams all succeeded. Since I don't know how to access the stack 
trace from the crashes in Jenkins, I'm not sure what to do. Any advice? [~mjsax]


was (Author: gonzur):
Attempting to fix the conflict at the moment. I find it odd that the compile 
step fails here as my local branch is having no issues whatsoever. The test 
cases for streams all succeeded. Since I don't know how to access the stack 
trace from the crashes in Jenkins I'm not sure what to do. Any advice? [~mjsax]

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Juan C. Gonzalez-Zurita
>Priority: Critical
>  Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an 
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session gap, a session window should only be closed at 
> {{window-end + gap}}, and to incorporate the grace period, the close time 
> should be pushed out further to {{window-end + gap + grace}}.
> However, atm we compute the window close time as {{window-end + grace}}, 
> omitting the {{gap}} parameter.
> Because the default grace period is 24h, most users might not notice this 
> issue. Even if they set a grace period explicitly (eg, when using suppress()), 
> they would most likely set a grace period larger than the gap time and not hit 
> the issue (or maybe only realize it when inspecting the behavior closely).
> However, if a user wants to disable the grace period and sets it to zero (or 
> any other value smaller than the gap time), sessions might be closed too early 
> and users might notice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12883) Address KIP-100 type constraints now that Java 7 support is dropped

2021-06-16 Thread Nicolas Guignard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Guignard reassigned KAFKA-12883:


Assignee: Nicolas Guignard

> Address KIP-100 type constraints now that Java 7 support is dropped
> --
>
> Key: KAFKA-12883
> URL: https://issues.apache.org/jira/browse/KAFKA-12883
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Nicolas Guignard
>Priority: Minor
>  Labels: StarterProject, newbie++
>
> As part of [KIP-100 rejected 
> alternatives|https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API#KIP100RelaxTypeconstraintsinKafkaStreamsAPI-RejectedAlternatives],
>  we suggested a more correct alternative to the type constraints for some of 
> the {{KStream}} methods.
> Unfortunately at the time, there was a Java 7 compiler behavior that 
> prevented us from using those type constraints, so we had to relax them in 
> order to preserve backwards compatibility.
> As part of the KIP it was mentioned that:
> ??Once we drop support for 1.7 we can always decide to switch to approach 2. 
> without breaking source compatibility, by making a proposal similar to this 
> KIP.??
> Since Java 7 support has been dropped a while ago, it would be a good time to 
> revisit this and possibly switch to the alternative type constraints. The 
> change should be source compatible, although the streams APIs have 
> significantly evolved since, so there might be some additional investigation 
> required to ensure that is still the case and also covers the Scala Streams 
> APIs.
>  
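For illustration only (a made-up interface, not the real KStream API, and not necessarily the exact bounds from the KIP's rejected alternative), the difference between exact and variance-aware generic constraints looks like:

{code:java}
import java.util.function.Function;

// Made-up interface, illustration only: how wildcard bounds change what callers may pass.
interface MyStream<K, V> {
    // Exact-type bound: with V = Integer, a Function<Number, String> is rejected.
    <VR> MyStream<K, VR> mapValuesExact(Function<V, VR> mapper);

    // Variance-aware bound: with V = Integer and VR = CharSequence,
    // a Function<Number, String> is accepted, so existing mappers compose more freely.
    <VR> MyStream<K, VR> mapValuesRelaxed(Function<? super V, ? extends VR> mapper);
}
{code}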



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12718) SessionWindows are closed too early

2021-06-16 Thread Juan C. Gonzalez-Zurita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364613#comment-17364613
 ] 

Juan C. Gonzalez-Zurita edited comment on KAFKA-12718 at 6/17/21, 2:37 AM:
---

Attempting to fix the conflict at the moment. I find it odd that some of the 
compile steps fail here, as my local branch has no issues whatsoever. The test 
cases for streams all succeeded. Since I don't know how to access the profiling 
report from the crashes in Jenkins, I'm not sure what to do. Any advice? 
[~mjsax]


was (Author: gonzur):
Attempting to fix the conflict at the moment. I find it odd that some of the 
compile step fails here as my local branch is having no issues whatsoever. The 
test cases for streams all succeeded. Since I don't know how to access the 
stack trace from the crashes in Jenkins I'm not sure what to do. Any advice? 
[~mjsax]

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Juan C. Gonzalez-Zurita
>Priority: Critical
>  Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an 
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session gap, a session window should only be closed at 
> {{window-end + gap}}, and to incorporate the grace period, the close time 
> should be pushed out further to {{window-end + gap + grace}}.
> However, atm we compute the window close time as {{window-end + grace}}, 
> omitting the {{gap}} parameter.
> Because the default grace period is 24h, most users might not notice this 
> issue. Even if they set a grace period explicitly (eg, when using suppress()), 
> they would most likely set a grace period larger than the gap time and not hit 
> the issue (or maybe only realize it when inspecting the behavior closely).
> However, if a user wants to disable the grace period and sets it to zero (or 
> any other value smaller than the gap time), sessions might be closed too early 
> and users might notice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12953) Bump Zookeeper version to 3.6.3

2021-06-16 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12953:

Priority: Major  (was: Minor)

> Bump Zookeeper version to 3.6.3
> ---
>
> Key: KAFKA-12953
> URL: https://issues.apache.org/jira/browse/KAFKA-12953
> Project: Kafka
>  Issue Type: Task
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Major
>
> Bump the Zookeeper version used by Kafka to the latest stable release (3.6.3).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12959) Prioritize assigning standby tasks to threads without any active tasks

2021-06-16 Thread Ravi Bhardwaj (Jira)
Ravi Bhardwaj created KAFKA-12959:
-

 Summary: Prioritize assigning standby tasks to threads without any 
active tasks
 Key: KAFKA-12959
 URL: https://issues.apache.org/jira/browse/KAFKA-12959
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.8.0
Reporter: Ravi Bhardwaj
 Attachments: app1.log, app2.log

Currently, while distributing the standby tasks, Streams does not check whether 
there are threads without any tasks or with fewer tasks. This can lead to a few 
threads being assigned both active and standby tasks while there are threads 
within the same instance without any tasks assigned.

Example:
{code:java}
App 1:
[wordcount-lambda-example-client-StreamThread-1] Handle new assignment with:
 New active tasks: [0_1]
 New standby tasks: [1_0]
 Existing active tasks: []
 Existing standby tasks: [1_0]
[wordcount-lambda-example-client-StreamThread-2] Handle new assignment with:
 New active tasks: [1_1]
 New standby tasks: []
 Existing active tasks: [1_1]
 Existing standby tasks: []
[wordcount-lambda-example-client-StreamThread-3] Handle new assignment with:
 New active tasks: []
 New standby tasks: []
 Existing active tasks: []
 Existing standby tasks: []
[wordcount-lambda-example-client-StreamThread-4] Handle new assignment with:
 New active tasks: []
 New standby tasks: []
 Existing active tasks: []
 Existing standby tasks: []
{code}
 
{code:java}
App2:
[wordcount-lambda-example-client-StreamThread-1] Handle new assignment with:
 New active tasks: [1_0]
 New standby tasks: [1_1]
 Existing active tasks: []
 Existing standby tasks: [1_0, 1_1]
[wordcount-lambda-example-client-StreamThread-2] Handle new assignment with:
 New active tasks: [0_0]
 New standby tasks: []
 Existing active tasks: []
 Existing standby tasks: []
[wordcount-lambda-example-client-StreamThread-3] Handle new assignment with:
 New active tasks: []
 New standby tasks: []
 Existing active tasks: []
 Existing standby tasks: []
[wordcount-lambda-example-client-StreamThread-4] Handle new assignment with:
 New active tasks: []
 New standby tasks: []
 Existing active tasks: []
 Existing standby tasks: []
{code}
 The standby tasks in both apps are assigned to Thread-1 even though it already 
has an active task, while Thread-3 and Thread-4 don't have any tasks assigned.
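
A simplified sketch of the preference this ticket asks for (invented names, not the actual StreamsPartitionAssignor, and it ignores the rule that a standby must not land on the instance hosting its active):

{code:java}
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Invented names, sketch only: place each standby on an idle thread when one exists,
// otherwise on the thread with the fewest tasks overall.
final class StandbySpreader {
    static Map<String, Set<String>> assignStandbys(Map<String, Set<String>> activeByThread,
                                                   List<String> standbyTasks) {
        Map<String, Set<String>> standbyByThread = new HashMap<>();
        activeByThread.keySet().forEach(thread -> standbyByThread.put(thread, new HashSet<>()));

        for (String task : standbyTasks) {
            String target = activeByThread.keySet().stream()
                    .min(Comparator
                            .comparingInt((String t) -> activeByThread.get(t).isEmpty() ? 0 : 1) // prefer idle threads
                            .thenComparingInt(t -> activeByThread.get(t).size() + standbyByThread.get(t).size()))
                    .orElseThrow(IllegalStateException::new);
            standbyByThread.get(target).add(task);
        }
        return standbyByThread;
    }
}
{code}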

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12959) Prioritize assigning standby tasks to threads without any active tasks

2021-06-16 Thread Ravi Bhardwaj (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravi Bhardwaj updated KAFKA-12959:
--
Attachment: app2.log
app1.log

> Prioritize assigning standby tasks to threads without any active tasks
> --
>
> Key: KAFKA-12959
> URL: https://issues.apache.org/jira/browse/KAFKA-12959
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ravi Bhardwaj
>Priority: Major
> Attachments: app1.log, app2.log
>
>
> Currently, while distributing the standby tasks, Streams does not check 
> whether there are threads without any tasks or with fewer tasks. This can 
> lead to a few threads being assigned both active and standby tasks while 
> there are threads within the same instance without any tasks assigned.
> Example:
> {code:java}
> App 1:
> [wordcount-lambda-example-client-StreamThread-1] Handle new assignment with:
>  New active tasks: [0_1]
>  New standby tasks: [1_0]
>  Existing active tasks: []
>  Existing standby tasks: [1_0]
> [wordcount-lambda-example-client-StreamThread-2] Handle new assignment with:
>  New active tasks: [1_1]
>  New standby tasks: []
>  Existing active tasks: [1_1]
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-3] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-4] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> {code}
>  
> {code:java}
> App2:
> [wordcount-lambda-example-client-StreamThread-1] Handle new assignment with:
>  New active tasks: [1_0]
>  New standby tasks: [1_1]
>  Existing active tasks: []
>  Existing standby tasks: [1_0, 1_1]
> [wordcount-lambda-example-client-StreamThread-2] Handle new assignment with:
>  New active tasks: [0_0]
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-3] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-4] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> {code}
>  The standby tasks in both apps are assigned to Thread-1 even though it already 
> has an active task, while Thread-3 and Thread-4 don't have any tasks assigned.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12960) WindowStore and SessionStore do not enforce strict retention time

2021-06-16 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12960:
---

 Summary: WindowStore and SessionStore do not enforce strict 
retention time
 Key: KAFKA-12960
 URL: https://issues.apache.org/jira/browse/KAFKA-12960
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


WindowStore and SessionStore do not implement a strict retention time in 
general. We should consider making the retention time strict: even if we still 
have some records in the store (due to the segmented implementation), we might 
want to filter out expired records on read. This might benefit PAPI users.

Atm, the InMemoryWindowStore does already enforce a strict retention time.

As an alternative, we could also inject such a filter in the wrapping 
`MeteredStore` – this might lift the burden from users who implement a custom 
state store.

As another alternative, we could change all DSL operators to verify whether 
data from a state store is already expired or not. It might be better to push 
this responsibility into the stores, though.

It's especially an issue for stream-stream joins, because the operator relies 
on the retention time to implement its grace period.
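
A bare-bones sketch of on-read filtering with made-up types (not the real WindowStore API), just to show the shape such a wrapper could take:

{code:java}
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.stream.Stream;

// Made-up types, sketch only: drop records older than observedStreamTime - retentionMs
// on read, even if the underlying segments still physically contain them.
final class StrictRetentionReadFilter {
    private final NavigableMap<Long, String> byWindowStart = new TreeMap<>();
    private final long retentionMs;
    private long observedStreamTime = Long.MIN_VALUE;

    StrictRetentionReadFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(long windowStartMs, String value) {
        observedStreamTime = Math.max(observedStreamTime, windowStartMs);
        byWindowStart.put(windowStartMs, value);
    }

    Stream<Map.Entry<Long, String>> fetchAll() {
        long expiryCutoff = observedStreamTime - retentionMs;
        // Strict retention on read: anything at or before the cutoff is filtered out.
        return byWindowStart.entrySet().stream().filter(e -> e.getKey() > expiryCutoff);
    }
}
{code}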



--
This message was sent by Atlassian Jira
(v8.3.4#803005)