[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

2021-07-12 Thread GitBox


tombentley commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668471349



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
   val logDirAbsolutePath = dir.getAbsolutePath
   var hadCleanShutdown: Boolean = false
   try {
-val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {

Review comment:
   Yes, thanks! I've now factored out a common method.
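   For reference, the kind of naming thread factory discussed here can be written as a small helper. The sketch below only illustrates the pattern; the helper name and the `log-recovery` prefix usage are assumptions, not the code from this PR:
   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical helper: wrap the default factory and only override the thread name,
   // producing e.g. "log-recovery-0", "log-recovery-1", ... for a given prefix.
   public final class NamedThreadFactories {
       public static ThreadFactory prefixed(final String prefix) {
           final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
           final AtomicInteger threadNumber = new AtomicInteger(0);
           return runnable -> {
               final Thread thread = defaultFactory.newThread(runnable);
               thread.setName(prefix + "-" + threadNumber.getAndIncrement());
               return thread;
           };
       }
   }
   ```
   The counter only matters when more than one recovery thread per data directory is configured, which is what the next comment in this thread is about.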








[GitHub] [kafka] tang7526 commented on pull request #11034: KAFKA-13075: Consolidate RocksDBStore and RocksDBKeyValueStoreTest

2021-07-12 Thread GitBox


tang7526 commented on pull request #11034:
URL: https://github.com/apache/kafka/pull/11034#issuecomment-878820896


   @ableegoldman Could you help review this PR? Thanks.






[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

2021-07-12 Thread GitBox


tombentley commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668466658



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
   val logDirAbsolutePath = dir.getAbsolutePath
   var hadCleanShutdown: Boolean = false
   try {
-val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {
+  private val factory = Executors.defaultThreadFactory()
+  private val threadNumber = new AtomicInteger(1)

Review comment:
   Only as a means of having unique names when >1 thread per log dir. You 
can't really infer anything from the number though, so I could remove it if you 
want.








[GitHub] [kafka] tang7526 opened a new pull request #11034: KAFKA-13075: Consolidate RocksDBStore and RocksDBKeyValueStoreTest

2021-07-12 Thread GitBox


tang7526 opened a new pull request #11034:
URL: https://github.com/apache/kafka/pull/11034


   https://issues.apache.org/jira/browse/KAFKA-13075
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Assigned] (KAFKA-13075) Consolidate RocksDBStore and RocksDBKeyValueStoreTest

2021-07-12 Thread Chun-Hao Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chun-Hao Tang reassigned KAFKA-13075:
-

Assignee: Chun-Hao Tang

> Consolidate RocksDBStore and RocksDBKeyValueStoreTest
> -
>
> Key: KAFKA-13075
> URL: https://issues.apache.org/jira/browse/KAFKA-13075
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Chun-Hao Tang
>Priority: Major
>  Labels: newbie, newbie++
>
> Looks like we have two different test classes covering pretty much the same 
> thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original 
> test class for RocksDBStore, but someone later added RocksDBStoreTest, most 
> likely because they didn't notice the RocksDBKeyValueStoreTest which didn't 
> follow the usual naming scheme for test classes. 
> We should consolidate these two into a single file, ideally retaining the 
> RocksDBStoreTest name since that conforms to the test naming pattern used 
> throughout Streams (and so this same thing doesn't happen again). It should 
> also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest 
> currently does so we continue to get the benefit of all the tests in there as 
> well





[GitHub] [kafka] guozhangwang commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-12 Thread GitBox


guozhangwang commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-878781675


   Just one more reply on the clearing logic, otherwise LGTM.






[GitHub] [kafka] guozhangwang commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


guozhangwang commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668428677



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
 // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
 numMembersAssignedOverMinQuota++;
+if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
   Yes that makes sense, still this logic
   
   ```
   if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
   potentiallyUnfilledMembersAtMinQuota.clear();
   }
   ```
   
   This seems only needed because we have the check at line 309(?). Say we do not check that, but instead just check that the expected numbers of consumers with minQuota and maxQuota are satisfied -- do we still need this?








[jira] [Created] (KAFKA-13075) Consolidate RocksDBStore and RocksDBKeyValueStoreTest

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13075:
--

 Summary: Consolidate RocksDBStore and RocksDBKeyValueStoreTest
 Key: KAFKA-13075
 URL: https://issues.apache.org/jira/browse/KAFKA-13075
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


Looks like we have two different test classes covering pretty much the same 
thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original 
test class for RocksDBStore, but someone later added RocksDBStoreTest, most 
likely because they didn't notice the RocksDBKeyValueStoreTest which didn't 
follow the usual naming scheme for test classes. 

We should consolidate these two into a single file, ideally retaining the 
RocksDBStoreTest name since that conforms to the test naming pattern used 
throughout Streams (and so this same thing doesn't happen again). It should 
also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest 
currently does so we continue to get the benefit of all the tests in there as 
well





[GitHub] [kafka] guozhangwang commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


guozhangwang commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668427821



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
 // this consumer is potential maxQuota candidate since we're still under the number of expected members
 // with more than the minQuota partitions. Note, if the number of expected members with more than
-// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-unfilledMembers.add(consumer);
+potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   Ack.








[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-07-12 Thread GitBox


satishd commented on pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#issuecomment-878780060


   This change is built on top of https://github.com/apache/kafka/pull/10579. 
The latest commit 
https://github.com/apache/kafka/pull/11033/commits/b7837cdc01148b5e1957bb43ed98c8f3877655c3
 includes the change relevant to this PR. Earlier commits will be merged as 
part of  https://github.com/apache/kafka/pull/10579.






[jira] [Updated] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12925:
---
Priority: Critical  (was: Major)

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Critical
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.
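The fix amounts to having the intermediate wrappers forward the call instead of inheriting the throwing default. A minimal sketch of the idea (the class name is hypothetical and the generics are illustrative; this is not the actual Streams decorator):
{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical wrapper: forward prefixScan to the inner store so callers no longer
// fall back to KeyValueStore's default, which throws UnsupportedOperationException.
class ForwardingKeyValueStoreDecorator<K, V> {
    private final KeyValueStore<K, V> inner;

    ForwardingKeyValueStoreDecorator(final KeyValueStore<K, V> inner) {
        this.inner = inner;
    }

    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix,
                                                                           final PS prefixKeySerializer) {
        return inner.prefixScan(prefix, prefixKeySerializer);
    }
}
{code}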





[GitHub] [kafka] ableegoldman commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r668417652



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
  .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
  }
 
+@Override
+public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
   Can you add a test or two for this in `InMemoryLRUCacheStoreTest`? 
Alternatively, I think you should be able to just collect all the tests in 
`InMemoryKeyValueStoreTest` and `CachingInMemoryKeyValueStoreTest` and move 
them over to
   `AbstractKeyValueStoreTest` instead. That way you get test coverage for both 
of those plus a handful of other store classes at once, without having to copy 
the same test over and over across a bunch of different files.

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##
@@ -26,13 +26,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
+import java.util.List;

Review comment:
   Can you revert the changes in this file?

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##
@@ -383,6 +387,1002 @@ public void 
testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
 assertNull(store.get("key4"));
 }
 
+@Test
+public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+final String storeName = "prefixScanStore";
+final StoreBuilder> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+.withCachingDisabled()
+.withLoggingDisabled();
+topology
+.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+.addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+.addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+driver = new TopologyTestDriver(topology, props);
+
+final TestInputTopic inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+final TestOutputTopic outputTopic1 =
+driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+inputTopic.pipeInput("key1", "value1");
+inputTopic.pipeInput("key2", "value2");
+inputTopic.pipeInput("key3", "value3");
+inputTopic.pipeInput("key1", "value4");
+assertTrue(outputTopic1.isEmpty());
+
+final KeyValueStore store = 
driver.getKeyValueStore("prefixScanStore");
+final KeyValueIterator prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+final List> results = new ArrayList<>();
+while (prefixScan.hasNext()) {
+final KeyValue next = prefixScan.next();
+results.add(next);
+}
+
+assertEquals("key1", results.get(0).key);
+assertEquals("value4", results.get(0).value);
+assertEquals("key2", results.get(1).key);
+assertEquals("value2", results.get(1).value);
+assertEquals("key3", results.get(2).key);
+assertEquals("value3", results.get(2).value);
+
+}
+
+@Test
+public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+final String storeName = "prefixScanStore";
+final StoreBuilder> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+.withCachingEnabled()
+.withLoggingDisabled();
+topology
+.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+.addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+.addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+driver = new TopologyTestDriver(topology, props);
+
+final TestInputTopic inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+final TestOutputTopic outputTopic1 =
+driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+inputTopic.pipeInput("key1", "value1");
+inputTopic.pipeInput("key2", "value2");
+inputTopic.pipeInput("key3", "value3");
+inputTopic.pipeInput("key1", "value4");
+

[GitHub] [kafka] satishd opened a new pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-07-12 Thread GitBox


satishd opened a new pull request #11033:
URL: https://github.com/apache/kafka/pull/11033


- Added asynchronous API support for RemoteLogMetadataManager add/update/put methods.
- Implemented the changes on the default topic-based RemoteLogMetadataManager.
- Refactored the respective tests to cover the introduced asynchronous APIs.
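For illustration, an asynchronous add/update call of this kind typically hands the caller a future to compose or block on. A rough sketch of the shape (not necessarily the exact signatures introduced by this PR):
```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;

// Sketch of an async-style metadata API: the call returns immediately and the
// returned future completes once the add/update has been accepted.
public interface AsyncRemoteLogMetadataApi {
    CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata);

    CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata deleteMetadata);
}
```
A caller that still wants the old blocking behavior can simply call `.join()` (or `.get()`) on the returned future.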
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

2021-07-12 Thread GitBox


ableegoldman commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-878765736


   @vamossagar12 Looks like there's a checkstyle failure that's preventing the tests from running; can you fix that as well? Giving this a pass now.






[GitHub] [kafka] ableegoldman commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-12 Thread GitBox


ableegoldman commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-878760857


   Responded to your comments @guozhangwang. Let me know if that all makes sense or if you have any more concerns that need to be addressed in this PR.






[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668410517



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 Iterator unfilledConsumerIter = unfilledMembers.iterator();
 // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
 for (TopicPartition unassignedPartition : unassignedPartitions) {
-if (!unfilledConsumerIter.hasNext()) {
-if (unfilledMembers.isEmpty()) {
-// Should not enter here since we have calculated the 
exact number to assign to each consumer
-// There might be issues in the assigning algorithm, or 
maybe assigning the same partition to two owners.
+String consumer;
+if (unfilledConsumerIter.hasNext()) {
+consumer = unfilledConsumerIter.next();
+} else {
+if (unfilledMembers.isEmpty() && 
potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+// Should not enter here since we have calculated the 
exact number to assign to each consumer.
+// This indicates issues in the assignment algorithm
 int currentPartitionIndex = 
unassignedPartitions.indexOf(unassignedPartition);
 log.error("No more unfilled consumers to be assigned. The 
remaining unassigned partitions are: {}",
-unassignedPartitions.subList(currentPartitionIndex, 
unassignedPartitions.size()));
+  
unassignedPartitions.subList(currentPartitionIndex, 
unassignedPartitions.size()));
 throw new IllegalStateException("No more unfilled 
consumers to be assigned.");
+} else if (unfilledMembers.isEmpty()) {
+consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+} else {
+unfilledConsumerIter = unfilledMembers.iterator();
+consumer = unfilledConsumerIter.next();
 }
-unfilledConsumerIter = unfilledMembers.iterator();
 }
-String consumer = unfilledConsumerIter.next();
+
 List consumerAssignment = assignment.get(consumer);
 consumerAssignment.add(unassignedPartition);
 
 // We already assigned all possible ownedPartitions, so we know 
this must be newly assigned to this consumer
-if (allRevokedPartitions.contains(unassignedPartition))
+// or else the partition was actually claimed by multiple previous 
owners and had to be invalidated from all
+// members claimed ownedPartitions
+if (allRevokedPartitions.contains(unassignedPartition) || 
partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
 partitionsTransferringOwnership.put(unassignedPartition, 
consumer);
 
 int currentAssignedCount = consumerAssignment.size();
-int expectedAssignedCount = numMembersAssignedOverMinQuota < 
expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
-if (currentAssignedCount == expectedAssignedCount) {
-if (currentAssignedCount == maxQuota) {
-numMembersAssignedOverMinQuota++;
-}
+if (currentAssignedCount == minQuota) {
 unfilledConsumerIter.remove();
+potentiallyUnfilledMembersAtMinQuota.add(consumer);
+} else if (currentAssignedCount == maxQuota) {
+numMembersAssignedOverMinQuota++;
+if (numMembersAssignedOverMinQuota == 
expectedNumMembersAssignedOverMinQuota) {
+// We only start to iterate over the "potentially 
unfilled" members at minQuota after we've filled
+// all members up to at least minQuota, so once the last 
minQuota member reaches maxQuota, we
+// should be done. But in case of some algorithmic error, 
just log a warning and continue to
+// assign any remaining partitions within the assignment 
constraints
+if (unassignedPartitions.indexOf(unassignedPartition) != 
unassignedPartitions.size() - 1) {
+log.warn("Filled the last member up to maxQuota but 
still had partitions remaining to assign, "

Review comment:
   I responded to the above comment as well, but specifically here I think 
that to just check on that condition requires us to make assumptions about the 
algorithm's correctness up to this point (and the correctness of its 
assumptions). But if those are all correct then we would never reach this to 
begin with, so it's better to directly look for any remaining 
`unassignedPartitions` -- it's a sanity check.

[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668409463



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
 // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
 numMembersAssignedOverMinQuota++;
+if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
   While I'm not really a fan of the `potentiallyUnfilledMembersAtMinQuota` 
logic (it's definitely awkward but I felt it was still the lesser evil in terms 
of complicating the code), I don't think we can get rid of it that easily. The 
problem is that when `minQuota != maxQuota`, and so far 
`currentNumMembersWithOverMinQuotaPartitions` < 
`expectedNumMembersWithOverMinQuotaPartitions`, then consumers that are filled 
up to exactly `minQuota` have to be considered potentially not yet at capacity 
since some will need one more partition, though not all. So this data structure 
is not just used to verify that everything is properly assigned after we've 
exhausted the `unassignedPartitions`, it's used to track which consumers can 
still receive another partition (ie, are "unfilled"). Does that make sense?
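   As a standalone reference for the quota terms used in this thread (not the assignor's code), the bounds and the expected count fall out of integer division:
   ```java
   public class QuotaMath {
       public static void main(final String[] args) {
           printQuotas(10, 4);  // minQuota=2 maxQuota=3 expectedOverMin=2
           printQuotas(8, 4);   // minQuota=2 maxQuota=2 expectedOverMin=0
       }

       // With P partitions and C consumers, a balanced assignment gives each consumer either
       // minQuota or maxQuota partitions, and exactly P % C consumers end up above minQuota.
       static void printQuotas(final int totalPartitions, final int numConsumers) {
           final int minQuota = totalPartitions / numConsumers;                       // floor(P / C)
           final int maxQuota = (totalPartitions + numConsumers - 1) / numConsumers;  // ceil(P / C)
           final int expectedOverMinQuota = totalPartitions % numConsumers;
           System.out.printf("minQuota=%d maxQuota=%d expectedOverMin=%d%n",
                   minQuota, maxQuota, expectedOverMinQuota);
       }
   }
   ```
   When the remainder is zero, minQuota == maxQuota and nobody is expected to go over minQuota, which is exactly the case discussed above.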








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668406999



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
 // this consumer is potential maxQuota candidate since we're still under the number of expected members
 // with more than the minQuota partitions. Note, if the number of expected members with more than
-// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-unfilledMembers.add(consumer);
+potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   For the 3 & 4th suggested renamings, it's a bit subtle but this would 
actually be incorrect. In the case `minQuota == maxQuota`, the 
`expectedNumMembersAssignedOverMinQuota` variable will evaluate to 0, which 
would not make sense if it was called `expectedNumMembersWithMaxQuota`. Of 
course we could go through a tweak the logic for this case, but I'd prefer not 
to mix that into this PR. For now I'll just clarify in the comments for these 
variables.
   (I did still rename them slightly to hopefully be more clear, and also in 
line with the new names of the other two data structures we renamed)








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668406999



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
 // this consumer is potential maxQuota candidate since we're still under the number of expected members
 // with more than the minQuota partitions. Note, if the number of expected members with more than
-// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-unfilledMembers.add(consumer);
+potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   For the 3 & 4th suggested renamings, it's a bit subtle but this would 
actually be incorrect. In the case `minQuota == maxQuota`, the 
`expectedNumMembersAssignedOverMinQuota` variable will evaluate to 0, which 
would not make sense if it was called `expectedNumMembersWithMaxQuota`. Of 
course we could go through and tweak the logic for this case, but I'd prefer not 
to mix that into this PR. For now I'll just clarify in the comments for these 
variables








[jira] [Created] (KAFKA-13074) Implement mayClean for MockLog

2021-07-12 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13074:
--

 Summary: Implement mayClean for MockLog
 Key: KAFKA-13074
 URL: https://issues.apache.org/jira/browse/KAFKA-13074
 Project: Kafka
  Issue Type: Bug
Reporter: Jose Armando Garcia Sancio


The current implementation of MockLog doesn't implement maybeClean. It is expected 
that MockLog has the same semantics as KafkaMetadataLog. This is assumed to be 
true for a few of the test suites, like the raft simulation and the Kafka raft 
client test context.





[jira] [Updated] (KAFKA-13074) Implement mayClean for MockLog

2021-07-12 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13074:
---
Labels: kip-500  (was: )

> Implement mayClean for MockLog
> --
>
> Key: KAFKA-13074
> URL: https://issues.apache.org/jira/browse/KAFKA-13074
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>
> The current implementation of MockLog doesn't implement maybeClean. It is expected 
> that MockLog has the same semantics as KafkaMetadataLog. This is assumed to be 
> true for a few of the test suites, like the raft simulation and the Kafka raft 
> client test context.





[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668406080



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
 // this consumer is potential maxQuota candidate since we're still under the number of expected members
 // with more than the minQuota partitions. Note, if the number of expected members with more than
-// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-unfilledMembers.add(consumer);
+potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   Ack on the first two renamings, though I'd still want to prefix them 
with `unfilled` to emphasize that these structures only hold members that may 
potentially be assigned one or more partitions. ie, if `minQuota == maxQuota`, 
then `potentiallyUnfilledMembersAtMinQuota` should actually be empty, in which 
case `MembersWithExactMinQuotaPartitions` doesn't quite make sense. I'll 
clarify this in the comments as well.








[jira] [Updated] (KAFKA-13073) Simulation test fails due to inconsistency in MockLog's implementation

2021-07-12 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13073:
---
Labels: kip-500  (was: )

> Simulation test fails due to inconsistency in MockLog's implementation
> --
>
> Key: KAFKA-13073
> URL: https://issues.apache.org/jira/browse/KAFKA-13073
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication
>Affects Versions: 3.0.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> We are getting the following error on trunk
> {code:java}
> RaftEventSimulationTest > canRecoverAfterAllNodesKilled STANDARD_OUT
> timestamp = 2021-07-12T16:26:55.663, 
> RaftEventSimulationTest:canRecoverAfterAllNodesKilled =
>   java.lang.RuntimeException:
> Uncaught exception during poll of node 1  
> |---jqwik---
> tries = 25| # of calls to property
> checks = 25   | # of not rejected calls
> generation = RANDOMIZED   | parameters are randomly generated
> after-failure = PREVIOUS_SEED | use the previous seed
> when-fixed-seed = ALLOW   | fixing the random seed is allowed
> edge-cases#mode = MIXIN   | edge cases are mixed in
> edge-cases#total = 108| # of all combined edge cases
> edge-cases#tried = 4  | # of edge cases tried in current run
> seed = 8079861963960994566| random seed to reproduce generated values 
>Sample
> --
>   arg0: 4002
>   arg1: 2
>   arg2: 4{code}
> I think there are a couple of issues here:
>  # The {{ListenerContext}} for {{KafkaRaftClient}} uses the value returned by 
> {{ReplicatedLog::startOffset()}} to determine the log start and when to load 
> a snapshot while the {{MockLog}} implementation uses {{logStartOffset}} which 
> could be a different value.
>  # {{MockLog}} doesn't implement {{ReplicatedLog::maybeClean}} so the log 
> start offset is always 0.
>  # The snapshot id validation for {{MockLog}} and {{KafkaMetadataLog}}'s 
> {{createNewSnapshot}} throws an exception when the snapshot id is less than 
> the log start offset.
> Solutions:
> To fix the error quoted above we only need to fix bullet point 3, but I think we 
> should fix all of the issues enumerated in this Jira.
> For 1. we should change the {{MockLog}} implementation so that it uses 
> {{startOffset}} both externally and internally.
> For 2. I will file another issue to track this implementation.
> For 3. I think this validation is too strict. I think it is safe to simply 
> ignore any attempt by the state machine to create a snapshot with an id less 
> than the log start offset. We should return an {{Optional.empty()}} when the 
> snapshot id is less than the log start offset. This tells the user that it 
> doesn't need to generate a snapshot for that offset. 





[GitHub] [kafka] jsancio opened a new pull request #11032: KAFKA-13073: Inconsistent MockLog implementation

2021-07-12 Thread GitBox


jsancio opened a new pull request #11032:
URL: https://github.com/apache/kafka/pull/11032


   Fix a simulation test failure by:
   
   1. Relaxing the validation of the snapshot id against the log start
   offset when the state machine attempts to create a new snapshot. It
   is safe to just ignore the request instead of throwing an exception
   when the snapshot id is less than the log start offset.
   
   2. Fixing the MockLog implementation so that it uses startOffset both
   externally and internally.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668402587



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 for (final TopicPartition tp : memberData.partitions) {
 // filter out any topics that no longer exist or aren't part of the current subscription
 if (allTopics.contains(tp.topic())) {
-ownedPartitions.add(tp);
+
+if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+allPreviousPartitionsToOwner.put(tp, consumer);
+ownedPartitions.add(tp);
+} else {
+String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment:
   Good point, yes I would absolutely want/hope a user would report this as 
a bug. Changed to ERROR








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668402323



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -121,7 +129,12 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 
 // If the current member's generation is higher, all the previously owned partitions are invalid
 if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+allPreviousPartitionsToOwner.clear();

Review comment:
   In that case, it's never added to `consumerToOwnedPartitions` in the 
first place. This map is not pre-filled, it gets populated inside this loop. So 
if it's `< maxGeneration`, then we just insert an empty list into the map for 
that member's owned partitions
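   For reference, the first-writer-wins bookkeeping shown in the diff can be sketched on its own as follows (field and method names here are illustrative, not the assignor's actual ones):
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.common.TopicPartition;

   // Sketch: remember the first consumer that claims each partition; a second claim marks
   // the partition as having multiple previous owners so it can be invalidated for everyone.
   class DuplicateClaimTracker {
       private final Map<TopicPartition, String> firstClaimant = new HashMap<>();
       private final List<TopicPartition> partitionsWithMultipleClaims = new ArrayList<>();

       void recordClaim(final String consumer, final TopicPartition tp) {
           final String previous = firstClaimant.putIfAbsent(tp, consumer);
           if (previous != null && !previous.equals(consumer)) {
               partitionsWithMultipleClaims.add(tp);
           }
       }

       List<TopicPartition> partitionsWithMultipleClaims() {
           return partitionsWithMultipleClaims;
       }
   }
   ```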








[jira] [Created] (KAFKA-13073) Simulation test fails due to inconsistency in MockLog's implementation

2021-07-12 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13073:
--

 Summary: Simulation test fails due to inconsistency in MockLog's 
implementation
 Key: KAFKA-13073
 URL: https://issues.apache.org/jira/browse/KAFKA-13073
 Project: Kafka
  Issue Type: Bug
  Components: controller, replication
Affects Versions: 3.0.0
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


We are getting the following error on trunk
{code:java}
RaftEventSimulationTest > canRecoverAfterAllNodesKilled STANDARD_OUT
timestamp = 2021-07-12T16:26:55.663, 
RaftEventSimulationTest:canRecoverAfterAllNodesKilled =
  java.lang.RuntimeException:
Uncaught exception during poll of node 1
  |---jqwik---
tries = 25| # of calls to property
checks = 25   | # of not rejected calls
generation = RANDOMIZED   | parameters are randomly generated
after-failure = PREVIOUS_SEED | use the previous seed
when-fixed-seed = ALLOW   | fixing the random seed is allowed
edge-cases#mode = MIXIN   | edge cases are mixed in
edge-cases#total = 108| # of all combined edge cases
edge-cases#tried = 4  | # of edge cases tried in current run
seed = 8079861963960994566| random seed to reproduce generated values   
 Sample
--
  arg0: 4002
  arg1: 2
  arg2: 4{code}
I think there are a couple of issues here:
 # The {{ListenerContext}} for {{KafkaRaftClient}} uses the value returned by 
{{ReplicatedLog::startOffset()}} to determine the log start and when to load a 
snapshot while the {{MockLog}} implementation uses {{logStartOffset}} which 
could be a different value.
 # {{MockLog}} doesn't implement {{ReplicatedLog::maybeClean}} so the log start 
offset is always 0.
 # The snapshot id validation for {{MockLog}} and {{KafkaMetadataLog}}'s 
{{createNewSnapshot}} throws an exception when the snapshot id is less than the 
log start offset.

Solutions:

To fix the error quoted above we only need to fix bullet point 3, but I think we 
should fix all of the issues enumerated in this Jira.

For 1. we should change the {{MockLog}} implementation so that it uses 
{{startOffset}} both externally and internally.

For 2. I will file another issue to track this implementation.

For 3. I think this validation is too strict. I think it is safe to simply 
ignore any attempt by the state machine to create a snapshot with an id less 
than the log start offset. We should return an {{Optional.empty()}} when the 
snapshot id is less than the log start offset. This tells the user that it 
doesn't need to generate a snapshot for that offset. 
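In code, point 3 boils down to turning the too-strict validation into an empty result. A rough sketch of the shape (apart from createNewSnapshot and startOffset, which are named in this issue, the types and the newSnapshotWriter helper are assumed for illustration; this is not the MockLog/KafkaMetadataLog code):
{code:java}
import java.util.Optional;

// Sketch: if the requested snapshot end offset is already below the log start offset,
// tell the caller that no snapshot is needed instead of throwing.
public Optional<SnapshotWriter> createNewSnapshot(long snapshotEndOffset, int snapshotEpoch) {
    if (snapshotEndOffset < startOffset()) {
        return Optional.empty();
    }
    // Normal path; newSnapshotWriter is a hypothetical helper used only for illustration.
    return Optional.of(newSnapshotWriter(snapshotEndOffset, snapshotEpoch));
}
{code}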





[GitHub] [kafka] ableegoldman commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group

2021-07-12 Thread GitBox


ableegoldman commented on pull request #10986:
URL: https://github.com/apache/kafka/pull/10986#issuecomment-878749974


   Now ready for review @dajac @hachikuji @guozhangwang 






[GitHub] [kafka] chia7712 commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

2021-07-12 Thread GitBox


chia7712 commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668398858



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
   val logDirAbsolutePath = dir.getAbsolutePath
   var hadCleanShutdown: Boolean = false
   try {
-val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {
+  private val factory = Executors.defaultThreadFactory()
+  private val threadNumber = new AtomicInteger(1)

Review comment:
   Why do we need this counter? It seems to me `log-recovery` is good enough.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
   val logDirAbsolutePath = dir.getAbsolutePath
   var hadCleanShutdown: Boolean = false
   try {
-val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {

Review comment:
   line#495 (`shutdown`) creates a thread pool as well. Does it need better naming?








[GitHub] [kafka] ableegoldman commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group

2021-07-12 Thread GitBox


ableegoldman commented on pull request #10986:
URL: https://github.com/apache/kafka/pull/10986#issuecomment-878748345


   Ok I realize we actually do have a test that reproduces this already: 
`ConsumerCoordinatorTest.testRebalanceWithMetadataChange`. This test sets up a 
case where a change in topic metadata triggers a rebalance after a member had 
joined the group, after which the change is reverted so that the metadata is 
ultimately the same. Then a `NOT_COORDINATOR` response is sent to fail the 
initial JoinGroup, and the test just verifies that the member attempts to 
rejoin until successful. It also verifies things like the number of times each 
rebalance callback is invoked, and the set of partitions that the callbacks 
receive.
   This test actually only failed in the COOPERATIVE case, which confirms that 
the behavior remains correct for the EAGER case. When following the COOPERATIVE 
protocol, the test was formerly assuming that the member would retain all 
partitions despite actually having its generation and memberId cleared when the 
initial JoinGroup is failed. So it was technically asserting the wrong behavior 
beforehand; just fixing this gives us a unit test for this patch after all.






[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-07-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379536#comment-17379536
 ] 

Luke Chen commented on KAFKA-12495:
---

[~kkonstantine], could you take a look at the PR? I just found that this bug is blocking a V3.0 blocker flaky test. Please help! Thank you.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, we have a bad assumption in the algorithm implementation, which is: 
> after the revoking rebalance completes, the member (worker) count will be the same 
> as in the previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completed and before the 2nd rebalance started? Let's see what happens in this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, 
> but we didn't revoke any more C/T in this round, which cause unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive 
> rebalances (under the same leader), we end up with this uneven distribution 
> in this situation. We should allow a consecutive rebalance to have another 
> round of revocation so the C/T can be reassigned to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, 
> **and also revoke some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other 
> members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5

[GitHub] [kafka] showuon commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2021-07-12 Thread GitBox


showuon commented on pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#issuecomment-878731886


   @kkonstantine, I just found that this is a V3.0 blocker bug. Could you help take a look? Thanks.






[jira] [Commented] (KAFKA-12629) Failing Test: RaftClusterTest

2021-07-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379529#comment-17379529
 ] 

Luke Chen commented on KAFKA-12629:
---

After KAFKA-12677 was merged into trunk in build #310, no RaftClusterTest-related tests failed in builds #310 and #311. Marking this ticket as resolved. Thanks.

> Failing Test: RaftClusterTest
> -
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}





[jira] [Created] (KAFKA-13072) refactor RemoveMembersFromConsumerGroupHandler

2021-07-12 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13072:
-

 Summary: refactor RemoveMembersFromConsumerGroupHandler
 Key: KAFKA-13072
 URL: https://issues.apache.org/jira/browse/KAFKA-13072
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen
Assignee: Luke Chen








[jira] [Comment Edited] (KAFKA-9648) kafka server should resize backlog when create serversocket

2021-07-12 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379515#comment-17379515
 ] 

Haruki Okada edited comment on KAFKA-9648 at 7/13/21, 2:05 AM:
---

Hi.

We operate a Kafka cluster in our company which has 130+ brokers, 1.3M+ total 
socket server connections and 25K+ partitions.
 We faced a very similar issue to 
https://issues.apache.org/jira/browse/KAFKA-9211 (i.e. producer slowing down 
mysteriously with small TCP packets) recently, and we figured out the cause.
 We hope our knowledge could help.
h2. Environment:
 * Kafka broker version: 2.4.1 (but we suppose the version doesn't matter)
 * Kafka broker OS: CentOS7 (kernel 3.10.X)

h2. Phenomenon:
 * Restart a broker process, and execute preferred leader election after the 
broker became in-sync
 * Some producers' node-request-latency to the broker got insanely higher than 
usual
 ** However, no such high produce response latency was observed in broker-side 
metrics
 * As a result, producer batches couldn't be sent out at a sufficient pace, 
which caused batch expiration

h2. Analysis:
 * We observed that the TCP SYN cookies metric increased at the incident time, 
with the following dmesg message:
 ** 
{code:java}
TCP: request_sock_TCP: Possible SYN flooding on port 22991. Sending cookies.  
Check SNMP counters.{code}

 * So we also suspected the phenomenon was due to a `wscale` drop (as described in this issue), but we doubted it because:
 ** Even with TCP SYN cookies, `wscale` should be available as long as TCP timestamp is enabled. (refs: [https://blog.cloudflare.com/syn-packet-handling-in-the-wild/])
 ** In our environment, TCP timestamp is enabled.
 ** Also, generally, `wscale` (window scaling factor) is used to extend the window beyond 65535 (the max window size in the TCP spec) in large round-trip environments such as the internet
 *** Our typical produce request size is smaller than that
 *** So it's hard to imagine that a `wscale` drop would cause such significant request-latency degradation
 * After several attempts to reproduce, we found that the receiver's (i.e. the broker's, in this context) `wscale` was inconsistent between producer and broker at the time of the incident.
 ** receiver's `wscale` advertised from broker -> producer along with SYN+ACK 
packet: 7
 *** 
{code:java}
17:32:05.161213 IP BROKER.HOST. > CLIENT.: Flags [S.], seq 29292755, 
ack 17469019, win 28960, options [mss 1460,sackOK,TS val 25589601 ecr 
9055245,nop,wscale 1], length 0{code}

 *** (seq numbers are substituted with random value)
 ** actual receiver's `wscale` after established: 1
 *** 
{code:java}
[user@BROKER ~]$ /sbin/ss -e -i -t | less
...
ESTAB  0  0  BROKER.                CLIENT.                 
timer:(keepalive,21min,0) uid:503 ino:15143860 sk:9ba25dc4f440 <-> 
ts sack cubic wscale:7,7 rto:201 rtt:0.179/0.006 ato:140 mss:1448 rcvmss:1448 
advmss:1448 cwnd:10 bytes_acked:97054 bytes_received:18950537 segs_out:15094 
segs_in:13941 send 647.2Mbps lastsnd:217 lastrcv:17 lastack:217 pacing_rate 
1288.0Mbps rcv_rtt:1.875 rcv_space:29200{code}

 *** `wscale:7,7` means that broker's receiver window scaling factor is 7
 * Okay, then this inconsistency could explain the phenomenon as below:
 ** Premise: when `wscale` is enabled, the TCP window size is calculated as `window_size * 2^wscale`
 ** When the broker calculates the advertised window size, it's bit-shifted to the right by `wscale` (== 7)
 *** [https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_output.c#L290]
 ** On the other hand, the producer scales the advertised window size back up using wscale = 1, which is what was conveyed through the SYN+ACK
 ** As a result, the window size became 64 times smaller than expected (see the sketch after this list)
 ** Then the producer splits TCP packets into much smaller sizes than usual (possibly under the MSS)
 *** TCP acks are delayed because the conditions are not met ([https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_input.c#L4760])
 * The last remaining question is "why did such a wscale inconsistency happen?"
 ** Reading through the kernel source code, we found that there's an issue in the logic that calculates wscale when the TCP connection is established through syncookies
 *** It's fixed in this commit: 
[https://github.com/torvalds/linux/commit/909172a149749242990a6e64cb55d55460d4e417]
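
To make the 64x figure concrete, here is a minimal illustrative sketch (the window-field value is made up, loosely based on the `ss`/tcpdump output above) of how the wscale mismatch shrinks the effective window:
{code:java}
public class WscaleMismatchDemo {
    public static void main(String[] args) {
        // Raw 16-bit window field carried in a TCP segment (illustrative value).
        int windowField = 226;

        // Scaling factor the broker actually applies when it right-shifts its
        // receive window into the 16-bit field (wscale:7,7 from `ss`).
        int brokerWscale = 7;
        // Scaling factor the producer believes, taken from the SYN+ACK ("wscale 1").
        int producerAssumedWscale = 1;

        long intendedWindow = (long) windowField << brokerWscale;           // 226 * 2^7 = 28928 bytes
        long effectiveWindow = (long) windowField << producerAssumedWscale; // 226 * 2^1 = 452 bytes

        System.out.printf("intended window:  %d bytes%n", intendedWindow);
        System.out.printf("effective window: %d bytes%n", effectiveWindow);
        System.out.printf("shrink factor:    %dx%n", intendedWindow / effectiveWindow); // 64x
    }
}
{code}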

h2. Solution:
 * 1. Upgrade the Linux kernel to at least v5.10 (to which the patch is committed)
 * 2. Disable SYN cookies
 ** i.e. set the `net.ipv4.tcp_syncookies` kernel parameter to 0
 ** With SYN cookies disabled, some SYNs are dropped in the same situations, but typically it's not a serious problem thanks to SYN retries. Clients should reconnect soon.
 * 3. Adjust the backlog size (as this ticket suggests)
 ** Even though disabling SYN cookies works thanks to SYN retries, it will cause a certain delay in TCP establishment if reconnects happen.
 ** So generally it's preferable to adjust the backlog size as well, to avoid SYN drops in the first place.

[jira] [Commented] (KAFKA-9648) kafka server should resize backlog when create serversocket

2021-07-12 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379515#comment-17379515
 ] 

Haruki Okada commented on KAFKA-9648:
-

Hi.


 We operate a Kafka cluster in our company which has 130+ brokers, 1.3M+ total 
socket server connections and 25K+ partitions.
 We faced a very similar issue to https://issues.apache.org/jira/browse/KAFKA-9211 (i.e. a producer slowing down mysteriously with small TCP packets) recently, and we figured out the cause.
 We hope our knowledge can help.
h2. Environment:
 * Kafka broker version: 2.4.1 (but we suppose the version doesn't matter)
 * Kafka broker OS: CentOS7 (kernel 3.10.X)

h2. Phenomenon:
 * Restart a broker process, and execute preferred leader election after the 
broker became in-sync
 * Some producers' node-request-latency to the broker got insanely higher than usual
 ** However, no such high produce response latency was observed in broker-side metrics
 * As a result, producer batches couldn't be sent out at a sufficient pace, which then caused batch expiration

h2. Analysis:
 * We observed that the TCP SYN cookies metric increased at the time of the incident, with the following dmesg message:
 ** 
{code:java}
TCP: request_sock_TCP: Possible SYN flooding on port 22991. Sending cookies.  
Check SNMP counters.{code}

 * So we also suspected the phenomenon was due to a `wscale` drop (as described in this issue), but we doubted it because:
 ** Even with TCP SYN cookies, `wscale` should be available as long as TCP timestamp is enabled. (refs: [https://blog.cloudflare.com/syn-packet-handling-in-the-wild/])
 ** In our environment, TCP timestamp is enabled.
 ** Also, generally, `wscale` (window scaling factor) is used to extend the window beyond 65535 (the max window size in the TCP spec) in large round-trip environments such as the internet
 *** Our typical produce request size is smaller than that
 *** So it's hard to imagine that a `wscale` drop would cause such significant request-latency degradation
 * After several attempts to reproduce, we found that the receiver's (i.e. the broker's, in this context) `wscale` was inconsistent between producer and broker at the time of the incident.
 ** receiver's `wscale` advertised from broker -> producer along with SYN+ACK 
packet: 7
 *** 
{code:java}
17:32:05.161213 IP BROKER.HOST. > CLIENT.: Flags [S.], seq 29292755, 
ack 17469019, win 28960, options [mss 1460,sackOK,TS val 25589601 ecr 
9055245,nop,wscale 1], length 0{code}

 *** (seq numbers are substituted with random value)
 ** actual receiver's `wscale` after established: 1
 *** 
{code:java}
[user@BROKER ~]$ /sbin/ss -e -i -t | less
...
ESTAB  0  0  BROKER.                CLIENT.                 
timer:(keepalive,21min,0) uid:503 ino:15143860 sk:9ba25dc4f440 <-> 
ts sack cubic wscale:7,7 rto:201 rtt:0.179/0.006 ato:140 mss:1448 rcvmss:1448 
advmss:1448 cwnd:10 bytes_acked:97054 bytes_received:18950537 segs_out:15094 
segs_in:13941 send 647.2Mbps lastsnd:217 lastrcv:17 lastack:217 pacing_rate 
1288.0Mbps rcv_rtt:1.875 rcv_space:29200{code}

 *** `wscale:7,7` means that broker's receiver window scaling factor is 7
 * Okay, then this inconsistency could explain the phenomenon as below:
 ** Premise: when `wscale` is enabled, the TCP window size is calculated as `window_size * 2^wscale`
 ** When the broker calculates the advertised window size, it's bit-shifted to the right by `wscale` (== 7)
 *** [https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_output.c#L290]
 ** On the other hand, the producer scales the advertised window size back up using wscale = 1, which is what was conveyed through the SYN+ACK
 ** As a result, the window size became 64 times smaller than expected
 ** Then the producer splits TCP packets into much smaller sizes than usual (possibly under the MSS)
 *** TCP acks are delayed because the conditions are not met ([https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_input.c#L4760])
 * The last remaining question is "why did such a wscale inconsistency happen?"
 ** Reading through the kernel source code, we found that there's an issue in the logic that calculates wscale when the TCP connection is established through syncookies
 *** It's fixed in this commit: 
[https://github.com/torvalds/linux/commit/909172a149749242990a6e64cb55d55460d4e417]

h2. Solution:
 * 1. Upgrade the Linux kernel to at least v5.10 (to which the patch is committed)
 * 2. Disable SYN cookies
 ** i.e. set the `net.ipv4.tcp_syncookies` kernel parameter to 0
 ** With SYN cookies disabled, some SYNs are dropped in the same situations, but typically it's not a serious problem thanks to SYN retries. Clients should reconnect soon.
 * 3. Adjust the backlog size (as this ticket suggests)
 ** Even though disabling SYN cookies works thanks to SYN retries, it will cause a certain delay in TCP establishment if reconnects happen.
 ** So generally it's preferable to adjust the backlog size as well, to avoid SYN drops in the first place.

 

I think we should

[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379505#comment-17379505
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13008:


Thanks Guozhang, +1 on that approach (though I'll let Colin confirm whether #1 
does make sense or not). We'll definitely need a Streams/client side fix if the 
'real' fix is going to be on the broker side. My only question is whether this 
is something that might trip up other plain consumer client users in addition 
to Streams, and if so, whether there's anything we could do in the consumer 
client itself. But AFAIK it's only Streams that really relies on this metadata 
in this critical way, so I'm happy with the Streams-side fix as well

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag. 
> It's a good improvement for timestamp sync. But I found it will cause the 
> stream stop processing the data for a long time while waiting for the 
> partition metadata.
>  
> I've been investigating this case for a while, and figuring out the issue 
> will happen in below situation (or similar situation):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch session 
> become inactive for (default) 2 mins to be evicted. Either case, the metadata 
> won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partition data status to do timestamp sync, so we'll keep waiting and not 
> processing any data. That's why this issue will happen.
>  
> *Proposed solution:*
>  # If we don't get the current lag for a partition, or the current lag > 0, 
> we start to wait for max.task.idle.ms, and reset the deadline when we get the 
> partition lag, like what we did in previous KIP-353
>  # Introduce a waiting time config when no partition lag, or partition lag 
> keeps > 0 (need KIP)
> [~vvcephei] [~guozhang] , any suggestions?
>  
> cc [~ableegoldman]  [~mjsax] , this is the root cause that in 
> [https://github.com/apache/kafka/pull/10736,] we discussed and thought 
> there's a data lose situation. FYI.



--
This message was sent by Atlassian Jira
(v8.3.4#803

[GitHub] [kafka] ableegoldman commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group

2021-07-12 Thread GitBox


ableegoldman commented on pull request #10986:
URL: https://github.com/apache/kafka/pull/10986#issuecomment-878706682


   @hachikuji in the EAGER case, after the first `onJoinPrepare` / 
`onPartitionsRevoked`, the subscription would have been cleared. So any 
subsequent invocations of `onPartitionsRevoked` would be with an empty set of 
partitions
   
   @everyone, I was having trouble getting a unit test that would actually verify this behavior, but I wanted to kick off discussion on the fix ASAP (for obvious reasons), so I opened the PR without one. I do intend to add a test; I just haven't had time to pursue that yet. Suggestions welcome :P


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config

2021-07-12 Thread GitBox


showuon commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-878703335


   @rondagostino , thanks for the comments. Actually, I've already added 2 test 
cases for that: 
   `shouldNotAllowDescribeBrokerWhileBrokerUpUsingZookeeper` -> to test 
`describe` is not allowed while brokers are up
   `shouldSupportDescribeBrokerBeforeBrokerUpUsingZookeeper` -> to test 
`describe` should be supported before brokers are up
   
   Please let me know if there's any other suggestion. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-07-12 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13071:
---

 Summary: Deprecate and remove --authorizer option in kafka-acls.sh
 Key: KAFKA-13071
 URL: https://issues.apache.org/jira/browse/KAFKA-13071
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Now that we have all of the ACL APIs implemented through the admin client, we 
should consider deprecating and removing support for the --authorizer flag in 
kafka-acls.sh.
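
For reference, a rough sketch of the admin-client path the tool would rely on instead of --authorizer (the broker address, topic name, and principal below are made-up placeholders, not a proposed implementation):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AddAclViaAdminClient {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Grant User:alice READ on topic "payments", the same effect as the
            // corresponding kafka-acls.sh --add invocation against the brokers.
            final AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "payments", PatternType.LITERAL),
                new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));

            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}
{code}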



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334


   > Otherwise, Streams app that run with 2.8 and before might not be 
compatible with Streams 3.0 because the retention time of the changelog topics 
created with older Streams apps will be smaller than the assumed retention time 
for Streams apps in 3.0.
   
   Why would this be fatal? It seems in 3.0 the retention time would potentially become _larger_ (if a new changelog topic is created). And for any existing application, we won't reconfigure the topic config. While I agree that we should keep the behavior in sync, I don't see how there could be data loss? -- Note that if the old behavior has a shorter retention time, it would be a "bug" if the state store keeps stuff longer now (even if retention-time is a _lower_ bound and keeping stuff longer is still correct; retention is not a strict bound), but it won't be a regression with regard to data loss, as we never guaranteed to keep the data longer than in the old behavior case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476


   Correct, sliding-windows were added later and never had a default grace. 
Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows`, we never added `until() / maintainMs()` to them -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API.
   
   However, looking into the code I am a little bit confused about 
`maintainMs()` in `TimeWindows`:
   ```
   public long maintainMs() {
   return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
   }
   ```
   
   In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and 
`SessionWindows` used `Math.max(maintainDurationMs, gapMs);`.
   
   Btw: yes, I believe we need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace (ie, we ignore the `maintainMs()` / `retention()` configuration).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476


   Correct, sliding-windows were added later and never had a default grace. 
Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows`, we never added `until() / maintainMs()` to them -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API.
   
   However, looking into the code I am a little bit confused about 
`maintainMs()` in `TimeWindows`:
   ```
   public long maintainMs() {
   return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
   }
   ```
   
   In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and 
`SessionWindows` used `Math.max(maintainDurationMs, gapMs);`.
   
   Btw: yes, I believe we need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time using the new API: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace (ie, we ignore the `maintainMs()` / `retention()` configuration anyway).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476


   Correct, sliding-windows were added later and never had a default grace. 
Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows`, we never added `until() / maintainMs()` to them -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API.
   
   However, looking into the code I am a little bit confused about 
`maintainMs()` in `TimeWindows`:
   ```
   public long maintainMs() {
   return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
   }
   ```
   
   In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and 
`SessionWindows` used `Math.max(maintainDurationMs, gapMs);`.
   
   Btw: yes, I believe we (technically) need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time using the new API: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace. (Ie, we ignore the `maintainMs()` / `retention()` configuration anyway).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334


   > Otherwise, Streams app that run with 2.8 and before might not be 
compatible with Streams 3.0 because the retention time of the changelog topics 
created with older Streams apps will be smaller than the assumed retention time 
for Streams apps in 3.0.
   
   Why would this be fatal? It seems in 3.0 the retention time would 
potentially become _larger_ (if a new changelog topic is created). And for any 
existing application, we won't reconfigure the topic config. While I agree that 
we should keep the behavior in-sync, I don't see how there could be data loss? 
-- Note that if the old behavior has a shorter retention time, it would be a 
"bug" if the state store keeps stuff longer now, but it won't be a regression 
with regard to data loss, as we never guaranteed to keep the data longer than 
in the old behavior case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334


   > Otherwise, Streams app that run with 2.8 and before might not be 
compatible with Streams 3.0 because the retention time of the changelog topics 
created with older Streams apps will be smaller than the assumed retention time 
for Streams apps in 3.0.
   
   Why would this be fatal? It seems in 3.0 the retention time would 
potentially become _larger_ (if a new changelog topic is created). And for any 
existing application, we won't reconfigure the topic config. While I agree that 
we should keep the behavior in-sync, I don't see how there could be data loss? 
-- Note that if the old behavior has a shorter retention time, it would be a 
"bug" if the state store keeps stuff longer now, but it won't be a regression 
with regard to data loss, as we never guaranteed to keep the data longer than 
in the old behavior case. (Ie, we ignore `maintainMs()` / `retention()` 
configuration anyway).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476


   Correct, sliding-windows were added later and never had a default grace. 
Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows`, we never added `until() / maintainMs()` to them -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API.
   
   However, looking into the code I am a little bit confused about 
`maintainMs()` in `TimeWindows`:
   ```
   public long maintainMs() {
   return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
   }
   ```
   
   In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and 
`SessionWindows` used `Math.max(maintainDurationMs, gapMs);`.
   
   Btw: yes, I believe we need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time using the new API: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12993) Formatting of Streams 'Memory Management' docs is messed up

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12993:
---
Fix Version/s: (was: 2.8.1)

> Formatting of Streams 'Memory Management' docs is messed up 
> 
>
> Key: KAFKA-12993
> URL: https://issues.apache.org/jira/browse/KAFKA-12993
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The formatting of this page is all messed up, starting in the RocksDB 
> section. It looks like there's a missing closing tag after the example 
> BoundedMemoryRocksDBConfig class



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12993) Formatting of Streams 'Memory Management' docs is messed up

2021-07-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379486#comment-17379486
 ] 

ASF GitHub Bot commented on KAFKA-12993:


ableegoldman merged pull request #361:
URL: https://github.com/apache/kafka-site/pull/361


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Formatting of Streams 'Memory Management' docs is messed up 
> 
>
> Key: KAFKA-12993
> URL: https://issues.apache.org/jira/browse/KAFKA-12993
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> The formatting of this page is all messed up, starting in the RocksDB 
> section. It looks like there's a missing closing tag after the example 
> BoundedMemoryRocksDBConfig class



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7493) Rewrite test_broker_type_bounce_at_start

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-7493:
-

Assignee: (was: A. Sophie Blee-Goldman)

> Rewrite test_broker_type_bounce_at_start
> 
>
> Key: KAFKA-7493
> URL: https://issues.apache.org/jira/browse/KAFKA-7493
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, the test test_broker_type_bounce_at_start in 
> streams_broker_bounce_test.py is ignored.
> As written, there are a couple of race conditions that lead to flakiness.
> It should be possible to re-write the test to wait on log messages, as the 
> other tests do, instead of just sleeping to more deterministically transition 
> the test from one state to the next.
> Once the test is fixed, the fix should be back-ported to all prior branches, 
> back to 0.10.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7493) Rewrite test_broker_type_bounce_at_start

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-7493:
--
Fix Version/s: (was: 3.0.0)
   3.1.0

> Rewrite test_broker_type_bounce_at_start
> 
>
> Key: KAFKA-7493
> URL: https://issues.apache.org/jira/browse/KAFKA-7493
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.1.0
>
>
> Currently, the test test_broker_type_bounce_at_start in 
> streams_broker_bounce_test.py is ignored.
> As written, there are a couple of race conditions that lead to flakiness.
> It should be possible to re-write the test to wait on log messages, as the 
> other tests do, instead of just sleeping to more deterministically transition 
> the test from one state to the next.
> Once the test is fixed, the fix should be back-ported to all prior branches, 
> back to 0.10.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax commented on a change in pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#discussion_r668344535



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##
@@ -140,7 +140,7 @@ public static SessionWindows with(final Duration 
inactivityGap) {
 final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
 final long inactivityGapMs = 
validateMillisecondDuration(inactivityGap, msgPrefix);
 
-return new SessionWindows(inactivityGapMs, 
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
+return new SessionWindows(inactivityGapMs, 
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0));

Review comment:
   Is this correct? The old `maintainMs` did use `Math.max(grace, 
inactivityGap)` ?
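
   For reference, a quick sketch of the arithmetic behind the proposed default (illustration only; the 30-minute gap is a made-up value and the constant name is taken from the diff above, so this is not a claim about which behavior is the right one):
   ```java
   public class GraceDefaultArithmetic {
       public static void main(String[] args) {
           final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L; // 86,400,000 ms
           final long inactivityGapMs = 30 * 60 * 1000L;                        // hypothetical 30-minute gap

           final long oldDefaultGrace = DEPRECATED_OLD_24_HR_GRACE_PERIOD;      // old behavior: flat 24h grace
           final long newDefaultGrace = Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0);

           System.out.println(inactivityGapMs + oldDefaultGrace); // 88,200,000 ms (24.5h)
           System.out.println(inactivityGapMs + newDefaultGrace); // 86,400,000 ms (24h): gap + grace stays at 24h
       }
   }
   ```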




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax edited a comment on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334


   > Otherwise, Streams app that run with 2.8 and before might not be 
compatible with Streams 3.0 because the retention time of the changelog topics 
created with older Streams apps will be smaller than the assumed retention time 
for Streams apps in 3.0.
   
   Why would this be fatal? It seems in 3.0 the retention time would 
potentially become _larger_ (if a new changelog topic is created). And for any 
existing application, we won't reconfigure the topic config. While I agree that 
we should keep the behavior in-sync, I don't see how there could be data loss? 
-- Note that if the old behavior has a shorter retention time, it would be a 
"bug" if the state store keeps stuff longer now, but it won't be a regression 
with regard to data loss, as we never guaranteed to keep the data longer than 
in the old behavior case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax commented on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334


   > Otherwise, Streams app that run with 2.8 and before might not be 
compatible with Streams 3.0 because the retention time of the changelog topics 
created with older Streams apps will be smaller than the assumed retention time 
for Streams apps in 3.0.
   
   Why would this be fatal? It seems in 3.0 the retention time would 
potentially become _larger_ (if a new changelog topic is created). And for any 
existing application, we won't reconfigure the topic config. While I agree that 
we should keep the behavior in-sync, I don't see how there could be data loss?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-12 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379482#comment-17379482
 ] 

Guozhang Wang commented on KAFKA-13008:
---

Thanks for the great find [~showuon]!

I took a look at the server-side code, and I think we can consider two things:

1) Slightly augment the session handling logic so that within a session, if a partition was newly requested (here, we would not try to distinguish whether it was requested for the very first time or re-added after a while), then even if the requested position has reached the log end, i.e. there's no data to return, we still include the partition in the response to encode the log-end information. WDYT [~cmccabe]?

2) Since 1) would be a broker change, and even if we do that it may not help all cases for Streams, we would still need some remedies. One (somewhat hacky..) idea is to actually pay the round trip in such cases (only when the config is set to >= 0); but since a fetch request would not do for old-versioned brokers, we would use the offset request (either the consumer or the admin client has an API for that) to get the log end offset. In fact, in Streams, when we get the assigned partitions we always need to get the log end offsets of the changelog topics first to check whether any restoration is needed; we can just add the source topic partitions in that phase as well and expose them as the initial values for the main consumer. Note this is only done once after every rebalance, and no more.
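
As a rough sketch of idea 2) (only an illustration of the approach, assuming we would reuse the admin client's existing listOffsets API; the class and method names below are made up):
{code:java}
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetBootstrap {

    // Fetch the current log-end offsets for the assigned source partitions once
    // after a rebalance, so the task-idling logic has an initial lag estimate
    // even before the first fetch response arrives for a slow partition.
    public static Map<TopicPartition, Long> fetchEndOffsets(final Admin admin,
                                                            final Set<TopicPartition> assigned)
            throws Exception {
        final Map<TopicPartition, OffsetSpec> request = assigned.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

        return admin.listOffsets(request).all().get().entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}
{code}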

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag. 
> It's a good improvement for timestamp sync. But I found it will cause the 
> stream stop processing the data for a long time while waiting for the 
> partition metadata.
>  
> I've been investigating this case for a while, and figuring out the issue 
> will happen in below situation (or similar situation):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch session 
> become inactive for (default) 2 mins to be evicted. Either case, the metadata 
> won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partitio

[GitHub] [kafka] mjsax commented on a change in pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax commented on a change in pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#discussion_r668344535



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##
@@ -140,7 +140,7 @@ public static SessionWindows with(final Duration 
inactivityGap) {
 final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
 final long inactivityGapMs = 
validateMillisecondDuration(inactivityGap, msgPrefix);
 
-return new SessionWindows(inactivityGapMs, 
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
+return new SessionWindows(inactivityGapMs, 
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0));

Review comment:
   Is this correct? The old `maintainMs` did use `Math.max(grace, 
inactivityGap)` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-12 Thread GitBox


mjsax commented on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476


   Correct, sliding-windows were added later and never had a default grace. 
Also, we deprecated `until() / maintainMs()` before, and because sliding-windows do not inherit from `Windows`, we never added `until() / maintainMs()` to them -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API.
   
   However, looking into the code of the different classes, it seems only `TimeWindows` needs to be updated, because it implements:
   ```
   public long maintainMs() {
   return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
   }
   ```
   
   In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and `SessionWindows` used `Math.max(maintainDurationMs, gapMs);` -- thus, their retention-time should not be affected by any change to grace-period?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

2021-07-12 Thread GitBox


mjsax commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r668337811



##
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##
@@ -128,45 +128,61 @@ def run_failure_and_recovery(self, processor1, 
processor2, processor3, verifier)
 verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % 
verifier.STDOUT_FILE, allow_fail=False)
 
 def add_streams(self, processor):
-with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
monitor:
-processor.start()
-self.wait_for_startup(monitor, processor)
+with processor.node.account.monitor_log(processor.LOG_FILE) as 
log_monitor:
+with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
stdout_monitor:
+processor.start()
+self.wait_for_running(stdout_monitor, processor)
+self.wait_for_commit(log_monitor, processor)
 
 def add_streams2(self, running_processor, processor_to_be_started):
-with 
running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as 
monitor:
-self.add_streams(processor_to_be_started)
-self.wait_for_startup(monitor, running_processor)
+with 
running_processor.node.account.monitor_log(running_processor.LOG_FILE) as 
log_monitor:
+with 
running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as 
stdout_monitor:
+self.add_streams(processor_to_be_started)
+self.wait_for_running(stdout_monitor, running_processor)
+self.wait_for_commit(log_monitor, running_processor)
 
 def add_streams3(self, running_processor1, running_processor2, 
processor_to_be_started):
-with 
running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as 
monitor:
-self.add_streams2(running_processor2, processor_to_be_started)
-self.wait_for_startup(monitor, running_processor1)
+with 
running_processor1.node.account.monitor_log(running_processor1.LOG_FILE) as 
log_monitor:
+with 
running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as 
stdout_monitor:
+self.add_streams2(running_processor2, processor_to_be_started)
+self.wait_for_running(stdout_monitor, running_processor1)
+self.wait_for_commit(log_monitor, running_processor1)
 
 def stop_streams(self, processor_to_be_stopped):
 with 
processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE)
 as monitor2:
 processor_to_be_stopped.stop()
 self.wait_for(monitor2, processor_to_be_stopped, "StateChange: 
PENDING_SHUTDOWN -> NOT_RUNNING")
 
 def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-with 
keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) 
as monitor:
-self.stop_streams(processor_to_be_stopped)
-self.wait_for_startup(monitor, keep_alive_processor)
+with 
keep_alive_processor.node.account.monitor_log(keep_alive_processor.LOG_FILE) as 
log_monitor:
+with 
keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) 
as stdout_monitor:
+self.stop_streams(processor_to_be_stopped)
+self.wait_for_running(stdout_monitor, keep_alive_processor)
+self.wait_for_commit(log_monitor, keep_alive_processor)
 
 def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, 
processor_to_be_stopped):
-with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as monitor:
-self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-self.wait_for_startup(monitor, keep_alive_processor1)
+with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) 
as log_monitor:
+with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as stdout_monitor:
+self.stop_streams2(keep_alive_processor2, 
processor_to_be_stopped)
+self.wait_for_running(stdout_monitor, keep_alive_processor1)
+self.wait_for_commit(log_monitor, keep_alive_processor1)
 
 def abort_streams(self, keep_alive_processor1, keep_alive_processor2, 
processor_to_be_aborted):
-with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as monitor1:
-with 
keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE)
 as monitor2:
-processor_to_be_aborted.stop_nodes(False)
-self.wait_for_startup(monitor2, keep_alive_processor2)
-self.wait_for_startup(monitor1, keep_alive_processor1)
-
-def wait_for_startup(self, monitor, processor):
+with 
keep_alive_processor1.n

[GitHub] [kafka] cmccabe commented on pull request #11031: KAFKA-13067 Add internal config to lower the metadata log segment size

2021-07-12 Thread GitBox


cmccabe commented on pull request #11031:
URL: https://github.com/apache/kafka/pull/11031#issuecomment-878677225


   Looks like a test needs to be fixed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

2021-07-12 Thread GitBox


mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668331269



##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -86,12 +86,48 @@ Overviewclose() method. Note that Kafka Streams may re-use a 
single
   Processor object by calling
   init() on it again after close().
-When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-  (1) If #forward() is called within #process() the output record inherits 
the input record timestamp.
-  (2) If #forward() is called within punctuate() the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-  Note, that #forward() also allows to change the default behavior 
by passing a custom timestamp for the output record.
-Specifically, ProcessorContext#schedule() accepts a user Punctuator callback 
interface, which triggers its punctuate()
-API method periodically based on the PunctuationType. The PunctuationType 
determines what notion of time is used
+  
+The Processor interface takes two sets of generic 
parameters:
+KIn, VIn, KOut, 
VOut. These define the input and output types
+that the processor implementation can handle. KIn and
+VIn 
define the key and value types that will be passed
+to process().
+Likewise, KOut and VOut
+define the forwarded key and value types that ProcessorContext#forward()
+will accept. If your processor does not forward any records at all 
(or if it only forwards
+null keys or values),
+a best practice is to set the output generic type argument to
+Void.
+If it needs to forward multiple types that don't share a common 
superclass, you will
+have to set the output generic type argument to Object.
+  
+  
+Both the Processor#process()
+and the ProcessorContext#forward()
+methods handle records in the form of the Record
+data class. This class gives you access to the key components of a 
Kafka record:
+the key, value, timestamp and headers. When forwarding records, 
you can use the
+constructor to create a new Record
+from scratch, or you can use the convenience builder methods to 
replace one of the
+Record's properties

Review comment:
   `properties` -> `fields` (or `elements`) ?
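
   For context, a minimal sketch of the pattern this section documents (the class name, generic types, and null handling below are illustrative, not taken from the docs):
   ```java
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;

   // Reads <String, String> records and forwards the value length, reusing the
   // input record's timestamp and headers via Record#withValue().
   public class ValueLengthProcessor implements Processor<String, String, String, Integer> {

       private ProcessorContext<String, Integer> context;

       @Override
       public void init(final ProcessorContext<String, Integer> context) {
           this.context = context;
       }

       @Override
       public void process(final Record<String, String> record) {
           if (record.value() != null) {
               context.forward(record.withValue(record.value().length()));
           }
       }
   }
   ```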

##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -86,12 +86,48 @@ Overviewclose() method. Note that Kafka Streams may re-use a 
single
   Processor object by calling
   init() on it again after close().
-When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-  (1) If #forward() is called within #process() the output record inherits 
the input record timestamp.
-  (2) If #forward() is called within punctuate() the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-  Note, that #forward() also allows to change the default behavior 
by passing a custom timestamp for the output record.
-Specifically, ProcessorContext#schedule() accepts a user Punctuator callback 
interface, which triggers its punctuate()
-API method periodically based on the PunctuationType. The PunctuationType 
determines what notion of time is used
+  
+The Processor interface takes two sets of generic 
parameters:
+KIn, VIn, KOut, 
VOut. These define the input and output types
+that the processor implementation can handle. KIn and
+VIn 
define the key and value types that will be passed
+to process().
+Likewise, KOut and VOut
+define the forwarded key and value types that ProcessorContext#forward()
+will accept. If your processor does not forward any records at all 
(or if it only forwards
+null keys or values),
+a best practice is to set the output generic type argument to
+Void.
+If it needs to forward multiple types that don't share a common 
superclass, you will
+have to set the output generic type argument to Object.
+  
+  
+Both the Processor#process()
+and the ProcessorContext#forward()
+methods handle records in the form of the Record
+data class. This class gives you access to the key components of a 
Kafka record:

Review comment:
   `key` -> `main` (to avoid confusion 

[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379462#comment-17379462
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13008:


{quote}Re-reading KIP-227, it seems like there should be a way for the client 
to add re-acquired partitions like this to the incremental fetch request so 
that it can reinitialize its metadata cache. In other words, it seems like 
getting a partition assigned that you haven't owned for a while is effectively 
the same case as getting a partition that you've never owned, and there does 
seem to be a mechanism for the latter.
{quote}
Thanks John, that is exactly what I was trying to suggest above, but I may have 
mungled it with my lack of understanding of the incremental fetch design. Given 
how long this bug went unnoticed and the in-depth investigation it took to 
uncover the bug (again, nicely done [~showuon]), it seems like any user of the 
plain consumer client in addition to Streams could be easily tripped up by 
this. And just personally, I had to read the analysis twice to really 
understand what was going on, since the behavior was/is so unintuitive to me.

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag. 
> It's a good improvement for timestamp sync. But I found it will cause the 
> stream stop processing the data for a long time while waiting for the 
> partition metadata.
>  
> I've been investigating this case for a while, and figuring out the issue 
> will happen in below situation (or similar situation):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> requests/responses. That is, if the session exists, the client (consumer) will 
> only get an update when the fetched partition has an update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), so it won't get an update until the next record arrives in up to 30 mins, 
> or until the fetch session has been inactive for (default) 2 mins and is evicted. 
> Either way, the metadata won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partition data status to do timestamp sync, so we'll keep waiting and not 
> processing any data. That's why this issue will happen.
>  
> *Proposed solution:*
>  # If we don't get the current lag for a partition, or the current lag > 0, 
> we start to wait for max.task.idle.ms, and reset the deadline when we get the 
> pa

[GitHub] [kafka] ableegoldman commented on a change in pull request #11009: MINOR: update doc for default assignor change

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #11009:
URL: https://github.com/apache/kafka/pull/11009#discussion_r668314609



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##
@@ -114,15 +114,17 @@
 "ordered by preference, of supported partition assignment strategies 
that the client will use to distribute " +
 "partition ownership amongst consumer instances when group management 
is used. Available options are:" +
 "" +
-"org.apache.kafka.clients.consumer.RangeAssignor: The 
default assignor, which works on a per-topic basis." +
+"org.apache.kafka.clients.consumer.RangeAssignor: 
Assigns partitions on a per-topic basis." +
 
"org.apache.kafka.clients.consumer.RoundRobinAssignor: Assigns 
partitions to consumers in a round-robin fashion." +
 "org.apache.kafka.clients.consumer.StickyAssignor: 
Guarantees an assignment that is " +
 "maximally balanced while preserving as many existing partition 
assignments as possible." +
 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor: 
Follows the same StickyAssignor " +
 "logic, but allows for cooperative rebalancing." +
 "" +
+"The default assignor is [RangeAssignor, 
CooperativeStickyAssignor], which will use RangeAssignor to do assignment, " +
+"but just needs 1 rolling bounce to upgrade to 
CooperativeStickyAssignor" +

Review comment:
   ```suggestion
   "The default assignor is [RangeAssignor, 
CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
   "but allows upgrading to the CooperativeStickyAssignor with just a 
single rolling bounce that removes the RangeAssignor from the list" +
   ```
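
   For context, a sketch (not part of this PR) of what the two stages of that 
upgrade look like in consumer configuration; the bootstrap servers and group id 
below are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignorUpgradeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

        // Stage 1: list both assignors. RangeAssignor is used while any member
        // in the group still only supports it.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RangeAssignor.class.getName() + "," + CooperativeStickyAssignor.class.getName());

        // Stage 2 (after one rolling bounce): remove RangeAssignor from the list so
        // the group switches to cooperative rebalancing.
        // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        //         CooperativeStickyAssignor.class.getName());
    }
}
```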




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-12 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##
@@ -246,6 +265,19 @@ public boolean isTransactional() {
 return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
 }
 
+@Override
+public boolean hasDeleteHorizonMs() {
+return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+}
+
+@Override
+public long deleteHorizonMs() {

Review comment:
   I've removed the `hasDeleteHorizonMs` function from the interface. I've 
made it so the DefaultRecordBatch still has the function as a private helper, 
but the logic overall is pretty much unchanged
   
   I've also made the change to return OptionalLong for `deleteHorizonMs`, 
trying to keep the overall logic the same




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-12 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##
@@ -246,6 +265,19 @@ public boolean isTransactional() {
 return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
 }
 
+@Override
+public boolean hasDeleteHorizonMs() {
+return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+}
+
+@Override
+public long deleteHorizonMs() {

Review comment:
   I've removed the `hasDeleteHorizonMs` function from the interface. I've 
made it so the DefaultRecordBatch still has the function as a private helper, 
but the logic overall is pretty much unchanged
   
   I've also made the change to return OptionalLong for `deleteHorizonMs` to 
see how it looks. I think that the current usage in the cases for which this 
value returns `OptionalLong.Empty` is to just use `RecordBatch.NO_TIMESTAMP`, 
so I am thinking it could be cleaner to keep it as a `long` and contain the 
logic for returning the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP`
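
   To make the trade-off concrete, here is a hedged comparison of the two call-site 
shapes being weighed (illustrative only; these helpers are not from the PR):

```java
import java.util.OptionalLong;

import org.apache.kafka.common.record.RecordBatch;

// Illustrative comparison of the two API shapes discussed above.
class DeleteHorizonCallSites {

    // Shape 1: OptionalLong, empty when the delete horizon flag is not set.
    static boolean pastHorizon(OptionalLong deleteHorizonMs, long now) {
        return deleteHorizonMs.isPresent() && now > deleteHorizonMs.getAsLong();
    }

    // Shape 2: plain long with the existing sentinel baked in.
    static boolean pastHorizon(long deleteHorizonMs, long now) {
        return deleteHorizonMs != RecordBatch.NO_TIMESTAMP && now > deleteHorizonMs;
    }
}
```

   Either shape works; the question above is whether callers unwrap the Optional or 
keep relying on the `NO_TIMESTAMP` sentinel.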




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-12 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##
@@ -246,6 +265,19 @@ public boolean isTransactional() {
 return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
 }
 
+@Override
+public boolean hasDeleteHorizonMs() {
+return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+}
+
+@Override
+public long deleteHorizonMs() {

Review comment:
   I've removed the `hasDeleteHorizonMs` function from the interface. I've 
made it so the DefaultRecordBatch still has the function as a private helper, 
but the logic overall is pretty much unchanged
   
   I've also made the change to return OptionalLong for `deleteHorizonMs` to 
see how it looks. I think that the current usage in the cases for which this 
value returns `OptionalLong.Empty` is to just use `RecordBatch.NO_TIMESTAMP`, 
so I am thinking it could be cleaner to keep it as a `long` and contain the 
logic for the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-12 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668304063



##
File path: checkstyle/suppressions.xml
##
@@ -57,7 +57,7 @@
 
 

Review comment:
   the same suppression is on L54




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-12 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668303344



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##
@@ -167,21 +167,9 @@ public void ensureValid() {
  * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
  */
 public long baseTimestamp() {
-return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
-}
-
-/**
- * Get the timestamp of the first record in this batch. It is usually the 
create time of the record even if the
- * timestamp type of the batch is log append time.
- * 
- * @return The first timestamp if a record has been appended, unless the 
delete horizon has been set
- * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if 
the delete horizon is set
- */
-public long firstTimestamp() {
-final long baseTimestamp = baseTimestamp();
 if (hasDeleteHorizonMs())
 return RecordBatch.NO_TIMESTAMP;

Review comment:
   yes you are right. sorry for the mistake. It should be fixed now




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-12 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##
@@ -246,6 +265,19 @@ public boolean isTransactional() {
 return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
 }
 
+@Override
+public boolean hasDeleteHorizonMs() {
+return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+}
+
+@Override
+public long deleteHorizonMs() {

Review comment:
   I've removed the `hasDeleteHorizonMs` function from the interface. I've 
made it so the DefaultRecordBatch still has the function as a private helper, 
but the logic overall is pretty much unchanged
   
   I've also made the change to return OptionalLong for `deleteHorizonMs` to 
see how it looks. I think that the current usage in the cases for which this 
value returns `OptionalLong.Empty` is to just use `RecordBatch.NO_TIMESTAMP`, 
so I am thinking it's just cleaner to keep it as a `long` and contain the logic 
for the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-12 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r668300841



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##
@@ -156,13 +161,27 @@ public void ensureValid() {
 }
 
 /**
- * Get the timestamp of the first record in this batch. It is always the 
create time of the record even if the
+ * Gets the base timestamp of the batch which is used to calculate the 
timestamp deltas.
+ * 
+ * @return The base timestamp or
+ * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+ */
+public long baseTimestamp() {

Review comment:
   I've updated RecordIterator. I also expanded on 
`MemoryRecordsTest.testBaseTimestampToDeleteHorizonConversion` to check the 
record timestamp to verify it is the correct value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #10280: KAFKA-12554: Refactor Log layer

2021-07-12 Thread GitBox


kowshik commented on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-878641126


   @junrao Thanks for the review. I ran load tests on the changes from this PR; 
there weren't any new regressions (i.e. latency regressions or errors) that I 
noticed, except for an issue that I found which looks unrelated to this PR; it's 
described in this jira: https://issues.apache.org/jira/browse/KAFKA-13070.
   
   The load test was run on a 6-broker cluster with 250GB SSD disks:
* Produce/consume on a test topic with 2000 partitions (~1000+ replicas per 
broker).
* Per topic # of producers = 6.
* Produce ingress per broker = ~20.5MBps.
* Per topic # of consumers = 6.
* \# of consumer groups = 3.
* Test duration: ~1h.
   
   Mid-way through the test, I rolled the cluster under load to check how the 
cluster behaved. Overall things looked OK.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance

2021-07-12 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-13070:


 Summary: LogManager shutdown races with periodic work scheduled by 
the instance
 Key: KAFKA-13070
 URL: https://issues.apache.org/jira/browse/KAFKA-13070
 Project: Kafka
  Issue Type: Bug
Reporter: Kowshik Prakasam


In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel 
the periodic work scheduled by it prior to shutdown. As a result, the periodic 
work could race with the shutdown sequence causing some unwanted side effects. 
This is reproducible by a unit test in LogManagerTest.

 

```
// set val maxLogAgeMs = 6 in the test

@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
  val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
  val logFile = new File(logDir, name + "-0")
  assertTrue(logFile.exists)

  log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
  log.updateHighWatermark(log.logEndOffset)

  logManager.shutdown()

  assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))

  time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)

  logManager = null
}
```
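
The description implies the fix direction: cancel (and drain) the periodic work 
before the shutdown sequence tears down the state it touches. A generic sketch of 
that pattern with plain `java.util.concurrent` (illustrative only; this is not 
Kafka's `Scheduler` nor the eventual LogManager change, and all names and timings 
are made up):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicWorkShutdownSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> retentionTask;

    void startup() {
        // Periodic work comparable to retention/flush tasks.
        retentionTask = scheduler.scheduleAtFixedRate(this::runRetention, 30, 30, TimeUnit.SECONDS);
    }

    void shutdown() throws InterruptedException {
        // Cancel the periodic work and wait for in-flight runs to finish *before*
        // the teardown, so the task cannot race with (or run after) it.
        if (retentionTask != null) {
            retentionTask.cancel(false);
        }
        scheduler.shutdown();
        scheduler.awaitTermination(30, TimeUnit.SECONDS);
        // ... now it is safe to close logs and write the clean-shutdown marker ...
    }

    private void runRetention() {
        // placeholder for the periodic work
    }
}
```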



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


guozhangwang commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668293781



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -186,16 +209,25 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
 List assignedPartitions = new ArrayList<>();
-// Reassign previously owned partitions to the expected number
+// Reassign previously owned partitions, up to the expected number of 
partitions per consumer
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
 
+for (TopicPartition doublyClaimedPartition : 
partitionsWithMultiplePreviousOwners) {
+if (ownedPartitions.contains(doublyClaimedPartition)) {
+log.warn("Found partition {} still claimed as owned by 
consumer {}, despite being claimed by multiple "

Review comment:
   Same here: in our current code this should never happen, so what about 
log as ERROR?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
 // this consumer is potential maxQuota candidate since we're 
still under the number of expected members
 // with more than the minQuota partitions. Note, if the number 
of expected members with more than
-// the minQuota partitions is 0, it means minQuota == 
maxQuota, so they won't be put into unfilledMembers
+// the minQuota partitions is 0, it means minQuota == 
maxQuota, and there are no potentially unfilled
 if (numMembersAssignedOverMinQuota < 
expectedNumMembersAssignedOverMinQuota) {
-unfilledMembers.add(consumer);
+potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   Honestly it took me quite a while to understand the fix :P After 
understanding that I think maybe it's better to rename these two collections 
more explicitly:
   
   1) `unfilledMembers` -> `MembersWithLessThanMinQuotaPartitions`.
   2) `potentiallyUnfilledMembersAtMinQuota` -> 
`MembersWithExactMinQuotaPartitions`.
   
   And also (since the maxQuota is always either == minQuota or minQuota + 1):
   
   3) `expectedNumMembersAssignedOverMinQuota` -> 
`expectedNumMembersWithMaxQuota` 
   4) `numMembersAssignedOverMinQuota` -> `numMembersWithMaxQuota`
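   
   For reference, the quota relationship behind these names (an illustration only, 
not code from the PR): with P partitions and C consumers, minQuota = floor(P / C), 
maxQuota = ceil(P / C), and exactly P mod C members are expected to end up with 
maxQuota partitions.
   
```java
// Illustration of the quota bookkeeping the renamed variables describe.
class QuotaSketch {
    static void printQuotas(int numPartitions, int numConsumers) {
        int minQuota = numPartitions / numConsumers;                      // floor(P / C)
        int maxQuota = (numPartitions + numConsumers - 1) / numConsumers; // ceil(P / C)
        int expectedNumMembersWithMaxQuota = numPartitions % numConsumers;

        // e.g. printQuotas(7, 3) -> minQuota=2, maxQuota=3, membersWithMaxQuota=1
        System.out.printf("minQuota=%d maxQuota=%d membersWithMaxQuota=%d%n",
                minQuota, maxQuota, expectedNumMembersWithMaxQuota);
    }
}
```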

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##
@@ -555,7 +558,7 @@ public void 
testLargeAssignmentAndGroupWithUniformSubscription() {
 assignor.assign(partitionsPerTopic, subscriptions);
 }
 
-@Timeout(40)
+@Timeout(60)

Review comment:
   +1

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 for (final TopicPartition tp : memberData.partitions) {
 // filter out any topics that no longer exist or aren't 
part of the current subscription
 if (allTopics.contains(tp.topic())) {
-ownedPartitions.add(tp);
+
+if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+allPreviousPartitionsToOwner.put(tp, consumer);
+ownedPartitions.add(tp);
+} else {
+String otherConsumer = 
allPreviousPartitionsToOwner.get(tp);
+log.warn("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "

Review comment:
   nit: do you think we should log at ERROR since this is not expected 
really? Right now we would sort of "hide" such bugs and still be able to 
proceed silently; I feel we should shout out such scenarios a bit louder in 
the logs.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set allTopics,
 // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected members
 // with more than the minQuota partitions, so keep "maxQuota" 
of the owned partitions, and revoke the rest of the partit

[GitHub] [kafka] dielhennr commented on a change in pull request #11011: KAFKA-13051: Require Principal Serde and add default

2021-07-12 Thread GitBox


dielhennr commented on a change in pull request #11011:
URL: https://github.com/apache/kafka/pull/11011#discussion_r668286722



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalSerde.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.message.DefaultPrincipalData;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+
+import java.nio.ByteBuffer;
+
+public interface DefaultKafkaPrincipalSerde extends KafkaPrincipalSerde {

Review comment:
   @hachikuji Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13069) Add magic number to DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde

2021-07-12 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-13069:
-

 Summary: Add magic number to 
DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde
 Key: KAFKA-13069
 URL: https://issues.apache.org/jira/browse/KAFKA-13069
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0, 3.0.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 3.0.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-12 Thread GitBox


d8tltanc commented on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-878622218


   @ijuma @rajinisivaram Please let me know if we are good to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-12 Thread GitBox


d8tltanc commented on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-878621846


   Failed tests: 
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testAbortTransactionTimeout()
   Build / JDK 16 and Scala 2.13 / 
kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   
   My Local runs:
   > Task :core:test
   
   ...
   
   TransactionsTest > testSendOffsetsToTransactionTimeout() PASSED
   
   TransactionsTest > testAbortTransactionTimeout() PASSED
   
   ...
   
   ConsumerBounceTest > testCloseDuringRebalance() FAILED
   org.opentest4j.AssertionFailedError: Rebalance did not complete in time 
==> expected: <true> but was: <false>
   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
   at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
   at 
kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:400)
   at 
kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:414)
   at 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:381)
   
   But testCloseDuringRebalance() seems irrelevant.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group

2021-07-12 Thread GitBox


hachikuji commented on pull request #10986:
URL: https://github.com/apache/kafka/pull/10986#issuecomment-878621043


   To clarify, from the perspective of the eager protocol, how would this case 
look? Would we get multiple calls to `onPartitionsRevoked` with the same set of 
partitions or something else?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11030: MINOR: Unmarking raft quorum configs as internal

2021-07-12 Thread GitBox


cmccabe merged pull request #11030:
URL: https://github.com/apache/kafka/pull/11030


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10091) Improve task idling

2021-07-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10091.
--
Resolution: Fixed

> Improve task idling
> ---
>
> Key: KAFKA-10091
> URL: https://issues.apache.org/jira/browse/KAFKA-10091
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> When Streams is processing a task with multiple inputs, each time it is ready 
> to process a record, it has to choose which input to process next. It always 
> takes from the input for which the next record has the least timestamp. The 
> result of this is that Streams processes data in timestamp order. However, if 
> the buffer for one of the inputs is empty, Streams doesn't know what 
> timestamp the next record for that input will be.
> Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address 
> this issue.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> The config allows Streams to wait some amount of time for data to arrive on 
> the empty input, so that it can make a timestamp-ordered decision about which 
> input to pull from next.
> However, this config can be hard to use reliably and efficiently, since what 
> we're really waiting for is the next poll that _would_ return data from the 
> empty input's partition, and this guarantee is a function of the poll 
> interval, the max poll interval, and the internal logic that governs when 
> Streams will poll again.
> The ideal case is you'd be able to guarantee at a minimum that _any_ amount 
> of idling would guarantee you poll data from the empty partition if there's 
> data to fetch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12360) Improve documentation of max.task.idle.ms (kafka-streams)

2021-07-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12360.
--
Resolution: Fixed

> Improve documentation of max.task.idle.ms (kafka-streams)
> -
>
> Key: KAFKA-12360
> URL: https://issues.apache.org/jira/browse/KAFKA-12360
> Project: Kafka
>  Issue Type: Sub-task
>  Components: docs, streams
>Reporter: Domenico Delle Side
>Assignee: John Roesler
>Priority: Minor
>  Labels: beginner, newbie, trivial
>
> _max.task.idle.ms_ is a handy way to pause processing in a *_kafka-streams_* 
> application. This is very useful when you need to join two topics that are 
> out of sync, i.e. when data in one topic may be produced _before_ you receive 
> the join information in the other topic.
> In the documentation, however, it is not specified that the value of 
> _max.task.idle.ms_ *must* be lower than _max.poll.interval.ms_, otherwise 
> you'll run into an endless rebalancing problem.
> I think it is better to clearly state this in the documentation.
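
A minimal illustration of the relationship described above (values are placeholders; 
the constraint on keeping the idle time below the poll interval comes from this 
ticket's description):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TaskIdleConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Keep max.task.idle.ms well below the consumer's max.poll.interval.ms
        // (default 300000 ms); per this ticket, idling longer than the poll
        // interval can lead to endless rebalancing.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 10_000L);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300_000);
    }
}
```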



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12977) Eliminate temporary ProducerStateManager in Log recovery logic

2021-07-12 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-12977:
-
Parent: KAFKA-12551
Issue Type: Sub-task  (was: Improvement)

> Eliminate temporary ProducerStateManager in Log recovery logic
> --
>
> Key: KAFKA-12977
> URL: https://issues.apache.org/jira/browse/KAFKA-12977
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
>
> The temporary ProducerStateManager (PSM) instance created in the Log recovery 
> logic (inside LogLoader) is a source of complexity and confusion. For 
> example, when fixing KAFKA-12964 (see [PR# 
> 10896|https://github.com/apache/kafka/pull/10896]) we figured that there are 
> cases where the temporary PSM instance's state goes out of sync with the real 
> PSM instance (within LoadLogParams). And we need to adjust the code suitably 
> to handle for the consequences of these 2 instances being out of sync. To fix 
> this, we should just get rid of the temporary PSM instance which is used in 
> the following places:
>  # In LogLoader.recoverLog(), we could just pass in the real PSM.
>  # In LogLoader.completeSwapOperations(), we try to avoid recovering segment 
> here in  [PR #10763|https://github.com/apache/kafka/pull/10763].
>  # In LogLoader.loadSegmentFiles(), we probably need to clean this part of 
> the logic a bit. If we are missing index file or the index file is corrupted, 
> typically we can just rebuild the index without changing PSM. If the segment 
> is truncated while rebuilding the index, we actually want to follow the 
> process in step 1, by just removing the rest of the segments. So, we could 
> also get rid of the temporary PSM in this case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #11003: KAFKA-12360: Document new time semantics

2021-07-12 Thread GitBox


vvcephei commented on pull request #11003:
URL: https://github.com/apache/kafka/pull/11003#issuecomment-878602756


   Merged and cherry-picked to 3.0 (cc @kkonstantine )


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13068) Rename Log to UnifiedLog

2021-07-12 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-13068:


 Summary: Rename Log to UnifiedLog
 Key: KAFKA-13068
 URL: https://issues.apache.org/jira/browse/KAFKA-13068
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kowshik Prakasam
Assignee: Kowshik Prakasam


Once KAFKA-12554 is completed, we can rename Log -> UnifiedLog as described in 
the doc:  
[https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12554) Split Log layer into Log and LocalLog

2021-07-12 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-12554:
-
Description: Split Log layer into Log and LocalLog based on the proposal 
described in this document: 
[https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#].
  (was: Split Log layer into UnifiedLog and LocalLog based on the proposal 
described in this document: 
https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#.)

> Split Log layer into Log and LocalLog
> -
>
> Key: KAFKA-12554
> URL: https://issues.apache.org/jira/browse/KAFKA-12554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
>
> Split Log layer into Log and LocalLog based on the proposal described in this 
> document: 
> [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12554) Split Log layer into Log and LocalLog

2021-07-12 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-12554:
-
Summary: Split Log layer into Log and LocalLog  (was: Split Log layer into 
UnifiedLog and LocalLog)

> Split Log layer into Log and LocalLog
> -
>
> Key: KAFKA-12554
> URL: https://issues.apache.org/jira/browse/KAFKA-12554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
>
> Split Log layer into UnifiedLog and LocalLog based on the proposal described 
> in this document: 
> https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #11003: KAFKA-12360: Document new time semantics

2021-07-12 Thread GitBox


vvcephei merged pull request #11003:
URL: https://github.com/apache/kafka/pull/11003


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11003: KAFKA-12360: Document new time semantics

2021-07-12 Thread GitBox


vvcephei commented on pull request #11003:
URL: https://github.com/apache/kafka/pull/11003#issuecomment-878599816


   Thanks, @JimGalasyn , @showuon , and @abbccdda !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11011: KAFKA-13051: Require Principal Serde and add default

2021-07-12 Thread GitBox


hachikuji commented on a change in pull request #11011:
URL: https://github.com/apache/kafka/pull/11011#discussion_r668258168



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalSerde.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.message.DefaultPrincipalData;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+
+import java.nio.ByteBuffer;
+
+public interface DefaultKafkaPrincipalSerde extends KafkaPrincipalSerde {

Review comment:
   Fair enough. @dielhennr Can we move this back to where it was? I guess I 
don't see a problem letting the test principal builder implementations extend 
`DefaultKafkaPrincipalBuilder` directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11011: KAFKA-13051: Require Principal Serde and add default

2021-07-12 Thread GitBox


hachikuji commented on a change in pull request #11011:
URL: https://github.com/apache/kafka/pull/11011#discussion_r668258168



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalSerde.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.message.DefaultPrincipalData;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+
+import java.nio.ByteBuffer;
+
+public interface DefaultKafkaPrincipalSerde extends KafkaPrincipalSerde {

Review comment:
   Fair enough. @dielhennr Can we move this back to where it was? I guess I 
don't see a problem letting the test principal builder implementations extend 
`KafkaPrincipalBuilder` directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes

2021-07-12 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista resolved KAFKA-13040.
--
Resolution: Duplicate

> Increase minimum value of segment.ms and segment.bytes
> --
>
> Key: KAFKA-13040
> URL: https://issues.apache.org/jira/browse/KAFKA-13040
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Badai Aqrandista
>Priority: Minor
>
> Raised for KIP-760 (linked).
> Many times, Kafka brokers in production crash with "Too many open files" 
> or "Out of memory" errors because some Kafka topics have a lot of 
> segment files as a result of a small {{segment.ms}} or {{segment.bytes}}. These 
> two configurations can be set by any user who is authorized to create topics or 
> modify topic configuration.
> To prevent these two configurations from causing Kafka broker crashes, they 
> should have a sufficiently large minimum value.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms

2021-07-12 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista reopened KAFKA-7760:
-

Reopening issue and making this the main ticket for KIP-760

> Add broker configuration to set minimum value for segment.bytes and segment.ms
> --
>
> Key: KAFKA-7760
> URL: https://issues.apache.org/jira/browse/KAFKA-7760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Badai Aqrandista
>Priority: Major
>  Labels: kip, newbie
>
> If someone sets segment.bytes or segment.ms at the topic level to a very small 
> value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a 
> very high number of segment files. This can bring down the whole broker due 
> to hitting the maximum number of open files (for logs) or the maximum number 
> of mmap-ed files (for indexes).
> To prevent that from happening, I would like to suggest adding two new items 
> to the broker configuration:
>  * min.topic.segment.bytes, defaults to 1048576: The minimum value for 
> segment.bytes. When someone sets topic configuration segment.bytes to a value 
> lower than this, Kafka throws an error INVALID VALUE.
>  * min.topic.segment.ms, defaults to 360: The minimum value for 
> segment.ms. When someone sets topic configuration segment.ms to a value lower 
> than this, Kafka throws an error INVALID VALUE.
> Thanks
> Badai



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group

2021-07-12 Thread GitBox


guozhangwang commented on pull request #10986:
URL: https://github.com/apache/kafka/pull/10986#issuecomment-878589565


   @hachikuji I think the key idea behind this fix is that, if a rebalance 
failed with e.g. the memberId being lost, then conceptually we would just start a new 
rebalance, in which we would call `onJoinPrepare` and in which we may call 
`onPartitionsRevoked` again. This behavior would be the same for eager or 
cooperative.
   
   Personally I think this fix is fine -- @ableegoldman if you could just add a 
unit test for the case of the memberId being lost during a first rebalance, and check 
that we re-trigger `onJoinPrepare`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #11031: KAFKA-13067 Add internal config to lower the metadata log segment size

2021-07-12 Thread GitBox


mumrah opened a new pull request #11031:
URL: https://github.com/apache/kafka/pull/11031


   In order to facilitate system and integration tests that use a smaller log 
segment size, we are adding this internal config to lower the minimum. During 
normal operation, this config will use the default size of 8Mb (as defined by 
KafkaRaftClient).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13003) KafkaBroker advertises socket port instead of the configured advertised port

2021-07-12 Thread Uwe Eisele (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele resolved KAFKA-13003.

Resolution: Fixed

Pull Request #10935 has been merged.

> KafkaBroker advertises socket port instead of the configured advertised port
> 
>
> Key: KAFKA-13003
> URL: https://issues.apache.org/jira/browse/KAFKA-13003
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Uwe Eisele
>Assignee: Uwe Eisele
>Priority: Critical
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> In KRaft mode, Apache Kafka 2.8.0 advertises the socket port instead of 
> the configured advertised port.
> A broker given with the following configuration
> {code:java}
> listeners=PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091
> advertised.listeners=PUBLIC://envoy-kafka-broker:9091,REPLICATION://kafka-broker1:9091
> {code}
> advertises _envoy-kafka-broker:19092_ on the _PUBLIC_ listener; however, I 
> would expect _envoy-kafka-broker:9091_ to be advertised. In ZooKeeper mode 
> it works as expected.
> In a deployment with an L4 proxy in front of the Kafka cluster, it is 
> important that the advertised port can be different from the actual socket 
> port.
> I tested it with a Docker-Compose setup which runs 3 Kafka Broker in Kraft 
> mode and an Envoy proxy in front of them. With Apache Kafka 2.8.0 it does not 
> work, because Kafka does not advertise the configured advertised port. For 
> more details see: 
> https://github.com/ueisele/kafka/tree/fix/kraft-advertisedlisteners-build/proxy-examples/proxyl4-kafkakraft-bug-2.8
> _Client -- 909[1-3] --> Envoy Proxy -- 19092 --> Kafka Broker [1-3]_
> || Envoy Host || Envoy Port || Kafka Broker || Kafka Port || Advertised 
> Listener ||
> | envoy-kafka-broker | 9091 | kafka-broker1 | 19092 | envoy-kafka-broker:9091 
> |
> | envoy-kafka-broker | 9092 | kafka-broker2 | 19092 | envoy-kafka-broker:9092 
> |
> | envoy-kafka-broker | 9093 | kafka-broker3 | 19092 | envoy-kafka-broker:9093 
> |
> {code:bash}
> > docker-compose exec kafkacat kafkacat -b envoy-kafka-broker:9091 -L
> Metadata for all topics (from broker -1: envoy-kafka-broker:9091/bootstrap):
>  3 brokers:
>   broker 101 at envoy-kafka-broker:19092
>   broker 102 at envoy-kafka-broker:19092 (controller)
>   broker 103 at envoy-kafka-broker:19092
>  0 topics:
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

2021-07-12 Thread GitBox


cmccabe merged pull request #10935:
URL: https://github.com/apache/kafka/pull/10935


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12886) Enable request forwarding by default in 3.1

2021-07-12 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-12886:
--
Fix Version/s: (was: 3.0.0)
   3.1.0

> Enable request forwarding by default in 3.1
> ---
>
> Key: KAFKA-12886
> URL: https://issues.apache.org/jira/browse/KAFKA-12886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Ryan Dielhenn
>Priority: Major
> Fix For: 3.1.0
>
>
> KIP-590 documents that request forwarding will be enabled in 3.0 by default: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
>  This makes it a requirement for users with custom principal implementations 
> to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 
> because we saw this as a compatibility break. 
> The KIP documents that use of forwarding will be controlled by the IBP. So 
> once the IBP has been configured to 3.0 or above, then the brokers will begin 
> forwarding.
> (Note that forwarding has always been a requirement for kraft.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12886) Enable request forwarding by default in 3.1

2021-07-12 Thread Ryan Dielhenn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379403#comment-17379403
 ] 

Ryan Dielhenn commented on KAFKA-12886:
---

[~kkonstantine] targeting 3.1 now

> Enable request forwarding by default in 3.1
> ---
>
> Key: KAFKA-12886
> URL: https://issues.apache.org/jira/browse/KAFKA-12886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Ryan Dielhenn
>Priority: Major
> Fix For: 3.0.0
>
>
> KIP-590 documents that request forwarding will be enabled in 3.0 by default: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
>  This makes it a requirement for users with custom principal implementations 
> to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 
> because we saw this as a compatibility break. 
> The KIP documents that use of forwarding will be controlled by the IBP. So 
> once the IBP has been configured to 3.0 or above, then the brokers will begin 
> forwarding.
> (Note that forwarding has always been a requirement for kraft.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12886) Enable request forwarding by default in 3.1

2021-07-12 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-12886:
--
Summary: Enable request forwarding by default in 3.1  (was: Enable request 
forwarding by default in 3.0)

> Enable request forwarding by default in 3.1
> ---
>
> Key: KAFKA-12886
> URL: https://issues.apache.org/jira/browse/KAFKA-12886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Ryan Dielhenn
>Priority: Major
> Fix For: 3.0.0
>
>
> KIP-590 documents that request forwarding will be enabled in 3.0 by default: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
>  This makes it a requirement for users with custom principal implementations 
> to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 
> because we saw this as a compatibility break. 
> The KIP documents that use of forwarding will be controlled by the IBP. So 
> once the IBP has been configured to 3.0 or above, then the brokers will begin 
> forwarding.
> (Note that forwarding has always been a requirement for kraft.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr opened a new pull request #11030: MINOR: Unmarking raft quorum configs as internal

2021-07-12 Thread GitBox


dielhennr opened a new pull request #11030:
URL: https://github.com/apache/kafka/pull/11030


   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-12 Thread GitBox


showuon opened a new pull request #11026:
URL: https://github.com/apache/kafka/pull/11026


   Some issues found in the ListConsumerGroupOffsetsHandler:
   
   1. If a coordinator error is returned at the topic-partition level, we don't retry
   2. Possible partition-level exceptions are not handled
   
   This is the old handle-response logic, for your reference:
   ```java
    void handleResponse(AbstractResponse abstractResponse) {
        final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
        final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();

        // If coordinator changed since we fetched it, retry
        if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
            Call call = getListConsumerGroupOffsetsCall(context);
            rescheduleFindCoordinatorTask(context, () -> call, this);
            return;
        }

        if (handleGroupRequestError(response.error(), context.future()))
            return;

        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
                response.responseData().entrySet()) {
            final TopicPartition topicPartition = entry.getKey();
            OffsetFetchResponse.PartitionData partitionData = entry.getValue();
            final Errors error = partitionData.error;

            if (error == Errors.NONE) {
                final Long offset = partitionData.offset;
                final String metadata = partitionData.metadata;
                final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
                // Negative offset indicates that the group has no committed offset for this partition
                if (offset < 0) {
                    groupOffsetsListing.put(topicPartition, null);
                } else {
                    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                }
            } else {
                log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
            }
        }
        context.future().complete(groupOffsetsListing);
    }
   ```
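   
   A hedged sketch (not the PR's code) of the direction for the two issues above: 
classify partition-level errors so that coordinator movement triggers a retry 
instead of being skipped with a warning. The helper and messages are illustrative:
   
```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;

// Illustrative classification of partition-level errors from an OffsetFetch response.
class PartitionErrorHandlingSketch {
    static void handlePartitionError(TopicPartition tp, Errors error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Coordinator problems surfaced per partition: retry the group lookup.
                System.out.println("Retrying group offsets lookup due to " + error + " on " + tp);
                break;
            case UNKNOWN_TOPIC_OR_PARTITION:
            case TOPIC_AUTHORIZATION_FAILED:
                // Terminal for this partition: surface the error instead of dropping it.
                System.out.println("Failing " + tp + " with " + error);
                break;
            default:
                System.out.println("Unexpected error " + error + " on " + tp);
        }
    }
}
```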
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



