[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-14 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263866826


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+    private static class FutureTimerTask<T> extends TimerTask {
+        CompletableFuture<T> future = new CompletableFuture<>();
+
+        FutureTimerTask(long delayMs) {
+            super(delayMs);
+        }
+
+        @Override
+        public void run() {
+            // We use org.apache.kafka.common.errors.TimeoutException to differentiate
+            // from java.util.concurrent.TimeoutException.
+            future.completeExceptionally(new TimeoutException(
+                String.format("Future failed to be completed before timeout of %sMs ms was reached", delayMs)));
+        }
+    }
+
+    private <T> CompletableFuture<T> add(Timer timer, long delayMs) {
+        FutureTimerTask<T> task = new FutureTimerTask<>(delayMs);
+        timer.add(task);
+        return task.future;
+    }
+
+    @Test
+    public void testReaper() throws Exception {
+        Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
+        try {
+            CompletableFuture t1 = add(timer, 100L);
+            CompletableFuture t2 = add(timer, 200L);
+            CompletableFuture t3 = add(timer, 300L);
+            TestUtils.assertFutureThrows(t1, TimeoutException.class);

Review Comment:
   `TestUtils.assertFutureThrows(t1, TimeoutException.class)`
   I interpreted this to mean that the timeout exception had already been
   thrown. I thought that would only happen after the timeouts of 100-300 ms
   had elapsed. I guess that is not the case and I misunderstood.
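
   A minimal sketch of why an assertion on a future does not need the
   exception to have been thrown already, assuming the assertion helper waits
   on the future (this thread does not show how `assertFutureThrows` is
   implemented). The `ScheduledExecutorService` below is only a stand-in for
   the timer, and `failAfter` and `waitForFailureMs` are hypothetical names
   used for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingFutureSketch {
    // Hypothetical helper: fails the returned future after delayMs on a
    // background thread, standing in for a timer task that expires.
    static CompletableFuture<Void> failAfter(ScheduledExecutorService scheduler, long delayMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        scheduler.schedule(
            () -> future.completeExceptionally(new IllegalStateException("expired after " + delayMs + " ms")),
            delayMs, TimeUnit.MILLISECONDS);
        return future;
    }

    // Returns how long (in ms) the calling thread was blocked before it saw the failure.
    static long waitForFailureMs(long delayMs) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            CompletableFuture<Void> future = failAfter(scheduler, delayMs);
            try {
                // Blocks until the background thread completes the future.
                future.get(5, TimeUnit.SECONDS);
                throw new AssertionError("expected the future to fail");
            } catch (ExecutionException e) {
                // The cause is the exception passed to completeExceptionally.
                if (!(e.getCause() instanceof IllegalStateException)) {
                    throw e;
                }
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocked for about " + waitForFailureMs(100L) + " ms");
    }
}
```

   In this sketch the wait itself is what lets the delay elapse: the caller
   blocks inside `get` until the background thread completes the future.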



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-14 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263865677


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
   Thanks. I didn't mean to imply that so I'm sorry it came across that way.






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-13 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263070668


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
   I read some of this, but I guess I'm just not convinced you understood it
   when this code was written. Hence my concern about the code. I guess I will
   read it again.






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-13 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263069869


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java:
##
@@ -0,0 +1,65 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+    private static class FutureTimerTask<T> extends TimerTask {
+        CompletableFuture<T> future = new CompletableFuture<>();
+
+        FutureTimerTask(long delayMs) {
+            super(delayMs);
+        }
+
+        @Override
+        public void run() {
+            // We use org.apache.kafka.common.errors.TimeoutException to differentiate
+            // from java.util.concurrent.TimeoutException.
+            future.completeExceptionally(new TimeoutException(
+                String.format("Future failed to be completed before timeout of %sMs ms was reached", delayMs)));
+        }
+    }
+
+    private <T> CompletableFuture<T> add(Timer timer, long delayMs) {
+        FutureTimerTask<T> task = new FutureTimerTask<>(delayMs);
+        timer.add(task);
+        return task.future;
+    }
+
+    @Test
+    public void testReaper() throws Exception {
+        Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
+        try {
+            CompletableFuture t1 = add(timer, 100L);
+            CompletableFuture t2 = add(timer, 200L);
+            CompletableFuture t3 = add(timer, 300L);
+            TestUtils.assertFutureThrows(t1, TimeoutException.class);

Review Comment:
   Is this subject to race conditions? How can we know that lines 58-60 are
   executed after the given number of ms has elapsed? Or am I reading this wrong?






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-13 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263055145


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
   I'm a bit uncertain about this change, and the uncertainty from this thread
   doesn't inspire confidence.








[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-13 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263052783


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java:
##
@@ -0,0 +1,65 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+    private static class FutureTimerTask<T> extends TimerTask {
+        CompletableFuture<T> future = new CompletableFuture<>();
+
+        FutureTimerTask(long delayMs) {
+            super(delayMs);
+        }
+
+        @Override
+        public void run() {
+            // We use org.apache.kafka.common.errors.TimeoutException to differentiate
+            // from java.util.concurrent.TimeoutException.
+            future.completeExceptionally(new TimeoutException(
+                String.format("Future failed to be completed before timeout of %sMs ms was reached", delayMs)));
+        }
+    }
+
+    private <T> CompletableFuture<T> add(Timer timer, long delayMs) {
+        FutureTimerTask<T> task = new FutureTimerTask<>(delayMs);
+        timer.add(task);
+        return task.future;
+    }
+
+    @Test
+    public void testReaper() throws Exception {
+        Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
+        try {
+            CompletableFuture t1 = add(timer, 100L);
+            CompletableFuture t2 = add(timer, 200L);
+            CompletableFuture t3 = add(timer, 300L);
+            TestUtils.assertFutureThrows(t1, TimeoutException.class);

Review Comment:
   I guess my question is: how does line 58 pass? Wouldn't we need time to
   progress by 100 ms (and then 200 ms and 300 ms)?






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261758965


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
   Yeah, I got confused by this as well. My understanding was that the clock
   doesn't actually advance unless we expire an event, and that we only advance
   it to that event's expiration time.






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261578373


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java:
##
@@ -0,0 +1,65 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+    private static class FutureTimerTask<T> extends TimerTask {
+        CompletableFuture<T> future = new CompletableFuture<>();
+
+        FutureTimerTask(long delayMs) {
+            super(delayMs);
+        }
+
+        @Override
+        public void run() {
+            // We use org.apache.kafka.common.errors.TimeoutException to differentiate
+            // from java.util.concurrent.TimeoutException.
+            future.completeExceptionally(new TimeoutException(
+                String.format("Future failed to be completed before timeout of %sMs ms was reached", delayMs)));
+        }
+    }
+
+    private <T> CompletableFuture<T> add(Timer timer, long delayMs) {
+        FutureTimerTask<T> task = new FutureTimerTask<>(delayMs);
+        timer.add(task);
+        return task.future;
+    }
+
+    @Test
+    public void testReaper() throws Exception {
+        Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
+        try {
+            CompletableFuture t1 = add(timer, 100L);
+            CompletableFuture t2 = add(timer, 200L);
+            CompletableFuture t3 = add(timer, 300L);
+            TestUtils.assertFutureThrows(t1, TimeoutException.class);

Review Comment:
   I'm a little confused about how we ensure that the given amount of time has
   passed.






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261559993


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2360,6 +2360,19 @@ public void testGroupIdsByTopics() {
         assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
     }
 
+    @Test
+    public void testOnNewMetadataImageWithEmptyDelta() {

Review Comment:
   Is this the test for the ofNullable change? Do we also need to check that we
   don't notify any groups?
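
   For context, a small sketch of the null-handling difference, assuming the
   "ofNullable change" refers to `java.util.Optional.ofNullable` (the change
   itself is not quoted in this thread). The `notifiedFor` and
   `ofThrowsOnNull` helpers are hypothetical and only illustrate that a null
   value produces an empty `Optional`, so the callback is skipped.

```java
import java.util.Optional;

public class OfNullableSketch {
    // Returns true if the callback ran. With a null argument, ofNullable
    // yields an empty Optional and ifPresent does nothing.
    static boolean notifiedFor(String maybeNullDelta) {
        boolean[] notified = {false};
        Optional.ofNullable(maybeNullDelta).ifPresent(delta -> notified[0] = true);
        return notified[0];
    }

    // Optional.of rejects null with a NullPointerException instead.
    static boolean ofThrowsOnNull() {
        try {
            Optional.of((String) null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null delta notified: " + notifiedFor(null));
        System.out.println("non-null delta notified: " + notifiedFor("topics"));
        System.out.println("Optional.of(null) throws: " + ofThrowsOnNull());
    }
}
```

   A test along these lines would assert both halves: that the null case does
   not throw and that no notification happens.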






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261554932


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;
+
+    class Reaper extends ShutdownableThread {
+        Reaper(String name) {
+            super(name, false);
+        }
+
+        @Override
+        public void doWork() {
+            try {
+                timer.advanceClock(WORK_TIMEOUT_MS);

Review Comment:
   How did we decide upon 200? Is my understanding correct that we only execute
   the event if its expiration is within 200 ms of the current time?
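
   One way to reason about this question is a sketch in which the argument to
   `advanceClock` is a maximum wait per call rather than an expiration window.
   This is an assumption for illustration built on `java.util.concurrent.DelayQueue`;
   it is not taken from the PR, and `AdvanceClockSketch`, `DelayedTask`, and
   `callsUntilRun` are hypothetical names.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AdvanceClockSketch {
    // Hypothetical task wrapper: the queue only hands it out once its deadline has passed.
    static final class DelayedTask implements Delayed {
        final long deadlineNanos;
        final Runnable action;

        DelayedTask(long delayMs, Runnable action) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
            this.action = action;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedTask> queue = new DelayQueue<>();

    void add(long delayMs, Runnable action) {
        queue.add(new DelayedTask(delayMs, action));
    }

    // Waits up to timeoutMs for a task to expire. Runs it and returns true,
    // or returns false if nothing expired within the wait.
    boolean advanceClock(long timeoutMs) throws InterruptedException {
        DelayedTask task = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (task == null) {
            return false;
        }
        task.action.run();
        return true;
    }

    // Counts how many advanceClock(pollMs) calls happen before a task due in delayMs runs.
    static int callsUntilRun(long delayMs, long pollMs) throws InterruptedException {
        AdvanceClockSketch timer = new AdvanceClockSketch();
        AtomicInteger ran = new AtomicInteger();
        timer.add(delayMs, ran::incrementAndGet);
        int calls = 0;
        while (ran.get() == 0) {
            calls++;
            timer.advanceClock(pollMs);
        }
        return calls;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("calls needed: " + callsUntilRun(500L, 200L));
    }
}
```

   Under this reading, a task due in 500 ms still runs when the loop polls
   with a 200 ms timeout; it simply takes more than one call. The timeout
   bounds how long a single call blocks, which also bounds how quickly the
   loop can notice a shutdown request.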






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261521364


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {

Review Comment:
   We needed this because there wasn't a thread actually running the timer?
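
   A rough sketch of the pattern being asked about: a timer whose tasks only
   fire when someone calls `advanceClock` needs a background thread to drive
   it. This uses a plain `Thread` rather than `ShutdownableThread`, and the
   `Advanceable` interface and `countAdvances` helper are hypothetical
   stand-ins, not code from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReaperThreadSketch implements AutoCloseable {
    // Hypothetical stand-in for a timer that makes no progress unless advanceClock is called.
    interface Advanceable {
        boolean advanceClock(long timeoutMs) throws InterruptedException;
    }

    private static final long WORK_TIMEOUT_MS = 200L;

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread thread;

    ReaperThreadSketch(String name, Advanceable timer) {
        thread = new Thread(() -> {
            // Keep driving the timer until close() is called.
            while (running.get()) {
                try {
                    timer.advanceClock(WORK_TIMEOUT_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void close() throws InterruptedException {
        running.set(false);
        thread.interrupt();
        thread.join();
    }

    // Runs a reaper against a fake timer for runForMs and reports how often it was driven.
    static int countAdvances(long runForMs) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        Advanceable fake = timeoutMs -> {
            calls.incrementAndGet();
            Thread.sleep(10L);
            return false;
        };
        try (ReaperThreadSketch reaper = new ReaperThreadSketch("reaper", fake)) {
            Thread.sleep(runForMs);
        }
        return calls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("advanceClock calls: " + countAdvances(100L));
    }
}
```

   Without the thread, nothing in this sketch would ever call `advanceClock`,
   so no task could expire.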






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261496901


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   I guess we will do many of these test changes in a followup though?






[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261492024


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   Hmmm, does this also mean that we are no longer testing the old group
   coordinator (at least in this test)?

   EDIT: I see the name of the test now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261475273


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -175,7 +176,7 @@ object Defaults {
   val ConsumerGroupMinHeartbeatIntervalMs = 5000
   val ConsumerGroupMaxHeartbeatIntervalMs = 15000
   val ConsumerGroupMaxSize = Int.MaxValue
-  val ConsumerGroupAssignors = ""
+  val ConsumerGroupAssignors = List(classOf[RangeAssignor].getName).asJava

Review Comment:
   Did we need to do this because the default couldn't be empty?


