[GitHub] [kafka] guozhangwang commented on pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks

2021-07-22 Thread GitBox


guozhangwang commented on pull request #11112:
URL: https://github.com/apache/kafka/pull/11112#issuecomment-885414788


   Would not merge to 3.0 since it is not a blocker and we've passed the code 
freeze deadline.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks

2021-07-22 Thread GitBox


guozhangwang merged pull request #11112:
URL: https://github.com/apache/kafka/pull/11112


   






[GitHub] [kafka] guozhangwang commented on pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks

2021-07-22 Thread GitBox


guozhangwang commented on pull request #11112:
URL: https://github.com/apache/kafka/pull/11112#issuecomment-885414377


   Test failures are known. Merging to trunk.






[GitHub] [kafka] guozhangwang commented on a change in pull request #11114: KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback

2021-07-22 Thread GitBox


guozhangwang commented on a change in pull request #11114:
URL: https://github.com/apache/kafka/pull/11114#discussion_r675318036



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -102,49 +102,61 @@ private JoinWindows(final long beforeMs,
 
 /**
  * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
- * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
- * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
- * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving
- * after the window end will be dropped. The delay is defined as 
(stream_time - record_timestamp).
+ * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} before or after
+ * the timestamp of the record from the primary stream.
+ * 
+ * Using this method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd}, which
+ * means that only out-of-order records arriving more than the grace 
period after the window end will be dropped.
+ * The window close, after which any incoming records are considered late 
and will be rejected, is defined as
+ * {@code windowEnd + afterWindowEnd}
  *
  * @param timeDifference join window interval
  * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
- * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
  * @return A new JoinWindows object with the specified window definition 
and grace period
+ * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+ *  if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
  */
 public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
-return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
+final String timeDifferenceMsgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");

Review comment:
   Thanks for the catch!
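
   As a reader's aid, here is a minimal sketch of how the factory method documented 
above is meant to be used; the durations are arbitrary example values, not 
recommendations:

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public class JoinWindowsExample {
    public static void main(String[] args) {
        // Records of the same key are joinable if their timestamps differ by at most 10 seconds.
        // Out-of-order records are admitted until the window close (windowEnd + 30 seconds);
        // anything arriving after that is considered late and is dropped.
        JoinWindows windows = JoinWindows.ofTimeDifferenceAndGrace(
            Duration.ofSeconds(10),    // timeDifference: the join window interval
            Duration.ofSeconds(30));   // afterWindowEnd: the grace period
        System.out.println("window size (ms): " + windows.size());
    }
}
```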








[GitHub] [kafka] guozhangwang commented on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware

2021-07-22 Thread GitBox


guozhangwang commented on pull request #11089:
URL: https://github.com/apache/kafka/pull/11089#issuecomment-885410437


   Overall looks good to me. Just one qq: do we always guarantee 
`replicationFactor <= numBrokers`? I know we check and forbid it when creating 
a new topic, but after a topic is created, if we shut down brokers and reassign 
replicas, would we stop with fewer replicas and warn users?
   
   cc @ijuma for another look.






[GitHub] [kafka] ijuma commented on pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures

2021-07-22 Thread GitBox


ijuma commented on pull request #11108:
URL: https://github.com/apache/kafka/pull/11108#issuecomment-885391600


   @hachikuji I addressed your comment, this is ready for another review.






[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r675296678



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -177,7 +204,9 @@ public long size() {
  * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
  * @return this updated builder
  * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} instead
  */
+@Deprecated
 public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {

Review comment:
   I picked up all the non-testing followup work in this PR so we could try 
to get it into 3.0: https://github.com/apache/kafka/pull/11114








[jira] [Commented] (KAFKA-13021) Improve Javadocs for API Changes and address followup from KIP-633

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385929#comment-17385929
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13021:


PR: https://github.com/apache/kafka/pull/11114

> Improve Javadocs for API Changes and address followup from KIP-633
> --
>
> Key: KAFKA-13021
> URL: https://issues.apache.org/jira/browse/KAFKA-13021
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>
> There are Javadoc changes from the following PR that need to be completed 
> prior to the 3.0 release. This Jira item is to track that work.
> [https://github.com/apache/kafka/pull/10926]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman opened a new pull request #11114: KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback

2021-07-22 Thread GitBox


ableegoldman opened a new pull request #11114:
URL: https://github.com/apache/kafka/pull/11114


   There were a few followup things to address from 
[#10926](https://github.com/apache/kafka/pull/10926), most importantly a number 
of fixes needed for the javadocs. Beyond that it's mostly just adding a few 
missing verification checks.
   
   Given that the whole point of this KIP was to help reduce a major source of 
confusion (the meaning and usage of the grace period within Streams), it's 
critical that we have clear and correct javadocs accompanying the new APIs. For 
that reason I think it's very important to get this into 3.0 -- it's also very 
low-risk, as the only non-docs changes are adding a handful of checks that 
already exist in the old APIs and were simply missed in the new ones.






[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r675292037



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
##
@@ -40,7 +40,11 @@
 
 // By default grace period is 24 hours for all windows,
 // in other words we allow out-of-order data for up to a day
-protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+// This behavior is now deprecated
+protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 
60 * 1000L;

Review comment:
   Yeah it's an internal config so I hope they wouldn't assume anything 
from the name and extrapolate to what they can and can't use. That said, it 
does appear in these classes which are public themselves, so users are still 
going to see it. But the important thing is that it makes sense to us, the 
devs, who will actually be using it -- I think 
`DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD` is a bit more clear, just need to sneak 
the word "default" in there somewhere imo








[GitHub] [kafka] ijuma commented on a change in pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures

2021-07-22 Thread GitBox


ijuma commented on a change in pull request #11108:
URL: https://github.com/apache/kafka/pull/11108#discussion_r675289769



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -785,7 +785,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   Some(RecordBatch.MAGIC_VALUE_V1)
 else
   None
-  }
+  }.filter(_ => unconvertedRecords.batchIterator.hasNext)

Review comment:
   Fixed this.








[jira] [Updated] (KAFKA-13021) Improve Javadocs for API Changes and address followup from KIP-633

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13021:
---
Summary: Improve Javadocs for API Changes and address followup from KIP-633 
 (was: Improve Javadocs for API Changes from KIP-633)

> Improve Javadocs for API Changes and address followup from KIP-633
> --
>
> Key: KAFKA-13021
> URL: https://issues.apache.org/jira/browse/KAFKA-13021
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>
> There are Javadoc changes from the following PR that need to be completed 
> prior to the 3.0 release. This Jira item is to track that work.
> [https://github.com/apache/kafka/pull/10926]
>  





[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r675285980



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -177,7 +204,9 @@ public long size() {
  * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
  * @return this updated builder
  * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} instead
  */
+@Deprecated
 public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {

Review comment:
   Personally I think it makes sense to just disallow calling 
`ofTimeDifferenceAndGrace(...).grace(...)` entirely, this seems like abusing 
the API
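
   To make the two patterns concrete, a short sketch (the durations are arbitrary; 
`of(...).grace(...)` is the deprecated chain that the comment proposes to disallow on 
top of the new factory methods):

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public class GracePeriodPatterns {
    public static void main(String[] args) {
        // Pre-KIP-633 pattern: grace period attached afterwards via the deprecated grace() setter.
        @SuppressWarnings("deprecation")
        JoinWindows legacy = JoinWindows.of(Duration.ofSeconds(10))
                                        .grace(Duration.ofSeconds(30));

        // KIP-633 pattern: the grace period is fixed at construction time, so chaining grace()
        // on top of ofTimeDifferenceAndGrace(...) would be the redundant mix discussed above.
        JoinWindows current = JoinWindows.ofTimeDifferenceAndGrace(
            Duration.ofSeconds(10), Duration.ofSeconds(30));

        System.out.println(legacy.size() == current.size());
    }
}
```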








[GitHub] [kafka] ableegoldman commented on pull request #11113: KAFKA-13128: wait for all keys to be fully processed in #shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread GitBox


ableegoldman commented on pull request #11113:
URL: https://github.com/apache/kafka/pull/11113#issuecomment-885356922


   cc @PhilHardwick @cadonna @wcarlson5 @lct45 @vvcephei 






[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675269697



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -907,14 +944,23 @@ private void 
maybeSetOffsetForLeaderException(RuntimeException e) {
 final AtomicInteger remainingResponses = new 
AtomicInteger(timestampsToSearchByNode.size());
 
 for (Map.Entry> entry 
: timestampsToSearchByNode.entrySet()) {
-RequestFuture future =
-sendListOffsetRequest(entry.getKey(), entry.getValue(), 
requireTimestamps);
+// we skip sending the list off request only if there's already 
one with the exact
+// requested offsets for the destination node

Review comment:
   Hmm, I wonder if deduplicating like this within the Fetcher itself is too 
low-level, i.e. there may be other callers of `sendListOffsetsRequests` that 
actually do want to issue a new request. I think there are arguments to be made 
for doing this for all requests, but maybe also some arguments against it -- 
this is a more drastic change that means APIs like `Consumer#endOffsets` can 
actually return old/stale results (by up to the configured `request.timeout` at 
most).
   
   Since this is a last-minute blocker fix I'd prefer to keep the changes to a 
minimum and scoped to the specific bug, if at all possible. Can we do the 
deduplication in another layer, so that we only avoid re-sending the 
listOffsets request in the specific case of `currentLag`, where we know it's 
acceptable to report a slightly-out-of-date value because the alternative is to 
report no value at all? 








[jira] [Updated] (KAFKA-13129) Fix broken system tests relate to the ConfigCommand change

2021-07-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13129:
--
Component/s: system tests

> Fix broken system tests relate to the ConfigCommand change
> --
>
> Key: KAFKA-13129
> URL: https://issues.apache.org/jira/browse/KAFKA-13129
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
>
> After KAFKA-12598, the system tests failed in {{upgrade_test}}, 
> {{zookeeper_tls_encrypt_only_test.py}}, and {{zookeeper_tls_test.py}}. Fix 
> them.





[jira] [Updated] (KAFKA-13129) Fix broken system tests relate to the ConfigCommand change

2021-07-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13129:
--
Priority: Blocker  (was: Major)

> Fix broken system tests relate to the ConfigCommand change
> --
>
> Key: KAFKA-13129
> URL: https://issues.apache.org/jira/browse/KAFKA-13129
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
>
> After KAFKA-12598, the system tests failed in {{upgrade_test}}, 
> {{zookeeper_tls_encrypt_only_test.py}}, and {{zookeeper_tls_test.py}}. Fix 
> them.





[jira] [Created] (KAFKA-13129) Fix broken system tests relate to the ConfigCommand change

2021-07-22 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13129:
-

 Summary: Fix broken system tests relate to the ConfigCommand change
 Key: KAFKA-13129
 URL: https://issues.apache.org/jira/browse/KAFKA-13129
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen
Assignee: Luke Chen
 Fix For: 3.0.0


After KAFKA-12598, the system tests failed in {{upgrade_test}}, 
{{zookeeper_tls_encrypt_only_test.py}}, and {{zookeeper_tls_test.py}}. Fix them.





[GitHub] [kafka] vvcephei commented on a change in pull request #11111: KAFKA-13126: guard against overflow when computing `joinGroupTimeoutMs`

2021-07-22 Thread GitBox


vvcephei commented on a change in pull request #11111:
URL: https://github.com/apache/kafka/pull/11111#discussion_r675266851



##
File path: 
clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
##
@@ -60,17 +60,16 @@ public static boolean awaitReady(KafkaClient client, Node 
node, Time time, long
 throw new IllegalArgumentException("Timeout needs to be greater 
than 0");
 }
 long startTime = time.milliseconds();
-long expiryTime = startTime + timeoutMs;
 
 if (isReady(client, node, startTime) ||  client.ready(node, startTime))
 return true;
 
 long attemptStartTime = time.milliseconds();
-while (!client.isReady(node, attemptStartTime) && attemptStartTime < 
expiryTime) {
+while (!client.isReady(node, attemptStartTime) && attemptStartTime - 
startTime < timeoutMs) {
 if (client.connectionFailed(node)) {
 throw new IOException("Connection to " + node + " failed.");
 }
-long pollTimeout = expiryTime - attemptStartTime;
+long pollTimeout = (startTime - attemptStartTime) + timeoutMs;

Review comment:
   Ah, thanks!








[GitHub] [kafka] JoeCqupt edited a comment on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware

2021-07-22 Thread GitBox


JoeCqupt edited a comment on pull request #11089:
URL: https://github.com/apache/kafka/pull/11089#issuecomment-884671155


   call for review @ijuma @guozhangwang @hachikuji @mjsax @ableegoldman 






[jira] [Assigned] (KAFKA-6948) Avoid overflow in timestamp comparison

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-6948:
-

Assignee: A. Sophie Blee-Goldman

> Avoid overflow in timestamp comparison
> --
>
> Key: KAFKA-6948
> URL: https://issues.apache.org/jira/browse/KAFKA-6948
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Giovanni Liva
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>
> Some comparisons with timestamp values are not safe. This comparisons can 
> trigger errors that were found in some other issues, e.g. KAFKA-4290 or 
> KAFKA-6608.
> The following classes contains some comparison between timestamps that can 
> overflow.
>  * org.apache.kafka.clients.NetworkClientUtils
>  * org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
>  * org.apache.kafka.common.security.kerberos.KerberosLogin
>  * org.apache.kafka.connect.runtime.WorkerSinkTask
>  * org.apache.kafka.connect.tools.MockSinkTask
>  * org.apache.kafka.connect.tools.MockSourceTask
>  * org.apache.kafka.streams.processor.internals.GlobalStreamThread
>  * org.apache.kafka.streams.processor.internals.StateDirectory
>  * org.apache.kafka.streams.processor.internals.StreamThread
>  





[GitHub] [kafka] ableegoldman opened a new pull request #11113: KAFKA-13128: wait for all keys to be fully processed in #shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread GitBox


ableegoldman opened a new pull request #11113:
URL: https://github.com/apache/kafka/pull/11113


   This test is flaky due to waiting on all records to be processed for only a 
single key before issuing IQ lookups and asserting whether data was found. See 
[this 
comment](https://issues.apache.org/jira/browse/KAFKA-13128?focusedCommentId=17385841=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17385841)
 for a full analysis of how this happened.
   
   Should be cherry-picked to 3.0 (a test fix to help stabilize the build, 
@kkonstantine) and to 2.8.






[GitHub] [kafka] showuon commented on pull request #11107: KAFKA-13125: close KeyValueIterator instances in internals tests (part 2)

2021-07-22 Thread GitBox


showuon commented on pull request #11107:
URL: https://github.com/apache/kafka/pull/11107#issuecomment-885348053


   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 16 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   ```






[GitHub] [kafka] ableegoldman commented on pull request #5183: KAFKA-6948 - Change comparison to avoid overflow inconsistencies

2021-07-22 Thread GitBox


ableegoldman commented on pull request #5183:
URL: https://github.com/apache/kafka/pull/5183#issuecomment-885347981


   A number of these fixes are no longer relevant since switching to Timers (I 
guess that was #10537); the remaining ones I just tacked onto this PR 
addressing a different overflow bug. So I think we can close this in favor of 
[#11111](https://github.com/apache/kafka/pull/11111)






[GitHub] [kafka] ableegoldman closed pull request #5183: KAFKA-6948 - Change comparison to avoid overflow inconsistencies

2021-07-22 Thread GitBox


ableegoldman closed pull request #5183:
URL: https://github.com/apache/kafka/pull/5183


   






[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13128:
---
Priority: Blocker  (was: Major)

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/





[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13128:
---
Affects Version/s: (was: 3.1.0)

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/





[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13128:
---
Fix Version/s: 2.8.1
   3.0.0

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/





[jira] [Assigned] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-13128:
--

Assignee: A. Sophie Blee-Goldman

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/





[jira] [Commented] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385841#comment-17385841
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13128:


The failure is from the second line in this series of assertions 
{code:java}
assertThat(store1.get(key), is(notNullValue()));
assertThat(store1.get(key2), is(notNullValue()));
assertThat(store1.get(key3), is(notNullValue()));
{code}
which is basically the first time we attempt IQ after starting up. The test 
setup includes starting Streams and waiting for it to reach RUNNING, then 
adding a new thread, and finally producing a set of 100 records for each of the 
three keys. After that it waits for all records to be processed *for _key3_* 
and then proceeds to the above assertions.

I suspect the problem is that we only wait for all data to be processed for 
_key3_, but not the other two keys. In theory this should work, since the data 
for _key3_ is produced last and would have the largest timestamps meaning the 
keys should be processed more or less in order. However the input topic 
actually has two partitions, so it could be that _key1_ and _key3_ correspond 
to task 1 while _key2_ corresponds to task 2. Again, that shouldn't affect the 
order in which records are processed – as long as the tasks are on the same 
thread.

But we started up a new thread in between waiting for Streams to reach RUNNING 
and producing data to the input topics. This new thread has to be assigned one 
of the tasks, but due to cooperative rebalancing it will take two full (though 
short) rebalances before the new thread can actually start processing any 
tasks. Therefore as long as the original thread continues to own the task 
corresponding to _key3_ after the new thread is added, it can easily get 
through all records for _key3_. Which would mean the test can proceed to the 
above assertions while the new thread is still waiting to start processing any 
data for _key2_ at all.

There are a few ways we can address this given how many things had to happen 
exactly right in order to see this failure, but the simplest fix is to just 
wait on all three keys to be fully processed rather than just the one. This 
seems to align with the original intention of the test best as well
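
A minimal, self-contained sketch of the proposed fix (the store, keys, and the 
{{until}} helper below are simplified stand-ins for the ones in 
StoreQueryIntegrationTest, not the actual test code):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class WaitForAllKeys {
    // Stand-in for the state store queried via IQ in the test.
    static final Map<Integer, Long> store1 = new ConcurrentHashMap<>();

    // Simplified version of the test's until(...) helper: poll the condition until it
    // holds or the timeout expires.
    static void until(Supplier<Boolean> condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.get()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Condition not met within " + timeoutMs + " ms");
            }
            Thread.sleep(100);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> keys = List.of(1, 2, 3);
        keys.forEach(k -> store1.put(k, 100L)); // simulate processing completing for every key
        // The fix: wait until *every* key has queryable data before asserting, not just key3.
        until(() -> keys.stream().allMatch(k -> store1.get(k) != null), 30_000);
        System.out.println("all keys queryable");
    }
}
{code}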

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/





[jira] [Created] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13128:
--

 Summary: Flaky Test 
StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
 Key: KAFKA-13128
 URL: https://issues.apache.org/jira/browse/KAFKA-13128
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.1.0
Reporter: A. Sophie Blee-Goldman


h3. Stacktrace

java.lang.AssertionError: Expected: is not null but: was null 
  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
  at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
  at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
  at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)

 

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/





[GitHub] [kafka] guozhangwang commented on pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks

2021-07-22 Thread GitBox


guozhangwang commented on pull request #11112:
URL: https://github.com/apache/kafka/pull/11112#issuecomment-885335038


   @ableegoldman please review.






[GitHub] [kafka] guozhangwang opened a new pull request #11112: MINOR: only request rejoin and log if necessary for metadata snapshot and subscription checks

2021-07-22 Thread GitBox


guozhangwang opened a new pull request #11112:
URL: https://github.com/apache/kafka/pull/11112


   Since we now do not necessarily complete the rebalance within a single poll 
call, we may keep checking `rejoinNeededOrPending`, which hits one of the 
conditions and returns true but then returns early, flooding the log with 
entries. This PR only logs and sets the flag when it was not already set, 
effectively logging only the first time.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
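
   For illustration, a minimal sketch of the "log and set only on the first 
transition" idea described above (the field and method names are made up for the 
sketch, not the actual coordinator code):

```java
import java.util.logging.Logger;

public class RejoinFlagSketch {
    private static final Logger log = Logger.getLogger(RejoinFlagSketch.class.getName());
    private boolean rejoinNeeded = false;

    // Log and flip the flag only on the transition from "not needed" to "needed", so
    // repeated poll() calls that re-evaluate the same condition do not flood the log.
    public synchronized void requestRejoinIfNecessary(String reason) {
        if (!rejoinNeeded) {
            log.info("Requesting to rejoin the group: " + reason);
            rejoinNeeded = true;
        }
    }

    public synchronized boolean rejoinNeededOrPending() {
        return rejoinNeeded;
    }
}
```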
   






[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-22 Thread GitBox


guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-885328621


   I added a concurrent hashmap inside the fetcher maintaining the in-flight 
list-offsets futures, and based on that map we skip sending the same requests. 
LMK if it looks good, and then I will merge @vvcephei @ableegoldman 
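
   A rough sketch of the deduplication idea described above (the String key and the 
send() stub are placeholders, not the actual Fetcher code):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ListOffsetsDeduper {
    // One in-flight future per (node, requested-offsets) key, represented here as a String.
    private final Map<String, CompletableFuture<Long>> inFlight = new ConcurrentHashMap<>();

    public CompletableFuture<Long> listOffsets(String requestKey) {
        // If an identical request is already pending, reuse its future instead of sending again.
        return inFlight.computeIfAbsent(requestKey, key -> {
            CompletableFuture<Long> future = send(key);
            // Remove the entry once the response (or error) arrives so later calls refresh it.
            future.whenComplete((result, error) -> inFlight.remove(key, future));
            return future;
        });
    }

    // Stand-in for the real list-offsets request; returns a dummy end offset here.
    private CompletableFuture<Long> send(String requestKey) {
        return CompletableFuture.completedFuture(42L);
    }
}
```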






[jira] [Updated] (KAFKA-13127) Fix stray partition lookup logic

2021-07-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13127:

Description: The result of `BrokerMetadataPublisher.findGhostReplicas` is 
inverted. It returns all of the non-stray replicas. This causes all of these 
partitions to get deleted on startup by mistake.  (was: The result of 
`BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the 
non-stray replicas. This causes all off these partitions to get deleted on 
startup by mistake.)

> Fix stray partition lookup logic
> 
>
> Key: KAFKA-13127
> URL: https://issues.apache.org/jira/browse/KAFKA-13127
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It 
> returns all of the non-stray replicas. This causes all of these partitions to 
> get deleted on startup by mistake.





[jira] [Created] (KAFKA-13127) Fix stray partition lookup logic

2021-07-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13127:
---

 Summary: Fix stray partition lookup logic
 Key: KAFKA-13127
 URL: https://issues.apache.org/jira/browse/KAFKA-13127
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.0.0


The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It 
returns all of the non-stray replicas. This causes all off these partitions to 
get deleted on startup by mistake.





[GitHub] [kafka] ableegoldman commented on a change in pull request #11111: KAFKA-13126: guard against overflow when computing `joinGroupTimeoutMs`

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #11111:
URL: https://github.com/apache/kafka/pull/11111#discussion_r675241237



##
File path: 
clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
##
@@ -60,17 +60,16 @@ public static boolean awaitReady(KafkaClient client, Node 
node, Time time, long
 throw new IllegalArgumentException("Timeout needs to be greater 
than 0");
 }
 long startTime = time.milliseconds();
-long expiryTime = startTime + timeoutMs;
 
 if (isReady(client, node, startTime) ||  client.ready(node, startTime))
 return true;
 
 long attemptStartTime = time.milliseconds();
-while (!client.isReady(node, attemptStartTime) && attemptStartTime < 
expiryTime) {
+while (!client.isReady(node, attemptStartTime) && attemptStartTime - 
startTime < timeoutMs) {
 if (client.connectionFailed(node)) {
 throw new IOException("Connection to " + node + " failed.");
 }
-long pollTimeout = expiryTime - attemptStartTime;
+long pollTimeout = (startTime - attemptStartTime) + timeoutMs;

Review comment:
   The `startTime` is set once at the beginning of the method while the 
`attemptStartTime` is initialized just before the first attempt and then 
updated again after every iteration. So the `attemptStartTime` is always 
greater than the `startTime` and therefore the quantity being added to the 
`timeoutMs` here is actually negative. 
   
   But I see how that's confusing, I'll refactor the expression to make this 
more clear
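
   To make the arithmetic concrete, a small standalone demo of the point above 
(illustrative values only, not the actual client code):

```java
public class TimeoutOverflowDemo {
    public static void main(String[] args) {
        long startTime = 1_000L;          // set once at the start of awaitReady
        long timeoutMs = Long.MAX_VALUE;  // e.g. a caller effectively asking to wait forever
        long attemptStartTime = 1_500L;   // re-read before each attempt, so always >= startTime

        // Old form: startTime + timeoutMs wraps around to a negative expiry time.
        long expiryTime = startTime + timeoutMs;
        System.out.println("expiryTime (overflowed): " + expiryTime);

        // New form: (startTime - attemptStartTime) is <= 0, so adding timeoutMs cannot
        // overflow and the poll timeout stays a sensible positive value.
        long pollTimeout = (startTime - attemptStartTime) + timeoutMs;
        System.out.println("pollTimeout: " + pollTimeout);
    }
}
```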








[jira] [Assigned] (KAFKA-7497) Kafka Streams should support self-join on streams

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-7497:
-

Assignee: (was: A. Sophie Blee-Goldman)

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.





[GitHub] [kafka] showuon commented on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-22 Thread GitBox


showuon commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-885307107


   Let me handle it!






[GitHub] [kafka] vvcephei commented on a change in pull request #11111: KAFKA-13126: guard against overflow when computing `joinGroupTimeoutMs`

2021-07-22 Thread GitBox


vvcephei commented on a change in pull request #11111:
URL: https://github.com/apache/kafka/pull/11111#discussion_r675220503



##
File path: 
clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
##
@@ -60,17 +60,16 @@ public static boolean awaitReady(KafkaClient client, Node 
node, Time time, long
 throw new IllegalArgumentException("Timeout needs to be greater 
than 0");
 }
 long startTime = time.milliseconds();
-long expiryTime = startTime + timeoutMs;
 
 if (isReady(client, node, startTime) ||  client.ready(node, startTime))
 return true;
 
 long attemptStartTime = time.milliseconds();
-while (!client.isReady(node, attemptStartTime) && attemptStartTime < 
expiryTime) {
+while (!client.isReady(node, attemptStartTime) && attemptStartTime - 
startTime < timeoutMs) {
 if (client.connectionFailed(node)) {
 throw new IOException("Connection to " + node + " failed.");
 }
-long pollTimeout = expiryTime - attemptStartTime;
+long pollTimeout = (startTime - attemptStartTime) + timeoutMs;

Review comment:
   This would still overflow if `timeoutMs` is `MAX_VALUE`, right?








[jira] [Updated] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13126:
---
Description: 
In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
override, users of both the plain consumer client and kafka streams still set 
the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
overflow when computing the {{joinGroupTimeoutMs}} and results in it being set 
to the {{request.timeout.ms}} instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin 
now within 30s (by default) but have no obligation to almost ever call poll() 
given the high {{max.poll.interval.ms}} – basically they will only do so after 
processing the last record from the previously polled batch. So in heavy 
processing cases, where each record takes a long time to process, or when using 
a very large  {{max.poll.records}}, it can be difficult to make any progress at 
all before dropping out and needing to rejoin. And of course, the rebalance 
that is kicked off upon this member rejoining can result in many of the other 
members in the group dropping out as well, leading to an endless cycle of 
missed rebalances.

We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it 
occurs. The workaround until then is of course to just set the 
{{max.poll.interval.ms}} to MAX_VALUE - 5000 (5s is the 
JOIN_GROUP_TIMEOUT_LAPSE)

  was:
In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
override, users of both the plain consumer client and kafka streams still set 
the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
overflow when computing the {{joinGroupTimeoutMs}} and results in it being set 
to the {{request.timeout.ms}} instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin 
now within 30s (by default) but have no obligation to almost ever call poll() 
given the high {{max.poll.interval.ms}} – basically they will only do so after 
processing the last record from the previously polled batch. So in heavy 
processing cases, where each record takes a long time to process, or when using 
a very large  {{max.poll.records}}, it can be difficult to make any progress at 
all before dropping out and needing to rejoin. And of course, the rebalance 
that is kicked off upon this member rejoining can result in many of the other 
members in the group dropping out as well, leading to an endless cycle of 
missed rebalances.

We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it 
occurs.


> Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads 
> to missing rebalances
> -
>
> Key: KAFKA-13126
> URL: https://issues.apache.org/jira/browse/KAFKA-13126
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.1.0
>
>
> In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
> overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
> override, users of both the plain consumer client and kafka streams still set 
> the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
> overflow when computing the {{joinGroupTimeoutMs}} and results in it being 
> set to the {{request.timeout.ms}} instead, which is much lower.
> This can easily make consumers drop out of the group, since they must rejoin 
> now within 30s (by default) but have no obligation to almost ever call poll() 
> given the high {{max.poll.interval.ms}} – basically they will only do so 
> after processing the last record from the previously polled batch. So in 
> heavy processing cases, where each record takes a long time to process, or 
> when using a very large  {{max.poll.records}}, it can be difficult to make 
> any progress at all before dropping out and needing to rejoin. And of course, 
> the rebalance that is kicked off upon this member rejoining can result in 
> many of the other members in the group dropping out as well, leading to an 
> endless cycle of missed rebalances.
> We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when 
> it occurs. The workaround until then is of course to just set the 
> {{max.poll.interval.ms}} to MAX_VALUE - 5000 (5s is the 
> JOIN_GROUP_TIMEOUT_LAPSE)





[jira] [Updated] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13126:
---
Description: 
In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
override, users of both the plain consumer client and kafka streams still set 
the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
overflow when computing the {{joinGroupTimeoutMs}} and results in it being set 
to the {{request.timeout.ms}} instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin 
now within 30s (by default) but have no obligation to almost ever call poll() 
given the high {{max.poll.interval.ms}} – basically they will only do so after 
processing the last record from the previously polled batch. So in heavy 
processing cases, where each record takes a long time to process, or when using 
a very large  {{max.poll.records}}, it can be difficult to make any progress at 
all before dropping out and needing to rejoin. And of course, the rebalance 
that is kicked off upon this member rejoining can result in many of the other 
members in the group dropping out as well, leading to an endless cycle of 
missed rebalances.

We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when it 
occurs.

  was:
In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
override, users of both the plain consumer client and kafka streams still set 
the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
overflow when computing the {{joinGroupTimeoutMs}} and results in it being set 
to the {{request.timeout.ms}} instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin 
now within 30s (by default) yet have no obligation to almost ever call poll() 
given the high {{max.poll.interval.ms}}. We just need to check for overflow and 
fix it to {{Integer.MAX_VALUE}} when it occurs.


> Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads 
> to missing rebalances
> -
>
> Key: KAFKA-13126
> URL: https://issues.apache.org/jira/browse/KAFKA-13126
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.1.0
>
>
> In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
> overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
> override, users of both the plain consumer client and kafka streams still set 
> the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
> overflow when computing the {{joinGroupTimeoutMs}} and results in it being 
> set to the {{request.timeout.ms}} instead, which is much lower.
> This can easily make consumers drop out of the group, since they must rejoin 
> now within 30s (by default) but have no obligation to almost ever call poll() 
> given the high {{max.poll.interval.ms}} – basically they will only do so 
> after processing the last record from the previously polled batch. So in 
> heavy processing cases, where each record takes a long time to process, or 
> when using a very large  {{max.poll.records}}, it can be difficult to make 
> any progress at all before dropping out and needing to rejoin. And of course, 
> the rebalance that is kicked off upon this member rejoining can result in 
> many of the other members in the group dropping out as well, leading to an 
> endless cycle of missed rebalances.
> We just need to check for overflow and fix it to {{Integer.MAX_VALUE}} when 
> it occurs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13126:
--

 Summary: Overflow in joinGroupTimeoutMs when max.poll.interval.ms 
is MAX_VALUE leads to missing rebalances
 Key: KAFKA-13126
 URL: https://issues.apache.org/jira/browse/KAFKA-13126
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman
 Fix For: 3.1.0


In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
override, users of both the plain consumer client and kafka streams still set 
the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
overflow when computing the {{joinGroupTimeoutMs}} and results in it being set 
to the {{request.timeout.ms}} instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin 
now within 30s (by default) yet have no obligation to almost ever call poll() 
given the high {{max.poll.interval.ms}}. We just need to check for overflow and 
fix it to {{Integer.MAX_VALUE}} when it occurs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests

2021-07-22 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12724:
-
Priority: Blocker  (was: Major)

> Add 2.8.0 to system tests and streams upgrade tests
> ---
>
> Key: KAFKA-12724
> URL: https://issues.apache.org/jira/browse/KAFKA-12724
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
>
> Kafka v2.8.0 is released. We should add this version to the system tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests

2021-07-22 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12724:
-
Fix Version/s: 3.0.0

> Add 2.8.0 to system tests and streams upgrade tests
> ---
>
> Key: KAFKA-12724
> URL: https://issues.apache.org/jira/browse/KAFKA-12724
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Kafka v2.8.0 is released. We should add this version to the system tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #11101: MINOR: Remove redundant fields in dump log record output

2021-07-22 Thread GitBox


hachikuji commented on pull request #11101:
URL: https://github.com/apache/kafka/pull/11101#issuecomment-885262803


   @ijuma I've added a test which covers most of the interesting cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #11111: HOTFIX: guard against overflow when computing `joinGroupTimeoutMs`

2021-07-22 Thread GitBox


ableegoldman opened a new pull request #1:
URL: https://github.com/apache/kafka/pull/1


   In older versions of Kafka Streams, the `max.poll.interval.ms` config was 
overridden by default to `Integer.MAX_VALUE`. Even after we removed this 
override, users of both the plain consumer client and kafka streams still set 
the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
overflow when computing the `joinGroupTimeoutMs` and results in it being set to 
the `request.timeout.ms` instead, which is much lower.
   
   This can easily make consumers drop out of the group, since they must rejoin 
now within 30s (by default) yet have no obligation to almost ever call poll() 
given the high `max.poll.interval.ms`. We just need to check for overflow and 
fix it to `Integer.MAX_VALUE` when it occurs.
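
   As an illustration of the guard described here, a minimal sketch (the class, 
method, and constant names below are assumptions for this example, not the 
actual patch): do the addition in 64-bit arithmetic and clamp to 
`Integer.MAX_VALUE` rather than letting the `int` sum wrap negative.

   ```java
   // Hypothetical sketch, not the real coordinator code: names are assumed.
   public final class JoinGroupTimeouts {
       // mirrors the 5s JOIN_GROUP_TIMEOUT_LAPSE mentioned in the ticket
       private static final int JOIN_GROUP_TIMEOUT_LAPSE_MS = 5000;

       static int joinGroupTimeoutMs(int rebalanceTimeoutMs) {
           // widen to long so max.poll.interval.ms = MAX_VALUE cannot overflow
           long withLapse = (long) rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE_MS;
           return (int) Math.min(withLapse, Integer.MAX_VALUE);
       }

       public static void main(String[] args) {
           // prints 2147483647 instead of a negative, wrapped value
           System.out.println(joinGroupTimeoutMs(Integer.MAX_VALUE));
       }
   }
   ```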


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675163448



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
 acquireAndEnsureOpen();
 try {
 final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
-return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+// if the log end offset is not known and hence cannot return lag,
+// issue a list offset request for that partition so that next time
+// we may get the answer; we do not need to wait for the return 
value
+// since we would not try to poll the network client synchronously
+if (lag == null) {
+if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null) {
+log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
+fetcher.endOffsets(Collections.singleton(topicPartition), 
time.timer(0L));

Review comment:
   Ah sorry, I overlooked that we passed in a timeout of 0 (originally 
thought that would throw a TimeoutException but I see now it would just return 
-- nevermind this then)
   
   However I do think it's probably worth taking care not to fire off a million 
requests per second (possible slight over-exaggeration) when we're just waiting 
on the same partition(s). It shouldn't be too complicated to avoid sending 
duplicated requests so imo it's not over-optimization...thoughts?
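
   As a rough sketch of that suggestion (the helper class below is an 
assumption for illustration, not existing KafkaConsumer code), the consumer 
could remember which partitions already have an outstanding list-offsets 
request and only call `fetcher.endOffsets(..., time.timer(0L))` for new ones:

   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   import org.apache.kafka.common.TopicPartition;

   // Sketch only: de-duplicate fire-and-forget end-offset lookups so repeated
   // currentLag() calls do not re-send a request for the same partition.
   public class EndOffsetRequestTracker {
       private final Set<TopicPartition> pending = ConcurrentHashMap.newKeySet();

       /** Returns true if the caller should send a list-offsets request now. */
       public boolean tryAcquire(TopicPartition tp) {
           return pending.add(tp); // false => a request is already in flight
       }

       /** Called once the end offset for tp is known or the request failed. */
       public void release(TopicPartition tp) {
           pending.remove(tp);
       }
   }
   ```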




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-22 Thread GitBox


guozhangwang commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675158219



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
 acquireAndEnsureOpen();
 try {
 final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
-return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+// if the log end offset is not known and hence cannot return lag,
+// issue a list offset request for that partition so that next time
+// we may get the answer; we do not need to wait for the return 
value
+// since we would not try to poll the network client synchronously
+if (lag == null) {
+if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null) {
+log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
+fetcher.endOffsets(Collections.singleton(topicPartition), 
time.timer(0L));

Review comment:
   I modified the fetcher so that it would not wait for the future to 
complete; with timer(0) it would not be a blocking call. Or did I miss anything?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675136826



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
 acquireAndEnsureOpen();
 try {
 final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
-return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+// if the log end offset is not known and hence cannot return lag,
+// issue a list offset request for that partition so that next time
+// we may get the answer; we do not need to wait for the return 
value
+// since we would not try to poll the network client synchronously
+if (lag == null) {
+if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null) {
+log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
+fetcher.endOffsets(Collections.singleton(topicPartition), 
time.timer(0L));

Review comment:
   Isn't this actually a blocking call? I couldn't find anything that 
asserted yes or no in the javadocs, but this ultimately calls down into 
`Fetcher#fetchOffsetsByTimes` which does seem to wait for the request future to 
complete (in fact, it seems to be doing a busy wait on the timer...? that 
doesn't seem right)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-22 Thread GitBox


ableegoldman commented on a change in pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#discussion_r675136826



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2237,7 +2237,21 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
 acquireAndEnsureOpen();
 try {
 final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
-return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+// if the log end offset is not known and hence cannot return lag,
+// issue a list offset request for that partition so that next time
+// we may get the answer; we do not need to wait for the return 
value
+// since we would not try to poll the network client synchronously
+if (lag == null) {
+if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null) {
+log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
+fetcher.endOffsets(Collections.singleton(topicPartition), 
time.timer(0L));

Review comment:
   Isn't this a blocking call? I couldn't find anything that asserted yes 
or no in the javadocs, but this ultimately calls down into 
`Fetcher#fetchOffsetsByTimes` which does seem to wait for the request future to 
complete (in fact, it seems to be doing a busy wait on the timer...? that 
doesn't seem right)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-22 Thread GitBox


d8tltanc edited a comment on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-885177467


   
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once
   
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta
   
   try running them on master without the idempotent default changes
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-22 Thread GitBox


d8tltanc edited a comment on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-885177467






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-22 Thread GitBox


d8tltanc commented on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-885177467


   
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once
   
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta
   
   try running them on a branch without the idempotent default changes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding opened a new pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-07-22 Thread GitBox


ccding opened a new pull request #0:
URL: https://github.com/apache/kafka/pull/0


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12985) CVE-2021-28169 - Upgrade jetty to 9.4.42

2021-07-22 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-12985:
--
Summary: CVE-2021-28169 - Upgrade jetty to 9.4.42  (was: CVE-2021-28169 - 
Upgrade jetty to 9.4.41)

> CVE-2021-28169 - Upgrade jetty to 9.4.42
> 
>
> Key: KAFKA-12985
> URL: https://issues.apache.org/jira/browse/KAFKA-12985
> Project: Kafka
>  Issue Type: Task
>  Components: security
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> CVE-2021-28169 vulnerability affects Jetty versions up to 9.4.40. For more 
> information see https://nvd.nist.gov/vuln/detail/CVE-2021-28169
> Upgrading to Jetty version 9.4.41 should address this issue 
> (https://github.com/eclipse/jetty.project/security/advisories/GHSA-gwcr-j4wh-j3cq).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12985) CVE-2021-28169 - Upgrade jetty to 9.4.41

2021-07-22 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-12985.
---
Fix Version/s: 2.8.1
   2.7.2
   3.0.0
   Resolution: Fixed

> CVE-2021-28169 - Upgrade jetty to 9.4.41
> 
>
> Key: KAFKA-12985
> URL: https://issues.apache.org/jira/browse/KAFKA-12985
> Project: Kafka
>  Issue Type: Task
>  Components: security
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> CVE-2021-28169 vulnerability affects Jetty versions up to 9.4.40. For more 
> information see https://nvd.nist.gov/vuln/detail/CVE-2021-28169
> Upgrading to Jetty version 9.4.41 should address this issue 
> (https://github.com/eclipse/jetty.project/security/advisories/GHSA-gwcr-j4wh-j3cq).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance

2021-07-22 Thread Kowshik Prakasam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385712#comment-17385712
 ] 

Kowshik Prakasam commented on KAFKA-13070:
--

[~manasvigupta]  I didn't realize you had assigned it to yourself. I was 
actually planning on working on this myself. Are you working on a fix for this 
relatively soon? If not, I will address it with a PR relatively soon. Please 
let me know.

> LogManager shutdown races with periodic work scheduled by the instance
> --
>
> Key: KAFKA-13070
> URL: https://issues.apache.org/jira/browse/KAFKA-13070
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kowshik Prakasam
>Assignee: Manasvi Gupta
>Priority: Major
>
> In the LogManager shutdown sequence (in LogManager.shutdown()), we don't 
> cancel the periodic work scheduled by it prior to shutdown. As a result, the 
> periodic work could race with the shutdown sequence causing some unwanted 
> side effects. This is reproducible by a unit test in LogManagerTest.
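
A generic sketch of the fix direction (plain java.util.concurrent here, not the
actual KafkaScheduler/LogManager API): cancel the periodic job and drain the
scheduler before the shutdown path touches the same state.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative only: cancel periodic work before shutting down so background
// runs cannot race with the shutdown sequence.
public class PeriodicWorkOwner {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> periodicJob;

    public void startup() {
        periodicJob = scheduler.scheduleAtFixedRate(
            this::doPeriodicWork, 30, 30, TimeUnit.SECONDS);
    }

    public void shutdown() throws InterruptedException {
        if (periodicJob != null) {
            periodicJob.cancel(false); // allow an in-flight run to finish
        }
        scheduler.shutdown();
        scheduler.awaitTermination(30, TimeUnit.SECONDS);
        // only now flush and close the state the periodic work also touches
    }

    private void doPeriodicWork() {
        // e.g. periodic flush or retention checks
    }
}
```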



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners

2021-07-22 Thread GitBox


jsancio commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675065535



##
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -362,24 +363,27 @@ LeaderAndEpoch notifiedLeader() {
 }
 
 void handleCommit(MemoryBatchReader reader) {
-listener.handleCommit(reader);
+listener.handleCommit(this, reader);
 offset = reader.lastOffset().getAsLong();
 }
 
 void handleSnapshot(SnapshotReader reader) {
-listener.handleSnapshot(reader);
+listener.handleSnapshot(this, reader);
 offset = reader.lastContainedLogOffset();
 }
 
 void handleLeaderChange(long offset, LeaderAndEpoch leader) {
-listener.handleLeaderChange(leader);
+listener.handleLeaderChange(this, leader);
 notifiedLeader = leader;
 this.offset = offset;
 }
 
 void beginShutdown() {
-listener.beginShutdown();
+listener.beginShutdown(this);
 }
+
+@Override
+public void close() {}

Review comment:
   Should fix this by appending an event to eventQueue that removes this 
listener.

##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -2491,6 +2492,47 @@ public void testLateRegisteredListenerCatchesUp() throws 
Exception {
 assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
 }
 
+@Test
+public void testReregistrationChangesListenerContext() throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int epoch = 5;
+Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+List<String> batch1 = Arrays.asList("1", "2", "3");
+List<String> batch2 = Arrays.asList("4", "5", "6");
+List<String> batch3 = Arrays.asList("7", "8", "9");
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.appendToLog(1, batch1)
+.appendToLog(1, batch2)
+.appendToLog(2, batch3)
+.withUnknownLeader(epoch - 1)
+.build();
+
+context.becomeLeader();
+context.client.poll();
+assertEquals(10L, context.log.endOffset().offset);
+
+// Let the initial listener catch up
+context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, 
epoch, 0));

Review comment:
   Use the helper method "advance high-watermark" in a few of these places.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

2021-07-22 Thread GitBox


jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r675036126



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
##
@@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() {
 .filter(e -> e.getLevel().equals("WARN"))
 .map(Event::getMessage)
 .collect(Collectors.toList()),
-hasItem("Skipping record due to null key. topic=[topic] 
partition=[0] offset=[0]")
+hasItem("Skipping record due to null key. value=[value] 
topic=[topic] partition=[0] offset=[0]")

Review comment:
   > On another note, I guess we could add a test for the other (new) code 
path when the metadata is absent. I'll leave it up to you.
   
   Happy to add more tests. What I'm wondering is how to test scenarios where 
no context is added? Does TopologyTestDriver allow nullifying this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

2021-07-22 Thread GitBox


jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r675034484



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##
@@ -90,6 +92,13 @@ public String newStoreName(final String prefix) {
 "store-"
 );
 
+final ProcessorSupplier processorSupplier 
= () ->
+new ContextualProcessor() {
+@Override
+public void process(final Record record) {
+}

Review comment:
   Looking at the tests, they are related more to the thread and store states 
than to value checking. Should we add some store updates as part of the 
processor for _more_ correctness?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tang7526 commented on pull request #11062: KAFKA-13094: Session windows do not consider user-specified grace when computing retention time for changelog

2021-07-22 Thread GitBox


tang7526 commented on pull request #11062:
URL: https://github.com/apache/kafka/pull/11062#issuecomment-885088407


   > @tang7526 Could you rebase this PR on the latest 2.8 branch?
   
   @cadonna Done.  I've already rebased this PR on the latest 2.8 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-22 Thread GitBox


jolshan commented on a change in pull request #11097:
URL: https://github.com/apache/kafka/pull/11097#discussion_r675011109



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)
+
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 
0)))
+
+val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+  new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch 
= 0, highWatermark = 2L)
+fetcher.setLeaderState(partition, leaderState)
+
+// Do work for the first time. This should result in all partitions in 
error.
+val timeBeforeFirst = System.currentTimeMillis()
+fetcher.doWork()
+val timeAfterFirst = System.currentTimeMillis()
+val firstWork = timeAfterFirst - timeBeforeFirst
+
+// The second doWork will pause for fetchBackOffMs since all partitions 
will be delayed
+val timeBeforeSecond = System.currentTimeMillis()
+fetcher.doWork()
+val timeAfterSecond = System.currentTimeMillis()

Review comment:
   Ah hmm. It does seem to be a little flaky for the second check 
(fetchBackOffMs < secondWorkDuration).
   
   In a sample of 50 tests I ran with backOffMs = 500, there were 8 failures 
and it seems like all of them had secondWorkDuration = 500. So maybe I can just 
change to <=
   
   Rerunning with this setup 200 times and with fetchBackOffMs=250, I saw 0 
failures.
   
   Of course, this was all locally. I'm not sure if Jenkins will behave 
differently.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest

2021-07-22 Thread GitBox


cadonna commented on pull request #11103:
URL: https://github.com/apache/kafka/pull/11103#issuecomment-885086432


   Cherry-picked to 3.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest

2021-07-22 Thread GitBox


cadonna merged pull request #11103:
URL: https://github.com/apache/kafka/pull/11103


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #11109: KAFKA-13113: Support unregistering Raft listeners

2021-07-22 Thread GitBox


jsancio opened a new pull request #11109:
URL: https://github.com/apache/kafka/pull/11109


   Support unregistering by returning a ListenerContext on registration and 
exposing a close method on the returned ListenerContext. To allow the user to 
use the same Listener across different registrations, the associated 
ListenerContext is passed to all of the methods described by the Raft Listener.
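
   As a hedged illustration of the pattern described above (the interfaces 
below are simplified placeholders, not the actual raft module types or 
signatures): registration hands back a context whose close() unregisters the 
listener, and every callback receives that context so one Listener instance 
can serve several registrations at once.

   ```java
   // Hedged sketch only: placeholder names, not the real RaftClient API.
   interface ListenerContext extends AutoCloseable {
       @Override
       void close(); // unregisters the listener tied to this registration
   }

   interface Listener<T> {
       void handleCommit(ListenerContext context, T batch);
       void handleLeaderChange(ListenerContext context, int leaderId, int epoch);
       void beginShutdown(ListenerContext context);
   }

   interface RaftClientLike<T> {
       // registration returns the context, so the same Listener instance can
       // be registered several times and each registration closed on its own
       ListenerContext register(Listener<T> listener);
   }
   ```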
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest

2021-07-22 Thread GitBox


cadonna commented on pull request #11103:
URL: https://github.com/apache/kafka/pull/11103#issuecomment-885084926


   Test failure are unrelated and known to be flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-22 Thread GitBox


hachikuji commented on a change in pull request #11097:
URL: https://github.com/apache/kafka/pull/11097#discussion_r674981462



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)

Review comment:
   nit: create a `val` for `fetchBackoffMs`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-22 Thread GitBox


hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r675005086



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-}.map { case (transactionalId, txnMetadata) =>
-  val txnMetadataTransition = txnMetadata.inLock {
-txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+partitionId: Int,
+partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+val currentTimeMs = time.milliseconds()
+
+inReadLock(stateLock) {
+  val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+  replicaManager.getLogConfig(transactionPartition) match {
+case Some(logConfig) =>
+  val maxBatchSize = logConfig.maxMessageSize
+  val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+  lazy val recordsBuilder = MemoryRecords.builder(
+ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
   Note that the buffer will still grow to reach the limit of 
max.message.bytes. I agree, however, that one hour is a long time to wait. Let 
me look into triggering the next run right away.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna edited a comment on pull request #11062: KAFKA-13094: Session windows do not consider user-specified grace when computing retention time for changelog

2021-07-22 Thread GitBox


cadonna edited a comment on pull request #11062:
URL: https://github.com/apache/kafka/pull/11062#issuecomment-885080580


   @tang7526 Could you rebase this PR on the latest 2.8 branch?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11062: KAFKA-13094: Session windows do not consider user-specified grace when computing retention time for changelog

2021-07-22 Thread GitBox


cadonna commented on pull request #11062:
URL: https://github.com/apache/kafka/pull/11062#issuecomment-885080580


   @tang7526 Could you rebase this PR on the latest 2.8 branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

2021-07-22 Thread GitBox


jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r675002211



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##
@@ -92,33 +98,62 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
 // if the key is null, then ignore the record
-if (key == null) {
-LOG.warn(
-"Skipping record due to null key. topic=[{}] 
partition=[{}] offset=[{}]",
-context().topic(), context().partition(), 
context().offset()
-);
+if (record.key() == null) {
+if (context.recordMetadata().isPresent()) {
+final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+LOG.warn(
+"Skipping record due to null key. "
++ "value=[{}] topic=[{}] partition=[{}] 
offset=[{}]",

Review comment:
   No worries. This is actually a good finding, as there are 16 other places 
where we are logging values and keys. Maybe let's create an issue to handle 
this in another PR?
   
   https://user-images.githubusercontent.com/6180701/126681148-2bd07547-5f56-4cc8-a9f1-88b4826b983e.png
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-22 Thread GitBox


jolshan commented on a change in pull request #11097:
URL: https://github.com/apache/kafka/pull/11097#discussion_r675000400



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)
+
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 
0)))
+
+val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+  new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch 
= 0, highWatermark = 2L)
+fetcher.setLeaderState(partition, leaderState)
+
+// Do work for the first time. This should result in all partitions in 
error.
+val timeBeforeFirst = System.currentTimeMillis()
+fetcher.doWork()
+val timeAfterFirst = System.currentTimeMillis()
+val firstWork = timeAfterFirst - timeBeforeFirst
+
+// The second doWork will pause for fetchBackOffMs since all partitions 
will be delayed
+val timeBeforeSecond = System.currentTimeMillis()
+fetcher.doWork()
+val timeAfterSecond = System.currentTimeMillis()

Review comment:
   As mentioned above, the typical time to run the first doWork was about 
30ms on my machine. I can run a batch of these tests to ensure it is not flaky.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-22 Thread GitBox


jolshan commented on a change in pull request #11097:
URL: https://github.com/apache/kafka/pull/11097#discussion_r674999603



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)

Review comment:
   Sure. I just went with the default config value. Would 500 still be too 
high? In my testing, the non-delayed call usually took 30ms on my local 
machine, so we have quite a ways to go before we get close.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-22 Thread GitBox


hachikuji commented on a change in pull request #11097:
URL: https://github.com/apache/kafka/pull/11097#discussion_r674990877



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)

Review comment:
   nit: can you create a `val` for `fetchBackoffMs`? Then we can use it 
below as well.
   
   Also, could we use a lower value for the backoff? Would that make the test 
faster?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-22 Thread GitBox


jolshan commented on a change in pull request #11097:
URL: https://github.com/apache/kafka/pull/11097#discussion_r674999603



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)

Review comment:
   Sure. I just went with the default value. Would 500 still be too high? 
In my testing, the non-delayed call usually took 30ms on my local machine, so 
we have quite a ways to go before we get close.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11097: KAFKA-8529: Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-22 Thread GitBox


hachikuji commented on a change in pull request #11097:
URL: https://github.com/apache/kafka/pull/11097#discussion_r674994728



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)
+
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 
0)))
+
+val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+  new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch 
= 0, highWatermark = 2L)
+fetcher.setLeaderState(partition, leaderState)
+
+// Do work for the first time. This should result in all partitions in 
error.
+val timeBeforeFirst = System.currentTimeMillis()
+fetcher.doWork()
+val timeAfterFirst = System.currentTimeMillis()
+val firstWork = timeAfterFirst - timeBeforeFirst
+
+// The second doWork will pause for fetchBackOffMs since all partitions 
will be delayed
+val timeBeforeSecond = System.currentTimeMillis()
+fetcher.doWork()
+val timeAfterSecond = System.currentTimeMillis()

Review comment:
   The reliance on real time is a little annoying. Have you run this a few 
times to make sure it is not flaky?

##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)

Review comment:
   nit: can you create a `val` for `fetchBackoffMs`? Then we can use it 
below as well.
   
   Also, could we use a lower value for the backoff?

##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)

Review comment:
   nit: create a `val` for `fetchBackoffMs`

##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -144,6 +144,44 @@ class AbstractFetcherThreadTest {
 assertEquals(2L, replicaState.highWatermark)
   }
 
+  @Test
+  def testDelay(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+class ErrorMockFetcherThread(fetchBackOffMs: Int)
+  extends MockFetcherThread(fetchBackOffMs =  fetchBackOffMs) {
+
+  override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+ throw new UnknownTopicIdException("Topic ID was unknown as expected 
for this test")
+  }
+}
+val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = 1000)
+
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 
0)))
+
+val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+  new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch 
= 0, highWatermark = 2L)
+fetcher.setLeaderState(partition, leaderState)
+
+// Do work for the first time. This should result in all partitions in 
error.
+val timeBeforeFirst = System.currentTimeMillis()
+fetcher.doWork()
+val 

[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-22 Thread GitBox


hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674988520



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-}.map { case (transactionalId, txnMetadata) =>
-  val txnMetadataTransition = txnMetadata.inLock {
-txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+partitionId: Int,
+partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+val currentTimeMs = time.milliseconds()
+
+inReadLock(stateLock) {
+  val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+  replicaManager.getLogConfig(transactionPartition) match {
+case Some(logConfig) =>
+  val maxBatchSize = logConfig.maxMessageSize
+  val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+  lazy val recordsBuilder = MemoryRecords.builder(
+ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+TransactionLog.EnforcedCompressionType,
+TimestampType.CREATE_TIME,
+0L,
+maxBatchSize
+  )
+
+  partitionCacheEntry.metadataPerTransactionalId.foreachWhile { 
(transactionalId, txnMetadata) =>
+txnMetadata.inLock {
+  if (!shouldExpire(txnMetadata, currentTimeMs)) {
+true
+  } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, 
currentTimeMs, maxBatchSize)) {
+val transitMetadata = txnMetadata.prepareDead()
+expired += TransactionalIdCoordinatorEpochAndMetadata(
+  transactionalId,
+  partitionCacheEntry.coordinatorEpoch,
+  transitMetadata
+)
+true
+  } else {
+// If the batch is full, return false to end the search. Any 
remaining
+// transactionalIds eligible for expiration can be picked next 
time.
+false
   }
-  TransactionalIdCoordinatorEpochAndMetadata(transactionalId, 
entry.coordinatorEpoch, txnMetadataTransition)
 }
-  }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-
partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
   }
 
-val recordsPerPartition = transactionalIdByPartition
-  .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) 
=>
-val deletes: Array[SimpleRecord] = 
transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-  new SimpleRecord(now, 
TransactionLog.keyToBytes(entry.transactionalId), null)
-}.toArray
-val records = 
MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-val topicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-(topicPartition, records)
+  if (expired.isEmpty) {
+(Seq.empty, MemoryRecords.EMPTY)
+  } else {
+(expired, recordsBuilder.build())
   }
 
-def removeFromCacheCallback(responses: collection.Map[TopicPartition, 
PartitionResponse]): Unit = {
-  responses.forKeyValue { (topicPartition, response) =>
-inReadLock(stateLock) {
-  val toRemove = 
transactionalIdByPartition(topicPartition.partition)
-  transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
-toRemove.foreach { idCoordinatorEpochAndMetadata =>
-  val transactionalId = 
idCoordinatorEpochAndMetadata.transactionalId
-  val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-  txnMetadata.inLock {
-if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
-  && txnMetadata.pendingState.contains(Dead)
-   

[GitHub] [kafka] hachikuji commented on pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-22 Thread GitBox


hachikuji commented on pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#issuecomment-885065413


   @dajac Yeah, I was thinking about that too. I'll open a jira and we can 
address that separately.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures

2021-07-22 Thread GitBox


ijuma commented on a change in pull request #11108:
URL: https://github.com/apache/kafka/pull/11108#discussion_r674978912



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -785,7 +785,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   Some(RecordBatch.MAGIC_VALUE_V1)
 else
   None
-  }
+  }.filter(_ => unconvertedRecords.batchIterator.hasNext)

Review comment:
   That code is deprecated for removal and it has been this way for years, 
is it worth it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-22 Thread GitBox


ijuma commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-885055345


   @showuon can you please file a blocker Jira for 3.0? Do you have cycles to 
fix this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11108: KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures

2021-07-22 Thread GitBox


hachikuji commented on a change in pull request #11108:
URL: https://github.com/apache/kafka/pull/11108#discussion_r674975130



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -785,7 +785,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   Some(RecordBatch.MAGIC_VALUE_V1)
 else
   None
-  }
+  }.filter(_ => unconvertedRecords.batchIterator.hasNext)

Review comment:
   This is fine I guess, but it kind of feels like we should fix the 
down-conversion code if it is broken.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10074: KAFKA-12305: Fix Flatten SMT for array types

2021-07-22 Thread GitBox


C0urante commented on pull request #10074:
URL: https://github.com/apache/kafka/pull/10074#issuecomment-885046027


   @tombentley I know it's been a while and we've probably missed the boat for 3.0, 
but I wanted to check in and see if there's anything blocking this PR from being 
merged at the moment?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino edited a comment on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-22 Thread GitBox


rondagostino edited a comment on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-885022501


   Yeah, it looks like we call `self.zk.describe()` in 3 system tests; the one 
above and also in `zookeeper_tls_encrypt_only_test.py` and 
`zookeeper_tls_test.py`.  The latter two are simply confirming that the 
`--zk-tls-config-file` parameter will work with `kafka-configs.sh` (i.e. that 
it can talk to TLS-enabled ZooKeeper nodes), so we could easily change those to 
do something with SCRAM configs.
   
   The one mentioned here is simply trying to surface an error as early as 
possible as per the comment:
   ```
   # Confirm we have a successful ZooKeeper upgrade by describing the 
topic.
   # Not trying to detect a problem here leads to failure in the 
ensuing Kafka roll, which would be a less
   # intuitive failure than seeing a problem here, so detect ZooKeeper 
upgrade problems before involving Kafka.
   self.zk.describe(self.topic)
   ```
   
   So we could either just get rid of it or maybe list ACLs/do something with 
SCRAM.
   
   It also looks like `describe()` and the above 3 invocations of it are the 
only things we have to worry about here -- the ZK Security Migrator in 
`zookeeper_migration()` and listing ACLs via the `list_acls()` method are the 
only other things that access ZooKeeper directly, and both of these are fully 
supported.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-22 Thread GitBox


rondagostino commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-885022501


   Yeah, it looks like we call `self.zk.describe()` in 3 system tests; the one 
above and also in `zookeeper_tls_encrypt_only_test.py` and 
`zookeeper_tls_test.py`.  The latter two are simply confirming that the 
`--zk-tls-config-file` parameter will work with `kafka-configs.sh` (i.e. that 
it can talk to TLS-enabled ZooKeeper nodes), so we could easily change those to 
do something with SCRAM configs.
   
   The one mentioned here is simply trying to surface an error as early as 
possible as per the comment:
   ```
   # Confirm we have a successful ZooKeeper upgrade by describing the 
topic.
   # Not trying to detect a problem here leads to failure in the 
ensuing Kafka roll, which would be a less
   # intuitive failure than seeing a problem here, so detect ZooKeeper 
upgrade problems before involving Kafka.
   self.zk.describe(self.topic)
   ```
   
   So we could either just get rid of it or maybe list ACLs/do something with 
SCRAM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-22 Thread GitBox


ijuma edited a comment on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-885004599


   @showuon I'm seeing the following in the system test `upgrade_test` 
(from_kafka_version=0.9.0.1.to_message_format_version=0.9.0.1.compression_types=.none):
   
   > INFO  - 2021-07-22 08:16:06,976 - runner_client - log - lineno:241]: 
RunnerClient: 
kafkatest.tests.core.upgrade_test.TestUpgrade.test_upgrade.from_kafka_version=0.9.0.1.to_message_format_version=0.9.0.1.compression_types=.none:
 FAIL: Rem
   > oteCommandError({'ssh_config': {'host': 'ducker11', 'hostname': 
'ducker11', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
'/home/ducker/.ssh/id_rsa'}, 'hostname': 'ducker11', 'ssh_hostname': 
'ducker11', 'user': 'ducker', '
   > externally_routable_ip': 'ducker11', '_logger': , 'os': 'linux', '_ssh_clie
   > nt': , '_sftp_client': 
}, 
'/opt/kafka-dev/bin/kafka-run-class.sh kafka.admin.ConfigCommand --zookeeper 
ducker11:2181  --describe
   >  --topic test_topic', 1, b'')
   > Traceback (most recent call last):
   >   File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
133, in run
   > data = self.run_test()
   >   File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
190, in run_test
   > return self.test_context.function(self.test)
   >   File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", 
line 429, in wrapper
   > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
   >   File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 
199, in test_upgrade
   > cluster_id = self.kafka.cluster_id()
   >   File "/opt/kafka-dev/tests/kafkatest/tests/produce_consume_validate.py", 
line 105, in run_produce_consume_validate
   > core_test_action(*args)
   >   File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 
200, in 
   > assert cluster_id is not None
   >   File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 
57, in perform_upgrade
   > self.zk.describe(self.topic)
   >   File "/opt/kafka-dev/tests/kafkatest/services/zookeeper.py", line 234, 
in describe
   > output = self.nodes[0].account.ssh_output(cmd)
   >   File 
"/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", 
line 370, in ssh_output
   > raise RemoteCommandError(self, cmd, exit_status, stderr.read())
   > ducktape.cluster.remoteaccount.RemoteCommandError: ducker@ducker11: 
Command '/opt/kafka-dev/bin/kafka-run-class.sh kafka.admin.ConfigCommand 
--zookeeper ducker11:2181  --describe --topic test_topic' returned non-zero 
exit status 1.
   
   Is this related to this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-22 Thread GitBox


ijuma commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-885004599


   @showuon I'm seeing the following in the system test `upgrade_test`:
   
   > INFO  - 2021-07-22 08:16:06,976 - runner_client - log - lineno:241]: 
RunnerClient: 
kafkatest.tests.core.upgrade_test.TestUpgrade.test_upgrade.from_kafka_version=0.9.0.1.to_message_format_version=0.9.0.1.compression_types=.none:
 FAIL: Rem
   > oteCommandError({'ssh_config': {'host': 'ducker11', 'hostname': 
'ducker11', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
'/home/ducker/.ssh/id_rsa'}, 'hostname': 'ducker11', 'ssh_hostname': 
'ducker11', 'user': 'ducker', '
   > externally_routable_ip': 'ducker11', '_logger': , 'os': 'linux', '_ssh_clie
   > nt': , '_sftp_client': 
}, 
'/opt/kafka-dev/bin/kafka-run-class.sh kafka.admin.ConfigCommand --zookeeper 
ducker11:2181  --describe
   >  --topic test_topic', 1, b'')
   > Traceback (most recent call last):
   >   File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
133, in run
   > data = self.run_test()
   >   File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
190, in run_test
   > return self.test_context.function(self.test)
   >   File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", 
line 429, in wrapper
   > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
   >   File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 
199, in test_upgrade
   > cluster_id = self.kafka.cluster_id()
   >   File "/opt/kafka-dev/tests/kafkatest/tests/produce_consume_validate.py", 
line 105, in run_produce_consume_validate
   > core_test_action(*args)
   >   File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 
200, in 
   > assert cluster_id is not None
   >   File "/opt/kafka-dev/tests/kafkatest/tests/core/upgrade_test.py", line 
57, in perform_upgrade
   > self.zk.describe(self.topic)
   >   File "/opt/kafka-dev/tests/kafkatest/services/zookeeper.py", line 234, 
in describe
   > output = self.nodes[0].account.ssh_output(cmd)
   >   File 
"/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", 
line 370, in ssh_output
   > raise RemoteCommandError(self, cmd, exit_status, stderr.read())
   > ducktape.cluster.remoteaccount.RemoteCommandError: ducker@ducker11: 
Command '/opt/kafka-dev/bin/kafka-run-class.sh kafka.admin.ConfigCommand 
--zookeeper ducker11:2181  --describe --topic test_topic' returned non-zero 
exit status 1.
   
   Is this related to this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #11108: KAFKA-13116: Adjust system tests due to KAFKA-12944

2021-07-22 Thread GitBox


ijuma opened a new pull request #11108:
URL: https://github.com/apache/kafka/pull/11108


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

2021-07-22 Thread GitBox


vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674847014



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##
@@ -92,33 +98,62 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
 // if the key is null, then ignore the record
-if (key == null) {
-LOG.warn(
-"Skipping record due to null key. topic=[{}] 
partition=[{}] offset=[{}]",
-context().topic(), context().partition(), 
context().offset()
-);
+if (record.key() == null) {
+if (context.recordMetadata().isPresent()) {
+final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+LOG.warn(
+"Skipping record due to null key. "
++ "value=[{}] topic=[{}] partition=[{}] 
offset=[{}]",
+record.value(),
+recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
+);
+} else {
+LOG.warn(
+"Skipping record due to null key. "
++ "value=[{}]. Topic, partition, and offset not 
known.",
+record.value()
+);
+}
 droppedRecordsSensor.record();
 return;
 }
 
 if (queryableName != null) {
-final ValueAndTimestamp oldValueAndTimestamp = 
store.get(key);
-final V oldValue;
+final ValueAndTimestamp oldValueAndTimestamp = 
store.get(record.key());
+final VIn oldValue;
 if (oldValueAndTimestamp != null) {
 oldValue = oldValueAndTimestamp.value();
-if (context().timestamp() < 
oldValueAndTimestamp.timestamp()) {
-LOG.warn("Detected out-of-order KTable update for {} 
at offset {}, partition {}.",
-store.name(), context().offset(), 
context().partition());
+if (record.timestamp() < oldValueAndTimestamp.timestamp()) 
{
+if (context.recordMetadata().isPresent()) {
+final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+LOG.warn(
+"Detected out-of-order KTable update for {}, "
++ "old timestamp=[{}] new timestamp=[{}]. "
++ "value=[{}] topic=[{}] partition=[{}] 
offset=[{}].",

Review comment:
   also here

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##
@@ -92,33 +98,62 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
 // if the key is null, then ignore the record
-if (key == null) {
-LOG.warn(
-"Skipping record due to null key. topic=[{}] 
partition=[{}] offset=[{}]",
-context().topic(), context().partition(), 
context().offset()
-);
+if (record.key() == null) {
+if (context.recordMetadata().isPresent()) {
+final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+LOG.warn(
+"Skipping record due to null key. "
++ "value=[{}] topic=[{}] partition=[{}] 
offset=[{}]",

Review comment:
   Oh, I'm sorry, but it looks like we need one more revision. Useful as it 
would be at times, we can't log any data (keys, values, or headers) because it 
might leak sensitive information into the logs.
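   
   As a rough, hypothetical sketch of the kind of warning this asks for (logging 
only record coordinates and never the key, value, or headers), assuming the new 
Processor API's `Optional<RecordMetadata>` from `context.recordMetadata()`; this 
is illustrative only, not the committed code:
   
   ```java
   import java.util.Optional;
   import org.apache.kafka.streams.processor.api.RecordMetadata;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class SkippedRecordLogging {
       private static final Logger LOG = LoggerFactory.getLogger(SkippedRecordLogging.class);
   
       // Log only topic/partition/offset so no payload data can leak into the logs.
       static void warnSkippedRecord(final Optional<RecordMetadata> metadata) {
           if (metadata.isPresent()) {
               final RecordMetadata m = metadata.get();
               LOG.warn("Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                   m.topic(), m.partition(), m.offset());
           } else {
               LOG.warn("Skipping record due to null key. Topic, partition, and offset not known.");
           }
       }
   }
   ```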

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
##
@@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() {
 .filter(e -> e.getLevel().equals("WARN"))
 .map(Event::getMessage)
 .collect(Collectors.toList()),
-hasItem("Skipping record due to null key. topic=[topic] 
partition=[0] offset=[0]")
+hasItem("Skipping record due to null key. value=[value] 
topic=[topic] partition=[0] offset=[0]")

Review comment:
   I probably don't need to point this out, but this will have to change 
back when you remove the value from the production code.
   
   On 

[GitHub] [kafka] showuon commented on pull request #11105: KAFKA-13123: close KeyValueIterator instances in example code and tests

2021-07-22 Thread GitBox


showuon commented on pull request #11105:
URL: https://github.com/apache/kafka/pull/11105#issuecomment-884931769


   @mjsax @bbejeck, could you help review this PR (and the other 2 similar 
PRs: #11106, #11107)? Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11105: KAFKA-13123: close KeyValueIterator instances in example code and tests

2021-07-22 Thread GitBox


showuon commented on pull request #11105:
URL: https://github.com/apache/kafka/pull/11105#issuecomment-884877170


   Failed tests are unrelated. Thanks.
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout()
   Build / JDK 11 and Scala 2.13 / 
kafka.log.LogCleanerParameterizedIntegrationTest.[5] codec=ZSTD
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11106: KAFKA-13124: close KeyValueIterator instance in internals tests (part 1)

2021-07-22 Thread GitBox


showuon commented on pull request #11106:
URL: https://github.com/apache/kafka/pull/11106#issuecomment-884876686


   Failed tests are unrelated. Thanks.
   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, 
useInlinePem=true
   Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 8 and Scala 2.12 / 
kafka.network.SocketServerTest.testIdleConnection()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsExpirationTest.testBumpTransactionalEpochAfterInvalidProducerIdMapping()
   Build / JDK 16 and Scala 2.13 / 
kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToLatest()
   Build / JDK 16 and Scala 2.13 / 
kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsShiftByLowerThanEarliest()
   Build / JDK 16 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JoeCqupt edited a comment on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware

2021-07-22 Thread GitBox


JoeCqupt edited a comment on pull request #11089:
URL: https://github.com/apache/kafka/pull/11089#issuecomment-884671155


   call for review @ijuma @guozhangwang @hachikuji @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #10277: KAFKA-9914: Fix replication cycle detection

2021-07-22 Thread GitBox


mimaison commented on pull request #10277:
URL: https://github.com/apache/kafka/pull/10277#issuecomment-884830191


   @tvainika It looks like a few `IdentityReplicationIntegrationTest` tests are 
failing. Can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #10973: KAFKA-13033: COORDINATOR_NOT_AVAILABLE should be unmapped

2021-07-22 Thread GitBox


mimaison commented on pull request #10973:
URL: https://github.com/apache/kafka/pull/10973#issuecomment-884824815


   Sorry I was away for a bit. Thanks @showuon and @dajac for following up on 
KIP-699!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-22 Thread GitBox


dajac commented on pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#issuecomment-884809208


   btw, I was looking at the code which expires groups and it seems that it 
does not consider the max batch size either, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-22 Thread GitBox


dajac commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674676502



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-}.map { case (transactionalId, txnMetadata) =>
-  val txnMetadataTransition = txnMetadata.inLock {
-txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+partitionId: Int,
+partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+val currentTimeMs = time.milliseconds()
+
+inReadLock(stateLock) {
+  val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+  replicaManager.getLogConfig(transactionPartition) match {
+case Some(logConfig) =>
+  val maxBatchSize = logConfig.maxMessageSize
+  val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+  lazy val recordsBuilder = MemoryRecords.builder(
+ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
   I do agree that 16k seems quite reasonable for the common case. The 
downside is that we have to wait another hour to clean up the remaining ones 
if there are many transactions to be expired.
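   
   As a rough illustration of the size-capped batching being discussed: a 
hypothetical, stand-alone sketch using plain byte arrays rather than the actual 
transaction-state records or Kafka internals:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   final class ExpirationBatcher {
       // Collect candidates into one batch until adding the next entry would
       // exceed maxBatchSize; anything left over waits for the next expiration run.
       static List<byte[]> collect(final List<byte[]> candidates, final int maxBatchSize) {
           final List<byte[]> batch = new ArrayList<>();
           int batchBytes = 0;
           for (final byte[] entry : candidates) {
               if (batchBytes + entry.length > maxBatchSize) {
                   break;
               }
               batch.add(entry);
               batchBytes += entry.length;
           }
           return batch;
       }
   }
   ```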

##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-}.map { case (transactionalId, txnMetadata) =>
-  val txnMetadataTransition = txnMetadata.inLock {
-txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+partitionId: Int,
+partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+val currentTimeMs = time.milliseconds()
+
+inReadLock(stateLock) {
+  val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+  replicaManager.getLogConfig(transactionPartition) match {
+case Some(logConfig) =>
+  val maxBatchSize = logConfig.maxMessageSize
+  val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+  lazy val recordsBuilder = MemoryRecords.builder(
+ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+TransactionLog.EnforcedCompressionType,
+TimestampType.CREATE_TIME,
+0L,
+maxBatchSize
+  )
+
+  partitionCacheEntry.metadataPerTransactionalId.foreachWhile { 
(transactionalId, txnMetadata) =>
+txnMetadata.inLock {
+  if (!shouldExpire(txnMetadata, currentTimeMs)) {
+true
+  } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, 
currentTimeMs, maxBatchSize)) {
+val transitMetadata = txnMetadata.prepareDead()
+expired += TransactionalIdCoordinatorEpochAndMetadata(
+  transactionalId,
+  partitionCacheEntry.coordinatorEpoch,
+  transitMetadata
+)
+true
+  } else {
+// If the batch is full, return false to end the search. Any 
remaining
+// transactionalIds eligible for expiration can be 

[jira] [Resolved] (KAFKA-13069) Add magic number to DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde

2021-07-22 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-13069.
---
Resolution: Invalid

Flexible fields are sufficient as per the KIP-590 VOTE email thread, so a magic 
number will not be needed.

> Add magic number to DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde
> 
>
> Key: KAFKA-13069
> URL: https://issues.apache.org/jira/browse/KAFKA-13069
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0, 2.8.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Critical
> Fix For: 3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac merged pull request #11079: MINOR: Small refactoring in admin group handlers

2021-07-22 Thread GitBox


dajac merged pull request #11079:
URL: https://github.com/apache/kafka/pull/11079


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13122) Close KeyValueIterator implemented instance to avoid resource leak

2021-07-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13122:
--
Description: 
Found there are "many" KeyValueIterator implemented instances don't explicitly 
get closed, which will cause resource leak.

>From the java doc in KeyValueIterator:

{color:#808080}* Users must call its {{color}{color:#808080}@code 
{color}{color:#808080}close} method explicitly upon completeness to release 
resources{color}

 

This issue mostly happen in tests because we usually query state store to get 
result iterator, and then do verification, but forgot close it. This issue also 
*appear in the example code in our developer guide docs*.

 

I'll use try-with-resource to fix them. To avoid huge PR created, I split this 
bug into 3 sub-tasks.

  was:
Found there are "many" KeyValueIterator implemented instances don't explicitly 
get closed, which will cause resource leak. This issue mostly happen in tests 
because we usually query state store to get result iterator, and then do 
verification, but forgot close it. This issue also *appear in the example code 
in our developer guide docs*.

 

I'll use try-with-resource to fix them. To avoid huge PR created, I split this 
bug into 3 sub-tasks.


> Close KeyValueIterator implemented instance to avoid resource leak
> --
>
> Key: KAFKA-13122
> URL: https://issues.apache.org/jira/browse/KAFKA-13122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Found there are "many" KeyValueIterator instances that don't explicitly get 
> closed, which will cause resource leaks.
> From the Javadoc of KeyValueIterator:
> * Users must call its {@code close} method explicitly upon completeness to 
> release resources
>  
> This issue mostly happens in tests because we usually query a state store to 
> get a result iterator and then do verification, but forget to close it. This 
> issue also *appears in the example code in our developer guide docs*.
>  
> I'll use try-with-resources to fix them. To avoid creating a huge PR, I split 
> this bug into 3 sub-tasks.
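
For illustration, a minimal try-with-resources sketch of the fix pattern described 
above; the store, types, and method are hypothetical placeholders, not code from 
the actual PRs:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IteratorCloseExample {
    // KeyValueIterator extends Closeable, so try-with-resources releases the
    // underlying store resources even if the verification logic throws.
    static long countNonNullValues(final ReadOnlyKeyValueStore<String, Long> store) {
        long count = 0;
        try (KeyValueIterator<String, Long> iterator = store.all()) {
            while (iterator.hasNext()) {
                final KeyValue<String, Long> entry = iterator.next();
                if (entry.value != null) { // a test would assert on entry.key / entry.value here
                    count++;
                }
            }
        }
        return count;
    }
}
```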



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

