[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-08-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13128:
---
Fix Version/s: (was: 2.8.1)
   (was: 3.0.0)

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.8.1
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-08-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-13128:

  Assignee: (was: A. Sophie Blee-Goldman)

Failed again for a different reason – just flaky, seems we need to wait for the 
thread to fully start up

 

{{java.lang.AssertionError: Unexpected exception thrown while getting the value 
from store.
Expected: is (a string containing "Cannot get state store source-table because 
the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing 
"The state store, source-table, may have migrated to another instance")
 but: was "Cannot get state store source-table because the stream thread is 
STARTING, not RUNNING"}}
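
A minimal sketch of the kind of guard the test seems to need (an assumption on my part, not the actual fix): block until every stream thread reports RUNNING before querying the store, so the query cannot race a thread that is still STARTING or PARTITIONS_ASSIGNED.

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.test.TestUtils;

// Hedged sketch, not the committed fix: wait until all threads of the given
// KafkaStreams instance report RUNNING before issuing the store query.
static void waitForAllThreadsRunning(final KafkaStreams kafkaStreams) throws InterruptedException {
    TestUtils.waitForCondition(
        () -> kafkaStreams.metadataForLocalThreads().stream()
                  .allMatch(thread -> "RUNNING".equals(thread.threadState())),
        30_000L,
        "Stream threads did not all reach RUNNING in time");
}
{code}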

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.8.1
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683960767



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -214,6 +220,12 @@ class KafkaRaftManager[T](
     KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)
   }
 
+  // visible for testing cleanup
+  private[raft] def deleteDataDir(): Unit = {
+    val logDirName = Log.logDirName(topicPartition)
+    KafkaRaftManager.deleteLogDirectory(new File(config.metadataLogDir), logDirName)
+  }

Review comment:
   @jsancio What do you think about exposing this api to the package for 
cleaning up log directories in tests? I'm not sure exactly how to override 
`createDataDir` with `tempDir` from the tests. It is called internally in 
`KafkaRaftManager` without an argument.
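
   One possible shape for that cleanup, sketched as an assumption (illustrative names, not the PR's code): point `metadata.log.dir` at a temp directory owned by the test and delete it recursively in teardown, rather than exposing `deleteDataDir`:

   ```scala
   // Hedged sketch: let the test own the directory instead of the manager.
   // `props` is the Properties used to build the KafkaConfig under test.
   val tempDir = kafka.utils.TestUtils.tempDir()
   props.setProperty(KafkaConfig.MetadataLogDirProp, tempDir.getAbsolutePath)
   // ... exercise KafkaRaftManager ...
   org.apache.kafka.common.utils.Utils.delete(tempDir) // recursive delete in teardown
   ```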




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#issuecomment-893993452


   LGTM! Please feel free to merge after green builds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938654



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -67,13 +72,31 @@
 public class NamedTopologyIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
+    // TODO KAFKA-12648:

Review comment:
   SG.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938587



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
   > I was referring to moving the `builders` map to the `TopologyVersion` 
in the above, ie I want to save that for Pt. 4 if that's ok
   
   Yup, totally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11175: MINOR: Fix getting started documentation

2021-08-05 Thread GitBox


showuon commented on a change in pull request #11175:
URL: https://github.com/apache/kafka/pull/11175#discussion_r683905633



##
File path: config/kraft/README.md
##
@@ -26,7 +27,7 @@ xtzWWN4bTjitpL3kfd9s5g
 
 
 ## Format Storage Directories
-The next step is to format your storage directories.  If you are running in single-node mode, you can do this with one command:
+The next step is to format your storage directories.  If you are running in single-node mode, you can do this with one command. The string `<uuid>` should be replaced with the value returned by `kafka-storage.sh random-uuid`:


 $ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties

Review comment:
   How about we put the last sentence as a command-line comment? That is:
   
   ```
   # The string `<uuid>` should be replaced with the value returned by `kafka-storage.sh random-uuid`
   
   $ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
   Formatting /tmp/kraft-combined-logs
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683902306



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -517,9 +526,9 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         }
 
         if (!remainingRevokedPartitions.isEmpty()) {
-            log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
+            log.debug("The following revoked partitions {} are missing from the current task partitions. It could potentially " +

Review comment:
   Making this debug since `warn` seems too intense, and I'm not sure it's 
even worthy of `info` -- also, with named topologies you would expect to see 
this almost every time a topology is removed since the thread will try to close 
those tasks as soon as it notices the topology's removal 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683902306



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -517,9 +526,9 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         }
 
         if (!remainingRevokedPartitions.isEmpty()) {
-            log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
+            log.debug("The following revoked partitions {} are missing from the current task partitions. It could potentially " +

Review comment:
   Making this debug since `warn` seems too intense, and I'm not sure it's 
even worthy of `info` -- also, with named topologies you would expect to see 
this almost every time a topology is removed 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-05 Thread GitBox


showuon commented on a change in pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#discussion_r683900989



##
File path: docs/streams/upgrade-guide.html
##
@@ -52,6 +52,15 @@ Upgrade Guide and API Changes
  restart all new ({{fullDotVersion}}) application instances 
 
 
+
+Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files.
+This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB,

Review comment:
   nit: Should we put the summary of this paragraph in the first sentence?
   The first sentence of the paragraph above is `Upgrading from any older version to 2.8.0 is possible: `, which makes it immediately clear that the paragraph is about upgrading.
   
   In this paragraph, the main point we want to deliver is "Downgrading from 3.0.0 to older versions requires additional work". So we could put this main point up front, and then explain why the extra work is needed.
   
   What do you think? (BTW, I'm OK with the current version :) )




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683901855



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -775,26 +806,16 @@ private void completeTaskCloseClean(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToCloseDirty = new HashSet<>();

Review comment:
   No actual changes here, just pulled the cleanup of tasks out into a 
separate new `#closeAndCleanUpTasks` method so we can call that on tasks from 
removed topologies




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683894514



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -262,6 +262,8 @@ class KafkaConfigTest {
 props.put(KafkaConfig.ProcessRolesProp, "controller")
 props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
 props.put(KafkaConfig.NodeIdProp, "1")
+props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")

Review comment:
   `props.put`

##
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##
@@ -96,6 +96,7 @@ class ControllerApisTest {
 props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
 props.put(KafkaConfig.ProcessRolesProp, "controller")
 props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+props.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")

Review comment:
   remove brackets from `nodeId` string interpolation

##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -132,6 +132,8 @@ class KafkaApisTest {
   val properties = TestUtils.createBrokerConfig(brokerId, "")
   properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
   properties.put(KafkaConfig.ProcessRolesProp, "broker")
+  val voterId = (brokerId + 1)
+  properties.setProperty(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")

Review comment:
   `props.put`

##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -1091,6 +1093,7 @@ class KafkaConfigTest {
 val largeBrokerId = 2000
 val props = new Properties()
 props.put(KafkaConfig.ProcessRolesProp, "broker")
+props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")

Review comment:
   Use `props.put` to stay consistent with method




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao commented on pull request #11182: KAFKA-13074: Implement mayClean for MockLog

2021-08-05 Thread GitBox


zhaohaidao commented on pull request #11182:
URL: https://github.com/apache/kafka/pull/11182#issuecomment-893935065


   @jsancio  Could you please review this pr if you have time, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683880423



##
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
   I'll switch it to use `tempDir` shortly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13132.
-
Resolution: Fixed

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check if the LISR request is resulting in a topic ID addition 
> and add logic to handle logs already associated with partitions in the replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683879932



##
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
   I did this using 
   
   `scala.reflect.io.Directory.deleteRecursively`
   
   as you were reviewing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-05 Thread GitBox


hachikuji merged pull request #11171:
URL: https://github.com/apache/kafka/pull/11171


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#discussion_r683876709



##
File path: docs/streams/upgrade-guide.html
##
@@ -52,6 +52,15 @@ Upgrade Guide and API Changes
  restart all new ({{fullDotVersion}}) application instances 
 
 
+
+Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files.
+This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB,
+and hence it is harder to downgrade Kafka Streams with version 3.0.0 or newer to older versions in-flight.
+Users need to wipe out the local RocksDB state stores written by the new versioned Kafka Streams before swapping in the
+older versioned Kafka Streams bytecode, which when then restore the state stores with the old versioned footer from the

Review comment:
   ```suggestion
   older versioned Kafka Streams bytecode, which would then restore the state stores with the old versioned footer from the
   ```
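
   For readers of this note, a minimal sketch of the wipe-then-downgrade step the paragraph describes (assuming the standard `KafkaStreams#cleanUp()` API; illustrative, not text from the PR):

   ```java
   import org.apache.kafka.streams.KafkaStreams;

   // Hedged sketch: wipe local state before starting the older-versioned bytecode,
   // so the downgraded RocksDB never reads files with the newer footer version.
   // `topology` and `props` are assumed to be the application's usual arguments.
   final KafkaStreams streams = new KafkaStreams(topology, props);
   streams.cleanUp();  // only legal while the instance is not running
   streams.start();    // stores are restored from the changelog topics
   ```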




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683874039



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -719,6 +723,8 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
+topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
   Ack (although note that there's no wasted work on the restore phase 
since there's by definition nothing for the thread to do yet as it won't have 
been assigned any new tasks until it polls again). 
   
   I don't think it really matters much where we put this for that reason, 
except for the case in which we start up with no topology -- then it's a waste 
to join the group in the first place, so we may as well wait until we receive 
something to work on. So yes, I'll move it back ahead of poll
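
   A sketch of the reordering being agreed on here (the surrounding `runOnce()` structure is assumed; only the two calls come from the diff above):

   ```java
   // Hedged sketch: block on an empty topology before joining the group via poll,
   // so a thread started with no topology does not join and then sit idle.
   topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
   final long pollLatency = pollPhase();
   ```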




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683872205



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
   > I'll revisit the best way to handle the builders map in that as well
   
   I was referring to moving the `builders` map to the `TopologyVersion` in the 
above, ie I want to save that for Pt. 4  if that's ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13174) Log Compaction Blocked Forever by Unstable Offset/Unclosed Transaction

2021-08-05 Thread Michael Jaschob (Jira)
Michael Jaschob created KAFKA-13174:
---

 Summary: Log Compaction Blocked Forever by Unstable 
Offset/Unclosed Transaction
 Key: KAFKA-13174
 URL: https://issues.apache.org/jira/browse/KAFKA-13174
 Project: Kafka
  Issue Type: Bug
  Components: core, log cleaner
Affects Versions: 2.5.1
Reporter: Michael Jaschob


Our production cluster experienced a single __consumer_offsets partition that 
was growing without ever being compacted. A closer inspection of the cleaner 
logs showed that the log was forever uncleanable at an offset from July 28, 
which had been written ~7 days previously:
{code:java}
[2021-08-02 19:08:39,650] DEBUG Finding range of cleanable offsets for log=__consumer_offsets-9. Last clean offset=Some(75795702964) now=1627956519650 => firstDirtyOffset=75795702964 firstUncleanableOffset=75868740168 activeSegment.baseOffset=76691318694 (kafka.log.LogCleanerManager$)
{code}
Using the log dumper tool, we were able to examine the records/batches around 
this offset and determined that the proximate cause was an "open" transaction 
that was never committed or aborted. We saw this:
 - a consumer group offset commit for group {{foo-group}} for topic-partition 
{{foo-topic-46}} from pid 1686325 (log offset 75868731509)
 - a transactional COMMIT marker from pid 1686325 (log offset 75868731579)
 - another consumer group offset commit for group {{foo-group}} for 
topic-partition {{foo-topic-46}} for pid 1686325 (log offset 75868740168, our 
first uncleanable offset)

Here's the raw log dumper output:
{code:java}
baseOffset: 75868731509 lastOffset: 75868731509 count: 1 baseSequence: 0 lastSequence: 0 producerId: 1686325 producerEpoch: 249 partitionLeaderEpoch: 320 isTransactional: true isControl: false position: 98725764 CreateTime: 1627495733656 size: 232 magic: 2 compresscodec: NONE crc: 485368943 isvalid: true
| offset: 75868731509 CreateTime: 1627495733656 keysize: 126 valuesize: 36 sequence: 0 headerKeys: [] key: offset_commit::group=foo-group,partition=foo-topic-46 payload: offset=59016695,metadata=AQAAAXruS8Fg
...

baseOffset: 75868731579 lastOffset: 75868731579 count: 1 baseSequence: -1 lastSequence: -1 producerId: 1686325 producerEpoch: 249 partitionLeaderEpoch: 320 isTransactional: true isControl: true position: 98732634 CreateTime: 1627495733700 size: 78 magic: 2 compresscodec: NONE crc: 785483064 isvalid: true
| offset: 75868731579 CreateTime: 1627495733700 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 143
...

baseOffset: 75868740168 lastOffset: 75868740168 count: 1 baseSequence: 0 lastSequence: 0 producerId: 1686325 producerEpoch: 249 partitionLeaderEpoch: 320 isTransactional: true isControl: false position: 99599843 CreateTime: 1627495737629 size: 232 magic: 2 compresscodec: NONE crc: 1222829008 isvalid: true
| offset: 75868740168 CreateTime: 1627495737629 keysize: 126 valuesize: 36 sequence: 0 headerKeys: [] key: offset_commit::group=foo-group,partition=foo-topic-46 payload: offset=59016695,metadata=AQAAAXruS8Fg
...
{code}
There was no further activity from that pid 1686325. In fact, the KStream 
application in question stalled on that partition because of this unstable 
offset/open transaction: {{The following partitions still have unstable offsets 
which are not cleared on the broker side: [foo-topic-46], this could be either 
transactional offsets waiting for completion, or normal offsets waiting for 
replication after appending to local log}}

We then captured the producer snapshot file from the broker data directory and 
wrote a quick tool to dump it as text. From its output, we found that the 
transactional producer in question (pid 1686325) was still considered alive 
with its hanging transaction at 75868740168:
{code:java}
ArraySeq(ProducerStateEntry(producerId=1686325, producerEpoch=249, currentTxnFirstOffset=Some(75868740168), coordinatorEpoch=143, lastTimestamp=1627495737629, batchMetadata=Queue(BatchMetadata(firstSeq=0, lastSeq=0, firstOffset=75868740168, lastOffset=75868740168, timestamp=1627495737629)))
{code}
This was very perplexing. As far as we can tell, the code in both Apache Kafka 
2.5.1 and in trunk essentially treats an open transaction like we had as 
uncleanable, which in practice blocks the log from ever being compacted again, 
for all eternity. Once a pid has an open transaction - a defined 
{{currentTxnFirstOffset}} - {{ProducerStateManager}} will [never expire the 
producer|https://github.com/apache/kafka/blob/2.5.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L576-L577],
 even after {{transactional.id.expiration.ms}} has passed. This, obviously, has 
severe repercussions on a topic like __consumer_offsets (long coordinator load 
times, always-growing disk usage).
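
For context, the referenced expiration check looks roughly like the following (a paraphrase of the linked 2.5.1 source; treat the exact shape as an assumption and follow the link above for the authoritative code):
{code:scala}
// A producer is only expired when it has NO open transaction, so a defined
// currentTxnFirstOffset pins the pid (and the first uncleanable offset) forever.
private def isProducerExpired(currentTimeMs: Long, producerState: ProducerStateEntry): Boolean =
  producerState.currentTxnFirstOffset.isEmpty &&
    currentTimeMs - producerState.lastTimestamp >= maxProducerIdExpirationMs
{code}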

While we're not sure what led to this hanging open transaction (note: we were 
running partiti

[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683859800



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -262,6 +262,8 @@ class KafkaConfigTest {
 props.put(KafkaConfig.ProcessRolesProp, "controller")
 props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
 props.put(KafkaConfig.NodeIdProp, "1")
+props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"1@localhost:9092")

Review comment:
   This is a regular string, so the `s"..."` string interpolation is not needed. This comment applies to a few places.

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1949,6 +1951,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   )
 }
 
+val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+if (voterIds.nonEmpty) {
+  if (processRoles.contains(ControllerRole)) {
+    // Ensure that controllers use their node.id as a voter in controller.quorum.voters
+    require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  } else {
+    // Ensure that the broker's node.id is not an id in controller.quorum.voters
+    require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  }
+} else if (usesSelfManagedQuorum) {
+  throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${RaftConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
+}

Review comment:
   ```scala
   if (usesSelfManagedQuorum) {
     val voterIds = RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet
     if (voterIds.isEmpty) {
       throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${RaftConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
     } else if (processRoles.contains(ControllerRole)) {
       // Ensure that controllers use their node.id as a voter in controller.quorum.voters
       require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
     } else {
       // Ensure that the broker's node.id is not an id in controller.quorum.voters
       require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
     }
   }
   ```

##
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
     def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
       val props = new Properties
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val nodeIdMod = (nodeId.toInt + 1)
+          props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"${nodeIdMod.toString}@localhost:9093")

Review comment:
   String interpolation already calls `toString`, so you don't need to call it.

##
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
   The issue is that after every test you need to delete the directory 
create

[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683851998



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }
 
+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
   Well long story short I was trying to avoid mucking around in the task 
state management (which has historically been the source of many critical bugs) 
-- also we want to remove the `suspended` state soon anyways. But actually it 
seems simpler to just `close()` the tasks here and now altogether




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-05 Thread GitBox


guozhangwang commented on pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#issuecomment-893876355


   ping @ableegoldman @cadonna for reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-05 Thread GitBox


guozhangwang opened a new pull request #11184:
URL: https://github.com/apache/kafka/pull/11184


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13170) Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown

2021-08-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394358#comment-17394358
 ] 

Guozhang Wang commented on KAFKA-13170:
---

I think I know the reason for this flaky test. I have a PR 
https://github.com/apache/kafka/pull/11155 for another case within this class, 
but it should be able to fix this one as well. [~ableegoldman] could you help 
me review that one?

> Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown
> --
>
> Key: KAFKA-13170
> URL: https://issues.apache.org/jira/browse/KAFKA-13170
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown_2/]
> {code:java}
> Stacktrace
> java.lang.AssertionError: unexpected exception type thrown; expected: but was:
>   at org.junit.Assert.assertThrows(Assert.java:1020)
>   at org.junit.Assert.assertThrows(Assert.java:981)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683823753



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
   SGTM.
   
   What about the other comment, i.e. moving the `Map<String, InternalTopologyBuilder> builders` into the `TopologyVersion` itself? Besides 
the constructors, the only modifiers to `builders` seem to be 
`register/deregister`, in which we would always try to `getAndIncrement` 
version. So what about consolidating the modification of builders along with 
version bump, and hence we would not need to use a `ConcurrentNavigableMap`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }
 
+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
   Just another thought: if this is just for the very short temporary phase 
between when the topology is removed to when the rebalance is finally triggered 
to remove the tasks (which should usually be in the next poll), could we just 
call `task.suspend` instead of adding the new `freeze` logic? When we finally 
close the tasks we can still transit from suspended to closed?
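
   A rough sketch of the alternative being floated (shape assumed; the diff above only shows the `freeze` path, and the negated condition is my reading of the intent):

   ```java
   // Hedged sketch: reuse the existing suspend/close lifecycle for tasks whose
   // named topology has been removed, instead of introducing a freeze state.
   for (final Task task : activeTaskIterable()) {
       if (!topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
           task.suspend(); // stop processing until the follow-up rebalance closes it
       }
   }
   ```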

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -719,6 +723,8 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
+topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
   How about moving this ahead of `pollPhase()`? We are likely to be kicked 
out of the group while blocked waiting here, so it's better to be aware of that 
and re-join the group immediately, rather than doing the restore/etc still 
which may be all wasted work.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -867,7 +873,25 @@ private void initializeAndRestorePhase() {
         log.debug("Idempotent restore call done. Thread state has not changed.");
     }
 
+    // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+    private void checkForTopologyUpdates() {
+        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {
+            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+            taskManager.handleTopologyUpdates();
+
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");

Review comment:
   very nit: just add a TODO that we can improve this case to not always 
enforce rebalance when version bumped, in case we forgot in future PRs?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120

2021-08-05 Thread GitBox


ableegoldman commented on pull request #11183:
URL: https://github.com/apache/kafka/pull/11183#issuecomment-893863088


   Merged to trunk and cherrypicked to 3.0 to help stabilize system tests (cc 
@kkonstantine)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120

2021-08-05 Thread GitBox


ableegoldman merged pull request #11183:
URL: https://github.com/apache/kafka/pull/11183


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120

2021-08-05 Thread GitBox


ableegoldman commented on pull request #11183:
URL: https://github.com/apache/kafka/pull/11183#issuecomment-893861008


   One unrelated flaky test failure in `kafka.api.PlaintextConsumerTest.testListTopics()`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394319#comment-17394319
 ] 

Jason Gustafson commented on KAFKA-13173:
-

I upgraded this bug to a blocker because I think it can result in data loss. 
For example, in the example above, the second ISR change would be interpreted 
as an expansion, but there may have been committed writes to the log between 
the two ISR changes which were not reflected in the expansion.
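
To make the accumulation problem concrete, a toy model (illustrative, not the controller code): each change record must be computed from the ISR left by the previous fencing, not from the original snapshot.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class IsrAccumulationToy {
    public static void main(String[] args) {
        // Starting ISR [1, 2, 3]; fence brokers 2 and 3 in one pass.
        Set<Integer> isr = new LinkedHashSet<>(List.of(1, 2, 3));
        List<Set<Integer>> changeRecords = new ArrayList<>();
        for (int fencedBroker : List.of(2, 3)) {
            isr.remove(fencedBroker);                    // fold over the working ISR
            changeRecords.add(new LinkedHashSet<>(isr)); // emits [1, 3] then [1]
        }
        // The buggy path recomputed each record from the ORIGINAL ISR instead,
        // emitting [1, 3] and then [1, 2], silently restoring fenced broker 2.
        System.out.println(changeRecords); // [[1, 3], [1]]
    }
}
{code}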

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13173:

Description: 
In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
stale replicas and attempt to remove them from the ISR. However, when multiple 
expirations occur at once, we do not properly accumulate the ISR changes. For 
example, I ran a test where the ISR of a partition was initialized to [1, 2, 
3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The 
records that were generated by `fenceStaleBrokers` were the following:

{code}
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
{code}

First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record 
to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 
3 is handled. So we did not account for the fact that we had already fenced 
broker 2 in the request. 

A simple solution for now is to change the logic to handle fencing only one 
broker at a time. 

  was:
In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
stale replicas and attempt to remove them from the ISR. However, when multiple 
expirations occur at once, we do not properly accumulate the ISR changes. For 
example, I ran a test where the ISR of a partition was initialized to [1, 2, 
3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The 
records that were generated by `fenceStaleBrokers` were the following:

{code}
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
{code}

First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record 
to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 
3 is handled. So we did not account for the fact that we had already fenced 
broker 2 in the request. 

A simple solution for now is to change the logic to handle fencing only one 
broker at a time. 


> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests

2021-08-05 Thread GitBox


vvcephei merged pull request #11153:
URL: https://github.com/apache/kafka/pull/11153


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13172) Document in Streams 3.0 that due to rocksDB footer version in-flight downgrade is not supported

2021-08-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394313#comment-17394313
 ] 

Guozhang Wang commented on KAFKA-13172:
---

Yes, that's correct.

> Document in Streams 3.0 that due to rocksDB footer version in-flight 
> downgrade is not supported
> ---
>
> Key: KAFKA-13172
> URL: https://issues.apache.org/jira/browse/KAFKA-13172
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel reassigned KAFKA-13173:
--

Assignee: Niket Goel

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13173:

Fix Version/s: 3.0.0

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13173:

Priority: Blocker  (was: Major)

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #11151: MINOR: Should commit a task if the consumer position advanced as well

2021-08-05 Thread GitBox


guozhangwang merged pull request #11151:
URL: https://github.com/apache/kafka/pull/11151


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13173:
---

 Summary: KRaft controller does not handle simultaneous broker 
expirations correctly
 Key: KAFKA-13173
 URL: https://issues.apache.org/jira/browse/KAFKA-13173
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
stale replicas and attempt to remove them from the ISR. However, when multiple 
expirations occur at once, we do not properly accumulate the ISR changes. For 
example, I ran a test where the ISR of a partition was initialized to [1, 2, 
3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The 
records that were generated by `fenceStaleBrokers` were the following:

{code}
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
{code}

First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record 
to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 
3 is handled. So we did not account for the fact that we had already fenced 
broker 2 in the request. 

A simple solution for now is to change the logic to handle fencing only one 
broker at a time. 
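
As an aside, the accumulation bug is easy to reproduce with plain collections. Below is a minimal stand-alone sketch (bare Java lists standing in for the real controller state and record handling): each fencing step has to start from the ISR produced by the previous step, not from the original snapshot.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class IsrShrinkSketch {
    // Shrink the ISR once per stale broker, threading the accumulated ISR
    // through every step instead of re-reading the original snapshot.
    static List<Integer> fenceAll(List<Integer> isr, List<Integer> staleBrokers) {
        List<Integer> current = new ArrayList<>(isr);
        for (int broker : staleBrokers) {
            current.remove(Integer.valueOf(broker)); // remove by value, not index
        }
        return current;
    }

    public static void main(String[] args) {
        // Mirrors the report: ISR [1, 2, 3], brokers 2 and 3 expire together.
        System.out.println(fenceAll(List.of(1, 2, 3), List.of(2, 3)));
        // Prints [1], rather than the [1, 2] left behind by the buggy path.
    }
}
{code}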



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

2021-08-05 Thread GitBox


ableegoldman merged pull request #11156:
URL: https://github.com/apache/kafka/pull/11156


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #11156:
URL: https://github.com/apache/kafka/pull/11156#discussion_r683763243



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -142,12 +142,11 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
 for (final TopicPartition tp : memberData.partitions) {
 // filter out any topics that no longer exist or aren't 
part of the current subscription
 if (allTopics.contains(tp.topic())) {
-
-if (!allPreviousPartitionsToOwner.containsKey(tp)) {
-allPreviousPartitionsToOwner.put(tp, consumer);
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer 
in the same generation
 ownedPartitions.add(tp);
 } else {
-String otherConsumer = 
allPreviousPartitionsToOwner.get(tp);
 log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
 + "same generation {}, this will be 
invalidated and removed from their previous assignment.",
  consumer, otherConsumer, tp, 
maxGeneration);

Review comment:
   New logic makes sense to me 👍 -- I agree it doesn't really matter if we 
overwrite the previous consumer in the map since the thing we really care about 
is whether or not there was one
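
   To make the `put` contract concrete, here is a tiny stand-alone example (plain `java.util.Map`, nothing assignor-specific):
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class PutReturnsPrevious {
       public static void main(String[] args) {
           Map<String, String> owner = new HashMap<>();
           // First claim: no previous owner, so put returns null.
           System.out.println(owner.put("topic-0", "consumer-a")); // null
           // Second claim in the same generation: put returns the earlier
           // owner, which is exactly the conflict the assignor logs.
           System.out.println(owner.put("topic-0", "consumer-b")); // consumer-a
       }
   }
   ```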




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13172) Document in Streams 3.0 that due to the RocksDB footer version, in-flight downgrade is not supported

2021-08-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394279#comment-17394279
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13172:


Just to clarify: a rolling downgrade is still possible, you would just need to 
clear the local state when bouncing each node – is that correct?

> Document in Streams 3.0 that due to the RocksDB footer version, in-flight 
> downgrade is not supported
> ---
>
> Key: KAFKA-13172
> URL: https://issues.apache.org/jira/browse/KAFKA-13172
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jzaralim opened a new pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120

2021-08-05 Thread GitBox


jzaralim opened a new pull request #11183:
URL: https://github.com/apache/kafka/pull/11183


   The Streams static membership test has failed several times due to hitting 
the Kafka shutdown timeout, but the logs show that the shutdown did actually 
succeed shortly after the 60-second timeout.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13172) Document in Streams 3.0 that due to the RocksDB footer version, in-flight downgrade is not supported

2021-08-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13172:
-

 Summary: Document in Streams 3.0 that due to the RocksDB footer 
version, in-flight downgrade is not supported
 Key: KAFKA-13172
 URL: https://issues.apache.org/jira/browse/KAFKA-13172
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #11172: MINOR: update stream-stream join docs

2021-08-05 Thread GitBox


mjsax commented on pull request #11172:
URL: https://github.com/apache/kafka/pull/11172#issuecomment-893689864


   Merged to `trunk` and cherry-picked to `3.0` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #11172: MINOR: update stream-stream join docs

2021-08-05 Thread GitBox


mjsax merged pull request #11172:
URL: https://github.com/apache/kafka/pull/11172


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683690243



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   )
 }
 
+val voterIds: Set[Integer] = if (usesSelfManagedQuorum) 
RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else 
Set.empty
+if (voterIds.nonEmpty) {
+  if (processRoles.contains(ControllerRole)) {
+// Ensure that controllers use their node.id as a voter in 
controller.quorum.voters 
+require(voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id 
$nodeId must be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  } else {
+// Ensure that the broker's node.id is not an id in 
controller.quorum.voters
+require(!voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the 
node id $nodeId must not be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  }
+}
+

Review comment:
   Good catch




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-05 Thread GitBox


hachikuji commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r683680618



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -553,6 +553,17 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+// defensively check that any newly assigned topic ID matches any that is 
already set
+_topicId.foreach { current =>
+  if (!current.equals(topicId)) {
+// we should never get here as the topic IDs should have been checked 
in becomeLeaderOrFollower

Review comment:
   nit: I think it's ok to leave this out. The point of adding the check is 
to reduce the coupling with ReplicaManager

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -553,6 +553,17 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+// defensively check that any newly assigned topic ID matches any that is 
already set
+_topicId.foreach { current =>
+  if (!current.equals(topicId)) {
+// we should never get here as the topic IDs should have been checked 
in becomeLeaderOrFollower
+throw new InconsistentTopicIdException(s"Tried to assign topic ID 
$topicId to log for topic partition $topicPartition, " +
+  s"but log already contained topic ID $current")
+  }
+  // Topic ID already assigned so we can return
+  return

Review comment:
   I was thinking how we could avoid this return. How about something like 
this:
   ```scala
   _topicId match {
 case Some(currentId) =>
   if (!currentId.equals(topicId)) {
 throw new InconsistentTopicIdException(s"Tried to assign topic ID 
$topicId to log for topic partition $topicPartition, " +
   s"but log already contained topic ID $currentId")
   }
   
 case None =>
   if (keepPartitionMetadataFile) {
 _topicId = Some(topicId)
 if (!partitionMetadataFile.exists()) {
   partitionMetadataFile.record(topicId)
   scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
 }
   }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog

2021-08-05 Thread GitBox


junrao commented on a change in pull request #11154:
URL: https://github.com/apache/kafka/pull/11154#discussion_r683680502



##
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##
@@ -1738,12 +1747,12 @@ object Log extends Logging {
 logDirFailureChannel: LogDirFailureChannel,
 lastShutdownClean: Boolean = true,
 topicId: Option[Uuid],
-keepPartitionMetadataFile: Boolean): Log = {
+keepPartitionMetadataFile: Boolean): UnifiedLog = {
 // create the log directory if it doesn't exist
 Files.createDirectories(dir.toPath)
-val topicPartition = Log.parseTopicPartitionName(dir)
+val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
 val segments = new LogSegments(topicPartition)
-val leaderEpochCache = Log.maybeCreateLeaderEpochCache(
+val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(

Review comment:
   Should we change the logging prefix to UnifiedLog in line 1760 too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12616) Convert integration tests to use ClusterTest

2021-08-05 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-12616:
---
Fix Version/s: 3.1.0

> Convert integration tests to use ClusterTest 
> -
>
> Key: KAFKA-12616
> URL: https://issues.apache.org/jira/browse/KAFKA-12616
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: kip-500
> Fix For: 3.1.0
>
>
> We would like to convert integration tests to use the new ClusterTest 
> annotations so that we can easily test both the Zk and KRaft implementations. 
> This will require adding a bunch of support to the ClusterTest framework as 
> we go along.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


mumrah commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r68365



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   )
 }
 
+val voterIds: Set[Integer] = if (usesSelfManagedQuorum) 
RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else 
Set.empty
+if (voterIds.nonEmpty) {
+  if (processRoles.contains(ControllerRole)) {
+// Ensure that controllers use their node.id as a voter in 
controller.quorum.voters 
+require(voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id 
$nodeId must be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  } else {
+// Ensure that the broker's node.id is not an id in 
controller.quorum.voters
+require(!voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the 
node id $nodeId must not be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  }
+}
+

Review comment:
   Right, but if the voters list _is_ empty, and we are in self-managed 
mode, I believe we should throw an error.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13168) KRaft observers should not have a replica id

2021-08-05 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13168:
-
Fix Version/s: 3.1.0

> KRaft observers should not have a replica id
> 
>
> Key: KAFKA-13168
> URL: https://issues.apache.org/jira/browse/KAFKA-13168
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Ryan Dielhenn
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0, 3.1.0
>
>
> To avoid misconfiguration of a broker affecting the quorum of the cluster 
> metadata partition, when a Kafka node is configured as broker-only the replica 
> id for the KRaft client should be set to {{Optional::empty()}}.
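
A rough sketch of the intent (the helper below is hypothetical, not the actual KRaft client constructor): a broker-only node hands the Raft layer an empty id, so it can never be counted as a voter, while a controller passes its real node id.

{code:java}
import java.util.OptionalInt;

public class RaftIdSketch {
    // Hypothetical helper: only nodes carrying the controller role expose a
    // replica id to the Raft layer; pure brokers join as anonymous observers.
    static OptionalInt raftNodeId(boolean isController, int nodeId) {
        return isController ? OptionalInt.of(nodeId) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        System.out.println(raftNodeId(true, 2));  // OptionalInt[2]
        System.out.println(raftNodeId(false, 2)); // OptionalInt.empty
    }
}
{code}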



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13168) KRaft observers should not have a replica id

2021-08-05 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-13168.
--
Resolution: Fixed

> KRaft observers should not have a replica id
> 
>
> Key: KAFKA-13168
> URL: https://issues.apache.org/jira/browse/KAFKA-13168
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Ryan Dielhenn
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0, 3.1.0
>
>
> To avoid misconfiguration of a broker affecting the quorum of the cluster 
> metadata partition, when a Kafka node is configured as broker-only the replica 
> id for the KRaft client should be set to {{Optional::empty()}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13167.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> KRaft broker should heartbeat immediately during controlled shutdown
> 
>
> Key: KAFKA-13167
> URL: https://issues.apache.org/jira/browse/KAFKA-13167
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> Controlled shutdown in KRaft is signaled through a heartbeat request with the 
> `shouldShutDown` flag set to true. When we begin controlled shutdown, we 
> should immediately schedule the next heartbeat instead of waiting for the 
> next periodic heartbeat so that we can shutdown more quickly. Otherwise 
> controlled shutdown can be delayed by several seconds.
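
A toy illustration of the scheduling change (the executor and names are invented; the real broker lifecycle manager is more involved): once {{shouldShutDown}} is observed, the next heartbeat is enqueued with zero delay instead of waiting out the periodic interval.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ImmediateHeartbeat {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        boolean shouldShutDown = true; // pretend the last heartbeat response set this
        long delayMs = shouldShutDown ? 0 : 2_000; // skip the periodic wait entirely
        scheduler.schedule(
            () -> System.out.println("heartbeat sent, controlled shutdown proceeds"),
            delayMs, TimeUnit.MILLISECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
{code}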



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683626287



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   )
 }
 
+val voterIds: Set[Integer] = if (usesSelfManagedQuorum) 
RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else 
Set.empty
+if (voterIds.nonEmpty) {
+  if (processRoles.contains(ControllerRole)) {
+// Ensure that controllers use their node.id as a voter in 
controller.quorum.voters 
+require(voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id 
$nodeId must be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  } else {
+// Ensure that the broker's node.id is not an id in 
controller.quorum.voters
+require(!voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the 
node id $nodeId must not be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  }
+}
+

Review comment:
   This code already checks for that on line 1955.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog

2021-08-05 Thread GitBox


kowshik commented on a change in pull request #11154:
URL: https://github.com/apache/kafka/pull/11154#discussion_r683616272



##
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##
@@ -248,16 +250,16 @@ case object SnapshotGenerated extends 
LogStartOffsetIncrementReason {
  *  will be deleted to avoid ID conflicts upon 
re-upgrade.
  */
 @threadsafe
-class Log(@volatile var logStartOffset: Long,
-  private val localLog: LocalLog,
-  brokerTopicStats: BrokerTopicStats,
-  val producerIdExpirationCheckIntervalMs: Int,
-  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
-  val producerStateManager: ProducerStateManager,
-  @volatile private var _topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
+class UnifiedLog(@volatile var logStartOffset: Long,
+ private val localLog: LocalLog,
+ brokerTopicStats: BrokerTopicStats,
+ val producerIdExpirationCheckIntervalMs: Int,
+ @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
+ val producerStateManager: ProducerStateManager,
+ @volatile private var _topicId: Option[Uuid],
+ val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
 
-  import kafka.log.Log._
+  import kafka.log.UnifiedLog._

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 edited a comment on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests

2021-08-05 Thread GitBox


wcarlson5 edited a comment on pull request #11153:
URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087


   @vvcephei looks like it is working now
   
   EDIT: nvm odd failure, I don't think that should be a failure. Trying to 
reproduce locally
   
   `java.lang.AssertionError: Unexpected exception thrown while getting the 
value from store.
   Expected: is one of {, 
}
but: was "Cannot get state store source-table because the stream thread 
is PARTITIONS_ASSIGNED, not RUNNING"
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 edited a comment on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests

2021-08-05 Thread GitBox


wcarlson5 edited a comment on pull request #11153:
URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087


   @vvcephei looks like it is working now
   
   EDIT: nvm odd failure
   
   `java.lang.AssertionError: Unexpected exception thrown while getting the 
value from store.
   Expected: is one of {, 
}
but: was "Cannot get state store source-table because the stream thread 
is PARTITIONS_ASSIGNED, not RUNNING"
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 edited a comment on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests

2021-08-05 Thread GitBox


wcarlson5 edited a comment on pull request #11153:
URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087


   @vvcephei looks like it is working now
   
   EDIT: nvm odd failure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


ccding commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683573626



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) {
  config.getInt(REMOTE_LOG_READER_THREADS_PROP),
  config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
  config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
- 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),
+ 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)
 == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),

Review comment:
   We will get a null pointer exception when we call `new KafkaConfig()` 
without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. E.g., 
https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057
   
   This happens everywhere in the codebase after this change: 
   
https://github.com/apache/kafka/pull/11110/files#diff-cbe6a8b71b05ed22cf09d97591225b588e9fca6caaf95d3b34a43262cfd23aa6R1437-R1438
   
   Since `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP` is not a required 
configuration if we don't enable KIP-405, I think we should pass an empty 
string to RemoteLogManagerConfig if it is null.
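
   As a side note, the same null guard can be written once with `java.util.Objects` (a stand-alone sketch, not a proposal for the actual `RemoteLogManagerConfig` change):
   ```java
   import java.util.Objects;

   public class NullSafePrefix {
       public static void main(String[] args) {
           // Simulates REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP being unset.
           String configured = null;
           // Equivalent to the ternary in the diff, expressed in one call:
           String prefix = Objects.requireNonNullElse(configured, "");
           System.out.println("prefix='" + prefix + "'"); // prefix=''
       }
   }
   ```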




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


ccding commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683573626



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) {
  config.getInt(REMOTE_LOG_READER_THREADS_PROP),
  config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
  config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
- 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),
+ 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)
 == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),

Review comment:
   We will get a null pointer exception when we call `new KafkaConfig()` 
without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. E.g., 
https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057
   
   This happens everywhere in the codebase after this change: 
   
https://github.com/apache/kafka/pull/11110/files#diff-cbe6a8b71b05ed22cf09d97591225b588e9fca6caaf95d3b34a43262cfd23aa6R1437-R1438




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


ccding commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683573626



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) {
  config.getInt(REMOTE_LOG_READER_THREADS_PROP),
  config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
  config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
- 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),
+ 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)
 == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),

Review comment:
   We will get a null pointer exception when we call `new KafkaConfig()` 
without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. E.g., 
https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057
   
   This happens everywhere in the codebase.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


ccding commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683573626



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) {
  config.getInt(REMOTE_LOG_READER_THREADS_PROP),
  config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
  config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
- 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),
+ 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)
 == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),

Review comment:
   We will get a null pointer exception everywhere when we call `new 
KafkaConfig()` without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. 
E.g., 
https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057
   
   This happens everywhere in the codebase.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao opened a new pull request #11182: KAFKA-13074: Implement mayClean for MockLog

2021-08-05 Thread GitBox


zhaohaidao opened a new pull request #11182:
URL: https://github.com/apache/kafka/pull/11182


   The current implementation of MockLog doesn't implement maybeClean. MockLog is 
expected to have the same semantics as KafkaMetadataLog, and a few of the test 
suites, like the raft simulation and the Kafka raft client test context, assume 
this to be true.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #11178: KAFKA-13168: KRaft observers should not have a replica id

2021-08-05 Thread GitBox


mumrah commented on pull request #11178:
URL: https://github.com/apache/kafka/pull/11178#issuecomment-893549648


   cherry-picked to 3.0 as 765d2006a32d7a


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah merged pull request #11178: KAFKA-13168: KRaft observers should not have a replica id

2021-08-05 Thread GitBox


mumrah merged pull request #11178:
URL: https://github.com/apache/kafka/pull/11178


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-05 Thread GitBox


mumrah commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683552057



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   )
 }
 
+val voterIds: Set[Integer] = if (usesSelfManagedQuorum) 
RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else 
Set.empty
+if (voterIds.nonEmpty) {
+  if (processRoles.contains(ControllerRole)) {
+// Ensure that controllers use their node.id as a voter in 
controller.quorum.voters 
+require(voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id 
$nodeId must be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  } else {
+// Ensure that the broker's node.id is not an id in 
controller.quorum.voters
+require(!voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the 
node id $nodeId must not be included in the set of voters 
${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+  }
+}
+

Review comment:
   I think we need an additional check that the voter list is not empty if 
`process.roles` is set. Otherwise we will skip the validation that `node.id` is 
included in the voter list when we are a controller.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.

2021-08-05 Thread GitBox


satishd commented on pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#issuecomment-893543468


   @junrao: Please review this PR once 
https://github.com/apache/kafka/pull/11060 is reviewed and merged. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2021-08-05 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-12935.
--

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Walker Carlson
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13171) KIP-500 Setup and named docker volumes

2021-08-05 Thread Claudio Carcaci (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claudio Carcaci updated KAFKA-13171:

Priority: Trivial  (was: Major)

> KIP-500 Setup and named docker volumes
> --
>
> Key: KAFKA-13171
> URL: https://issues.apache.org/jira/browse/KAFKA-13171
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Claudio Carcaci
>Priority: Trivial
>
> Following the KIP-500 instructions to enable KRaft mode 
> ([https://github.com/apache/kafka/blob/trunk/config/kraft/README.md]), the 
> command:
> {code:java}
> $ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
> {code}
> will create a "meta.properties" file in the logs folder (i.e. 
> /tmp/kraft-combined-logs).
> If I build a Docker image with KRaft mode enabled and mount a named volume 
> on /tmp/kraft-combined-logs, the content of the folder will be overwritten 
> (emptied) by the Docker named volume content, so I will lose the 
> meta.properties file.
> A possible solution would be to create the meta.properties file in a 
> separate folder and store all the created logs elsewhere, so that by mounting 
> a named volume the logs folder can start empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13171) KIP-500 Setup and named docker volumes

2021-08-05 Thread Claudio Carcaci (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claudio Carcaci updated KAFKA-13171:

Priority: Major  (was: Trivial)

> KIP-500 Setup and named docker volumes
> --
>
> Key: KAFKA-13171
> URL: https://issues.apache.org/jira/browse/KAFKA-13171
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Claudio Carcaci
>Priority: Major
>
> Following the KIP-500 instructions to enable KRaft mode 
> ([https://github.com/apache/kafka/blob/trunk/config/kraft/README.md]), the 
> command:
> {code:java}
> $ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
> {code}
> will create a "meta.properties" file in the logs folder (i.e. 
> /tmp/kraft-combined-logs).
> If I build a Docker image with KRaft mode enabled and mount a named volume 
> on /tmp/kraft-combined-logs, the content of the folder will be overwritten 
> (emptied) by the Docker named volume content, so I will lose the 
> meta.properties file.
> A possible solution would be to create the meta.properties file in a 
> separate folder and store all the created logs elsewhere, so that by mounting 
> a named volume the logs folder can start empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13171) KIP-500 Setup and named docker volumes

2021-08-05 Thread Claudio Carcaci (Jira)
Claudio Carcaci created KAFKA-13171:
---

 Summary: KIP-500 Setup and named docker volumes
 Key: KAFKA-13171
 URL: https://issues.apache.org/jira/browse/KAFKA-13171
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: Claudio Carcaci


Following the KIP-500 instructions to enable KRaft mode 
([https://github.com/apache/kafka/blob/trunk/config/kraft/README.md]), the 
command:
{code:java}
$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
{code}
will create a "meta.properties" file in the logs folder (i.e. 
/tmp/kraft-combined-logs).

If I build a Docker image with KRaft mode enabled and mount a named volume on 
/tmp/kraft-combined-logs, the content of the folder will be overwritten 
(emptied) by the Docker named volume content, so I will lose the 
meta.properties file.

A possible solution would be to create the meta.properties file in a separate 
folder and store all the created logs elsewhere, so that by mounting a named 
volume the logs folder can start empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-08-05 Thread GitBox


mimaison commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-893504255


   Thanks for the PR, it looks good. Is it possible to add a test for it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #10807: KAFKA-12797: Log the evictor of fetch sessions

2021-08-05 Thread GitBox


tombentley commented on a change in pull request #10807:
URL: https://github.com/apache/kafka/pull/10807#discussion_r683481734



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -692,6 +692,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   fetchRequest.version,
   fetchRequest.metadata,
   fetchRequest.isFromFollower,
+  s"clientId=${request.context.clientId}, 
principal=${request.context.principal}",

Review comment:
   > It's not ideal that we have to build the string even if we don't use 
it.
   
   I couldn't see a nice way to do that. Basing it off the level of the 
`FetchSessionCache` logger seemed to break encapsulation, since at the call 
site we don't know how this string is going to be used.
   
   > 
   > In practice, this extra logging is useful if there's a malicious user 
forcing sessions to roll or if a user is using a broken client (like Sarama 
1.26.0). So I wonder if we really need the clientId. While it's nice to have, 
it's a user-controlled field so this could be problematic for large values. 
WDYT?
   
   It's a good point that the client controls the `clientId`, and could 
potentially choose a long one. The value of logging it here isn't that high, so 
I can remove it. If you're making a point about a clientId being maliciously 
chosen then that's a bigger problem.
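
   For reference, one way to avoid building the string eagerly is to pass a recipe for it rather than the string itself; a stand-alone sketch with `java.util.function.Supplier` (the real `FetchSessionCache` plumbing would differ):
   ```java
   import java.util.function.Supplier;

   public class LazyLogContext {
       // The caller hands over a recipe for the string; it is only rendered
       // if the eviction path actually decides to log it.
       static void maybeLogEviction(boolean shouldLog, Supplier<String> context) {
           if (shouldLog) {
               System.out.println("evicting fetch session: " + context.get());
           }
       }

       public static void main(String[] args) {
           maybeLogEviction(false, () -> "clientId=" + expensiveLookup()); // never built
           maybeLogEviction(true, () -> "clientId=" + expensiveLookup());  // built once
       }

       static String expensiveLookup() {
           return "sarama-1.26.0";
       }
   }
   ```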
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection

2021-08-05 Thread GitBox


mimaison commented on a change in pull request #10277:
URL: https://github.com/apache/kafka/pull/10277#discussion_r683463480



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -489,7 +489,17 @@ boolean isCycle(String topic) {
 String source = replicationPolicy.topicSource(topic);
 if (source == null) {
 return false;
-} else if (source.equals(sourceAndTarget.target())) {
+}
+
+// Fix for https://issues.apache.org/jira/browse/KAFKA-9914
+final boolean condition;
+if (replicationPolicy instanceof IdentityReplicationPolicy) {

Review comment:
   This seems a bit brittle. Users could implement their own 
`ReplicationPolicy` that behaves like `IdentityReplicationPolicy` and this 
would not catch it. Can we detect what to do by making calls on the 
`replicationPolicy` instead?
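
   For illustration, a behavioural probe could look roughly like this (a stand-alone sketch: the interface is a cut-down stand-in for `org.apache.kafka.connect.mirror.ReplicationPolicy`, and the probe topic name is arbitrary):
   ```java
   public class PolicyProbe {
       // Cut-down stand-in for org.apache.kafka.connect.mirror.ReplicationPolicy.
       interface ReplicationPolicy {
           String formatRemoteTopic(String sourceClusterAlias, String topic);
       }

       // Probe behaviour instead of class identity: any policy that leaves
       // replicated topic names unchanged must be treated like the identity
       // policy, even if it is a user-supplied implementation.
       static boolean preservesTopicNames(ReplicationPolicy policy) {
           String probe = "cycle-detection-probe"; // arbitrary, hypothetical name
           return probe.equals(policy.formatRemoteTopic("source", probe));
       }

       public static void main(String[] args) {
           ReplicationPolicy identityLike = (alias, topic) -> topic;
           ReplicationPolicy defaultLike = (alias, topic) -> alias + "." + topic;
           System.out.println(preservesTopicNames(identityLike)); // true
           System.out.println(preservesTopicNames(defaultLike));  // false
       }
   }
   ```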




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-5966) Support ByteBuffer serialization in Kafka Streams

2021-08-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-5966:


Assignee: Luke Chen

> Support ByteBuffer serialization in Kafka Streams
> -
>
> Key: KAFKA-5966
> URL: https://issues.apache.org/jira/browse/KAFKA-5966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Luke Chen
>Priority: Major
>
> Currently Kafka Streams only supports serialization using byte arrays. This 
> means we generate a lot of garbage and spend unnecessary time copying bytes, 
> especially when working with windowed state stores that rely on composite 
> keys. In many places in the code we have to extract parts of the composite key 
> to deserialize either the timestamp or the message key from the state 
> store key (e.g. the methods in WindowStoreUtils).
> Having support for serdes into/from ByteBuffers would allow us to reuse the 
> underlying byte arrays and just pass around slices of the underlying buffers 
> to avoid the unnecessary copying.
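
To make the copy-avoidance concrete, a hedged sketch (the composite-key layout below is made up, not the actual Streams binary format): the window timestamp is read in place from the key's buffer instead of materializing a fresh byte array first.

{code:java}
import java.nio.ByteBuffer;

public class WindowKeySketch {
    // Assumed layout for this sketch: [ message key bytes | 8-byte timestamp ].
    static long timestampOf(ByteBuffer compositeKey) {
        // Read in place: no byte[] copy of the key is ever made.
        return compositeKey.getLong(compositeKey.limit() - Long.BYTES);
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.allocate(12);
        key.put(new byte[] {'k', 'e', 'y', '1'});
        key.putLong(1628121600000L); // window start timestamp
        key.flip();
        System.out.println(timestampOf(key)); // 1628121600000
    }
}
{code}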



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


ijuma commented on pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#issuecomment-893407158


   Thanks for explaining the motivation @ccding. In my opinion, this is a bit 
confusing. What makes remote log configs special when compared to local log 
configs? The same arguments regarding reuse, etc. could be said for those, 
right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10807: KAFKA-12797: Log the evictor of fetch sessions

2021-08-05 Thread GitBox


mimaison commented on a change in pull request #10807:
URL: https://github.com/apache/kafka/pull/10807#discussion_r683367337



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -692,6 +692,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   fetchRequest.version,
   fetchRequest.metadata,
   fetchRequest.isFromFollower,
+  s"clientId=${request.context.clientId}, 
principal=${request.context.principal}",

Review comment:
   It's not ideal that we have to build the string even if we don't use it.
   
   In practice, this extra logging is useful if there's a malicious user 
forcing sessions to roll or if a user is using a broken client (like Sarama 
1.26.0). So I wonder if we really need the clientId. While it's nice to have, 
it's a user-controlled field so this could be problematic for large values. 
WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


satishd commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683370891



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) {
  config.getInt(REMOTE_LOG_READER_THREADS_PROP),
  config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
  config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
- 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),
+ 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)
 == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),

Review comment:
   What is the intent for this change here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog

2021-08-05 Thread GitBox


junrao commented on a change in pull request #11154:
URL: https://github.com/apache/kafka/pull/11154#discussion_r682801773



##
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##
@@ -248,16 +250,16 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  *  will be deleted to avoid ID conflicts upon re-upgrade.
  */
 @threadsafe
-class Log(@volatile var logStartOffset: Long,
-  private val localLog: LocalLog,
-  brokerTopicStats: BrokerTopicStats,
-  val producerIdExpirationCheckIntervalMs: Int,
-  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
-  val producerStateManager: ProducerStateManager,
-  @volatile private var _topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+class UnifiedLog(@volatile var logStartOffset: Long,
+ private val localLog: LocalLog,
+ brokerTopicStats: BrokerTopicStats,
+ val producerIdExpirationCheckIntervalMs: Int,
+ @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
+ val producerStateManager: ProducerStateManager,
+ @volatile private var _topicId: Option[Uuid],
+ val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
 
-  import kafka.log.Log._
+  import kafka.log.UnifiedLog._

Review comment:
   Should we rename the logging prefix to UnifiedLog too?
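   
   For illustration, a toy sketch of the kind of prefix rename being asked
   about. This is not the real kafka.utils.Logging trait; the logIdent field
   and its format are assumptions.
   
   trait LoggingSketch {
     // Simplified stand-in for a logging trait with a per-instance prefix.
     protected var logIdent: String = ""
     def info(msg: String): Unit = println(s"$logIdent$msg")
   }
   
   class UnifiedLogSketch(partition: String) extends LoggingSketch {
     // The question: should a prefix like "[Log partition=...]" now read
     // "[UnifiedLog partition=...]" to match the renamed class?
     logIdent = s"[UnifiedLog partition=$partition] "
   }
   
   object LogIdentDemo extends App {
     new UnifiedLogSketch("topic-0").info("log loaded")
   }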








[GitHub] [kafka] joel-hamill commented on pull request #11163: MINOR: doc change for minisr to clarify replicas in Kafka Config

2021-08-05 Thread GitBox


joel-hamill commented on pull request #11163:
URL: https://github.com/apache/kafka/pull/11163#issuecomment-892895687


   LGTM, had one comment






[GitHub] [kafka] ableegoldman commented on pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#issuecomment-893124886










[GitHub] [kafka] mumrah merged pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-05 Thread GitBox


mumrah merged pull request #11166:
URL: https://github.com/apache/kafka/pull/11166


   






[GitHub] [kafka] axrj commented on pull request #8906: KAFKA-10190: To set replication throttling configs at broker entity-default

2021-08-05 Thread GitBox


axrj commented on pull request #8906:
URL: https://github.com/apache/kafka/pull/8906#issuecomment-892663172


   Hey,
   
   Is there any plan to merge this? It would be great to have this patch 
backported too if possible.






[GitHub] [kafka] cmccabe merged pull request #11130: KAFKA-13138: FileConfigProvider#get should keep failure exception

2021-08-05 Thread GitBox


cmccabe merged pull request #11130:
URL: https://github.com/apache/kafka/pull/11130


   






[GitHub] [kafka] cmccabe merged pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates

2021-08-05 Thread GitBox


cmccabe merged pull request #11168:
URL: https://github.com/apache/kafka/pull/11168


   






[GitHub] [kafka] ijuma merged pull request #11150: MINOR: Fix missing word in LogLoader logged warning

2021-08-05 Thread GitBox


ijuma merged pull request #11150:
URL: https://github.com/apache/kafka/pull/11150


   






[GitHub] [kafka] vvcephei commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-05 Thread GitBox


vvcephei commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r682637061



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -25,15 +25,17 @@
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
-LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion
+LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), \
-   str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(DEV_BRANCH)]
+   str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)]
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+metadata_3_10_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4),

Review comment:
   It's probably a typo, but it didn't matter, since this was the wrong 
test to add new versions to, anyway. I backed out these changes and added the 
missing versions to streams_application_upgrade_test.py.








[GitHub] [kafka] hachikuji merged pull request #11177: KAFKA-13167; KRaft broker should send heartbeat immediately after starting controlled shutdown

2021-08-05 Thread GitBox


hachikuji merged pull request #11177:
URL: https://github.com/apache/kafka/pull/11177


   






[GitHub] [kafka] rondagostino commented on a change in pull request #11175: MINOR: Fix getting started documentation

2021-08-05 Thread GitBox


rondagostino commented on a change in pull request #11175:
URL: https://github.com/apache/kafka/pull/11175#discussion_r682834005



##
File path: config/kraft/README.md
##
@@ -14,8 +14,9 @@ Most important of all, KRaft mode is more scalable.  We expect to be able to [su
 # Quickstart
 
 ## Warning
-KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production.  We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode.  In fact, when Kafka 3.1 is released,
-it may not be possible to upgrade your KRaft clusters from 3.0 to 3.1.  There may be bugs, including serious ones.  You should *assume that your data could be lost at any time* if you try the preview release of KRaft mode.
+KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production.  We do not yet support upgrading existing ZooKeeper-based Kafka clusters into KRaft mode.
+It is not possible to upgrade KRaft clusters from 2.8 to 3.0. Upgrading KRaft clusters from 3.0 to 3.1 will be supported. There may be bugs, including serious ones.

Review comment:
   > Upgrading KRaft clusters from 3.0 to 3.1 will be supported.
   
   Is that guaranteed to be true?  Would it be better to say something like 
"Upgrading KRaft clusters from 3.0 to 3.1 is likely to be supported, but this 
is not guaranteed."








[GitHub] [kafka] cmccabe commented on a change in pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates

2021-08-05 Thread GitBox


cmccabe commented on a change in pull request #11168:
URL: https://github.com/apache/kafka/pull/11168#discussion_r682803659



##
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##
@@ -203,7 +203,15 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   }
   tag.foreach { t =>
 val newProperties = newImage.configs().configProperties(configResource)
-dynamicConfigHandlers(t).processConfigChanges(configResource.name(), newProperties)
+val maybeDefaultName = if (conf.usesSelfManagedQuorum) {

Review comment:
   BrokerMetadataPublisher is only used when in KRaft mode, so this check 
is not necessary.








[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-05 Thread GitBox


hachikuji commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r683019148



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+// defensively check that any newly assigned topic ID matches any that is already set
+_topicId.foreach { current =>
+  if (!current.equals(topicId))

Review comment:
   Can we shortcut return if the current topicId is already defined and 
matches the provided topicId?
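   
   A small sketch of the suggested short-circuit (simplified: a plain String
   stands in for Kafka's Uuid, and the surrounding Log class is elided):
   
   class TopicIdHolderSketch {
     @volatile private var _topicId: Option[String] = None
   
     def assignTopicId(topicId: String): Unit = {
       _topicId match {
         case Some(current) if current == topicId =>
           () // shortcut: already assigned and matching, nothing to do
         case Some(current) =>
           throw new IllegalStateException(
             s"topic ID $topicId does not match existing topic ID $current")
         case None =>
           _topicId = Some(topicId)
       }
     }
   }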

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+// defensively check that any newly assigned topic ID matches any that is already set
+_topicId.foreach { current =>
+  if (!current.equals(topicId))
+  // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower

Review comment:
   nit: fix alignment (just use braces 😉 . I won't tell anyone)








[GitHub] [kafka] kowshik commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


kowshik commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683184053



##
File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) {
  config.getInt(REMOTE_LOG_READER_THREADS_PROP),
  config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
  config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
- config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),
+ config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),

Review comment:
   Hmm, why is this change needed? It doesn't seem like the PR is altering 
behavior such as these but maybe I'm missing something.

##
File path: core/src/main/scala/kafka/log/LogConfig.scala
##
@@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
   val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp)
-  val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
 
-  val localRetentionMs: Long = {
-val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
+  class RemoteLogConfig {
+val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
 
-// -2 indicates to derive value from retentionMs property.
-if(localLogRetentionMs == -2) retentionMs
-else {
-  // Added validation here to check the effective value should not be more than RetentionMs.
-  if(localLogRetentionMs == -1 && retentionMs != -1) {
-throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.")
-  }
+val localRetentionMs: Long = {
+  val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
 
-  if (localLogRetentionMs > retentionMs) {
-throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.")
-  }
+  // -2 indicates to derive value from retentionMs property.
+  if(localLogRetentionMs == -2) retentionMs
+  else {
+// Added validation here to check the effective value should not be more than RetentionMs.
+if(localLogRetentionMs == -1 && retentionMs != -1) {
+  throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.")
+}
+
+if (localLogRetentionMs > retentionMs) {
+  throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.")
+}
 
-  localLogRetentionMs
+localLogRetentionMs
+  }
 }
-  }
 
-  val localRetentionBytes: Long = {
-val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp)
+val localRetentionBytes: Long = {
+  val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp)
 
-// -2 indicates to derive value from retentionSize property.
-if(localLogRetentionBytes == -2) retentionSize;
-else {
-  // Added validation here to check the effective value should not be more than RetentionBytes.
-  if(localLogRetentionBytes == -1 && retentionSize != -1) {
-throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be -1 as ${LogConfig.RetentionBytesProp} value is set as $retentionSize.")
-  }
+
+  // -2 indicates to derive value from retentionSize property.
+  if(localLogRetentionBytes == -2) retentionSize;
Review comment:
   nit: it seems like we could remove the semicolon at the end.
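   
   For readers of the hunk above, a standalone sketch of the sentinel
   convention it encodes (-2 means inherit the global retention, -1 means
   unlimited). Method and exception names are simplified stand-ins, and a
   retentionMs != -1 guard is added on the bound check here so an unlimited
   global retention never trips it.
   
   object LocalRetentionSketch {
     def effectiveLocalRetentionMs(localLogRetentionMs: Long, retentionMs: Long): Long = {
       // -2 indicates to derive the value from the retentionMs property.
       if (localLogRetentionMs == -2) retentionMs
       else {
         if (localLogRetentionMs == -1 && retentionMs != -1)
           throw new IllegalArgumentException(
             s"local retention must not be -1 when retention.ms is $retentionMs")
         if (retentionMs != -1 && localLogRetentionMs > retentionMs)
           throw new IllegalArgumentException(
             s"local retention $localLogRetentionMs must not exceed retention.ms $retentionMs")
         localLogRetentionMs
       }
     }
   
     def main(args: Array[String]): Unit = {
       println(effectiveLocalRetentionMs(-2L, 604800000L))       // inherits 7 days
       println(effectiveLocalRetentionMs(86400000L, 604800000L)) // 1 day, valid
     }
   }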








[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-05 Thread GitBox


mumrah edited a comment on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892067696










[GitHub] [kafka] dielhennr closed pull request #11159: MINOR: Change default node id in kraft broker properties

2021-08-05 Thread GitBox


dielhennr closed pull request #11159:
URL: https://github.com/apache/kafka/pull/11159


   






[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


ccding commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683068699



##
File path: core/src/main/scala/kafka/log/LogConfig.scala
##
@@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
   val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp)
-  val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
 
-  val localRetentionMs: Long = {
-val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
+  class TieredLogConfig {

Review comment:
   Done. Thanks for the comment








[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes

2021-08-05 Thread GitBox


jlprat commented on pull request #10784:
URL: https://github.com/apache/kafka/pull/10784#issuecomment-892521275


   Pinging @vvcephei
   Would you be able to merge this before more commits get in the way and we need another rebase?
   Thanks a ton!






[GitHub] [kafka] mumrah commented on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-05 Thread GitBox


mumrah commented on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892891735


   Spoke with @vvcephei about the streams smoke test and it is a known flaky. 
We are good-to-go with enabling KRaft for this test.






[GitHub] [kafka] junrao commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-05 Thread GitBox


junrao commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r682820398



##
File path: core/src/main/scala/kafka/log/LogConfig.scala
##
@@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
   val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp)
-  val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
 
-  val localRetentionMs: Long = {
-val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
+  class TieredLogConfig {

Review comment:
   This probably should be named RemoteLogConfig to match 
RemoteLogManagerConfig?








[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-05 Thread GitBox


jolshan commented on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311









