Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-03-03 Thread via GitHub


ijuma commented on PR #15430:
URL: https://github.com/apache/kafka/pull/15430#issuecomment-1975916553

   Unrelated to this PR, but the description of the class calls it `concurrent` 
while we seem to acquire locks during the operations. Perhaps thread-safe is a 
better description for what's happening here.
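To make the distinction concrete, here is a minimal sketch in plain Java. It is not Kafka code, and `LockedQueue` is a hypothetical name. The class is thread-safe because every operation runs under one `ReentrantLock`, so callers are serialized rather than run in parallel. By contrast, `java.util.concurrent.ConcurrentLinkedQueue` is a lock-free (non-blocking) queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: thread-safe by mutual exclusion, not lock-free.
// Every method takes the same lock, so no caller observes a partial update,
// but two callers never execute inside the queue at the same time.
final class LockedQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<T> items = new ArrayDeque<>();

    void add(T item) {
        lock.lock();
        try {
            items.addLast(item);
        } finally {
            lock.unlock();
        }
    }

    T poll() {
        lock.lock();
        try {
            return items.pollFirst(); // null when empty
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Several threads can call `add` safely, but they take turns on the lock.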


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-28 Thread via GitHub


dajac merged PR #15430:
URL: https://github.com/apache/kafka/pull/15430





Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-27 Thread via GitHub


dajac commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1504266300


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -53,53 +50,19 @@
 
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
-private static class MockEventAccumulator extends EventAccumulator {
+private static class DelayEventAccumulator extends EventAccumulator {
 private final Time time;
-private final Queue events;
-private final long timeToPollMs;
-private final AtomicBoolean isClosed;
+private final long takeDelayMs;
 
-public MockEventAccumulator(Time time, long timeToPollMs) {
+public DelayEventAccumulator(Time time, long takeDelayMs) {
 this.time = time;
-this.events = new LinkedList<>();
-this.timeToPollMs = timeToPollMs;
-this.isClosed = new AtomicBoolean(false);
+this.takeDelayMs = takeDelayMs;
 }
 
 @Override
-public CoordinatorEvent poll() {
-synchronized (events) {

Review Comment:
   I think that it was already doing it. I am not sure why we implemented it 
this way.






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-27 Thread via GitHub


dajac commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1504265561


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##
@@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException {
 }
 
 /**
- * Returns the next {{@link Event}} available. This method block indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
  *
- * @return The next event.
+ * @return The next event available or null.
  */
 public T poll() {
-return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+lock.lock();
+try {
+K key = randomKey();
+if (key == null) return null;
+
+Queue queue = queues.get(key);
+T event = queue.poll();
+
+if (queue.isEmpty()) queues.remove(key);
+inflightKeys.add(key);
+size--;
+
+return event;
+} finally {
+lock.unlock();
+}
 }
 
 /**
- * Returns the next {{@link Event}} available. This method blocks for the provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or the thread is interrupted.
  *
- * @param timeout   The timeout.
- * @param unit  The timeout unit.
  * @return The next event available or null.
  */
-public T poll(long timeout, TimeUnit unit) {
+public T take() {
 lock.lock();
 try {
 K key = randomKey();
-long nanos = unit.toNanos(timeout);
-while (key == null && !closed && nanos > 0) {
+while (key == null && !closed) {
 try {
-nanos = condition.awaitNanos(nanos);

Review Comment:
   Yeah, I don't know if it really makes a difference in the end. However, it 
seems better to improve the code, as we only ever use `0` or `Long.MAX_VALUE`.






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503400099


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##
@@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException {
 }
 
 /**
- * Returns the next {{@link Event}} available. This method block indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
  *
- * @return The next event.
+ * @return The next event available or null.
  */
 public T poll() {
-return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+lock.lock();
+try {
+K key = randomKey();
+if (key == null) return null;
+
+Queue queue = queues.get(key);
+T event = queue.poll();
+
+if (queue.isEmpty()) queues.remove(key);
+inflightKeys.add(key);
+size--;
+
+return event;
+} finally {
+lock.unlock();
+}
 }
 
 /**
- * Returns the next {{@link Event}} available. This method blocks for the provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or the thread is interrupted.
  *
- * @param timeout   The timeout.
- * @param unit  The timeout unit.
  * @return The next event available or null.
  */
-public T poll(long timeout, TimeUnit unit) {
+public T take() {
 lock.lock();
 try {
 K key = randomKey();
-long nanos = unit.toNanos(timeout);
-while (key == null && !closed && nanos > 0) {
+while (key == null && !closed) {
 try {
-nanos = condition.awaitNanos(nanos);

Review Comment:
   I think the idea was to simplify it. Since `nanos` was always `0` or the max 
value, there's no need to pass explicit timeouts; just have two methods.






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jeffkbkim commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503398332


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##
@@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException {
 }
 
 /**
- * Returns the next {{@link Event}} available. This method block indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
  *
- * @return The next event.
+ * @return The next event available or null.
  */
 public T poll() {
-return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+lock.lock();
+try {
+K key = randomKey();
+if (key == null) return null;
+
+Queue queue = queues.get(key);
+T event = queue.poll();
+
+if (queue.isEmpty()) queues.remove(key);
+inflightKeys.add(key);
+size--;
+
+return event;
+} finally {
+lock.unlock();
+}
 }
 
 /**
- * Returns the next {{@link Event}} available. This method blocks for the provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or the thread is interrupted.
  *
- * @param timeout   The timeout.
- * @param unit  The timeout unit.
  * @return The next event available or null.
  */
-public T poll(long timeout, TimeUnit unit) {
+public T take() {
 lock.lock();
 try {
 K key = randomKey();
-long nanos = unit.toNanos(timeout);
-while (key == null && !closed && nanos > 0) {
+while (key == null && !closed) {
 try {
-nanos = condition.awaitNanos(nanos);

Review Comment:
   It seems to me that the purpose of this PR is to remove this. How much worse 
is using `awaitNanos` compared to `await`? I can imagine a subtle impact, but 
I'd like to know your expectation.
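For comparison with the removed code, a simplified sketch of a timed `poll(timeout, unit)` built on `Condition.awaitNanos`. It assumes a single FIFO instead of `EventAccumulator`'s per-key queues, and `TimedQueue` is a hypothetical name. It shows the two cases the PR relies on: with a timeout of `0` the loop never waits, and with `Long.MAX_VALUE` seconds `TimeUnit.toNanos` saturates to `Long.MAX_VALUE`, so the wait is effectively unbounded and behaves like an untimed `await()` plus tracking of the remaining time.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for the removed poll(timeout, unit):
// one FIFO queue instead of per-key queues.
final class TimedQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();
    private boolean closed = false;

    void add(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns null if nothing arrives before the timeout, if the queue is
    // closed with nothing queued, or if the thread is interrupted.
    T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout); // saturates at Long.MAX_VALUE
            while (items.isEmpty() && !closed && nanos > 0) {
                try {
                    // awaitNanos returns an estimate of the time left (<= 0 on timeout).
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return items.pollFirst();
        } finally {
            lock.unlock();
        }
    }
}
```

Whether the extra timing bookkeeping is measurable is not established here; as the replies in this thread note, the main gain of splitting into two methods is simpler code.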






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jeffkbkim commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503396742


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -53,53 +50,19 @@
 
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
-private static class MockEventAccumulator extends EventAccumulator {
+private static class DelayEventAccumulator extends EventAccumulator {
 private final Time time;
-private final Queue events;
-private final long timeToPollMs;
-private final AtomicBoolean isClosed;
+private final long takeDelayMs;
 
-public MockEventAccumulator(Time time, long timeToPollMs) {
+public DelayEventAccumulator(Time time, long takeDelayMs) {
 this.time = time;
-this.events = new LinkedList<>();
-this.timeToPollMs = timeToPollMs;
-this.isClosed = new AtomicBoolean(false);
+this.takeDelayMs = takeDelayMs;
 }
 
 @Override
-public CoordinatorEvent poll() {
-synchronized (events) {
-while (events.isEmpty() && !isClosed.get()) {
-try {
-events.wait();
-} catch (Exception ignored) {
-
-}
-}
-time.sleep(timeToPollMs);
-return events.poll();
-}
-}
-
-@Override
-public CoordinatorEvent poll(long timeout, TimeUnit unit) {
-return null;
-}
-
-@Override
-public void add(CoordinatorEvent event) throws RejectedExecutionException {
-synchronized (events) {
-events.add(event);
-events.notifyAll();
-}
-}
-
-@Override
-public void close() {
-isClosed.set(true);
-synchronized (events) {
-events.notifyAll();
-}
+public CoordinatorEvent take() {

Review Comment:
   Thanks for the simplification!
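A sketch of the simplified test helper's approach, assuming it reuses the real blocking logic and only adds a delay. `SimpleAccumulator` and `DelayAccumulator` are hypothetical stand-ins for `EventAccumulator` and `DelayEventAccumulator`, and `Thread.sleep` replaces the Kafka `Time` that the test injects. Whether the actual test applies the delay before or after `super.take()` is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal base class standing in for EventAccumulator.
class SimpleAccumulator<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();

    void add(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until an item is available (interrupts ignored for brevity).
    T take() {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.awaitUninterruptibly();
            }
            return items.pollFirst();
        } finally {
            lock.unlock();
        }
    }
}

// Hypothetical test helper: keeps the inherited waiting/signalling logic
// and only simulates a slow take().
final class DelayAccumulator<T> extends SimpleAccumulator<T> {
    private final long takeDelayMs;

    DelayAccumulator(long takeDelayMs) {
        this.takeDelayMs = takeDelayMs;
    }

    @Override
    T take() {
        T item = super.take();
        try {
            Thread.sleep(takeDelayMs); // the Kafka test uses an injected Time instead
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item;
    }
}
```

Overriding only `take()` means the helper no longer reimplements waiting, notification, and closing, which made up most of the removed mock.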






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on PR #15430:
URL: https://github.com/apache/kafka/pull/15430#issuecomment-1965265194

   Left one question -- otherwise lgtm





Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503291909


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -53,53 +50,19 @@
 
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
-private static class MockEventAccumulator extends EventAccumulator {
+private static class DelayEventAccumulator extends EventAccumulator {
 private final Time time;
-private final Queue events;
-private final long timeToPollMs;
-private final AtomicBoolean isClosed;
+private final long takeDelayMs;
 
-public MockEventAccumulator(Time time, long timeToPollMs) {
+public DelayEventAccumulator(Time time, long takeDelayMs) {
 this.time = time;
-this.events = new LinkedList<>();
-this.timeToPollMs = timeToPollMs;
-this.isClosed = new AtomicBoolean(false);
+this.takeDelayMs = takeDelayMs;
 }
 
 @Override
-public CoordinatorEvent poll() {
-synchronized (events) {

Review Comment:
   It's interesting that we basically made the real accumulator do what this 
mock was doing.
   But it also makes sense.
   






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503073175


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##
@@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException {
 }
 
 /**
- * Returns the next {{@link Event}} available. This method block indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
  *
- * @return The next event.
+ * @return The next event available or null.
  */
 public T poll() {
-return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+lock.lock();
+try {
+K key = randomKey();
+if (key == null) return null;
+
+Queue queue = queues.get(key);
+T event = queue.poll();
+
+if (queue.isEmpty()) queues.remove(key);
+inflightKeys.add(key);
+size--;
+
+return event;
+} finally {
+lock.unlock();
+}
 }
 
 /**
- * Returns the next {{@link Event}} available. This method blocks for the provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or the thread is interrupted.

Review Comment:
   I noticed we just catch `InterruptedException`. Is that correct? Or should 
we adjust the comment to say "or the accumulator is closed"?
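A minimal sketch of the two non-event exits in question, assuming a single FIFO instead of per-key queues (`ClosableQueue` is hypothetical, not Kafka's class). `close()` wakes blocked callers through `signalAll()`, and an interrupt surfaces as `InterruptedException` from `await()`. This sketch treats an interrupt as a reason to return (restoring the interrupt status); if an implementation instead catches and ignores the exception and keeps waiting, its Javadoc should name only the close exit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical queue whose blocking take() can return without an item
// for two reasons: close() and thread interruption.
final class ClosableQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();
    private boolean closed = false;

    void add(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
            notEmpty.signalAll(); // wake every blocked taker so it sees 'closed'
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the next item. Blocks until an item is available, the queue is
     * closed, or the calling thread is interrupted. Returns null if the queue
     * is closed with nothing queued, or on interrupt (status restored).
     */
    T take() {
        lock.lock();
        try {
            while (items.isEmpty() && !closed) {
                try {
                    notEmpty.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return items.pollFirst(); // null only if closed and drained
        } finally {
            lock.unlock();
        }
    }
}
```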






[PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


dajac opened a new pull request, #15430:
URL: https://github.com/apache/kafka/pull/15430

   `poll(long timeout, TimeUnit unit)` is only ever used with `Long.MAX_VALUE` 
or `0`. This patch replaces it with `poll()` (non-blocking) and `take()` 
(blocking), which removes the `awaitNanos` usage.
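A compact sketch of the resulting API, assuming a simplified keyed accumulator. `KeyedAccumulator`, `done(key)`, and the random-key selection are hypothetical, modeled on the new `poll()` body in the diffs above rather than copied from Kafka (the `size` counter is omitted). `poll()` never blocks and returns `null` when no key is available, while `take()` waits with an untimed `Condition.await()` until `poll()` succeeds or the accumulator is closed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical keyed accumulator: events sharing a key are handed out one
// at a time; a key stays "in flight" until done(key) is called.
final class KeyedAccumulator<K, T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Map<K, Queue<T>> queues = new HashMap<>();
    private final Set<K> inflightKeys = new HashSet<>();
    private boolean closed = false;

    void add(K key, T event) {
        lock.lock();
        try {
            queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(event);
            condition.signalAll();
        } finally { lock.unlock(); }
    }

    // Non-blocking: returns null when no key is currently available.
    T poll() {
        lock.lock();
        try {
            K key = randomKey();
            if (key == null) return null;
            Queue<T> queue = queues.get(key);
            T event = queue.poll();
            if (queue.isEmpty()) queues.remove(key);
            inflightKeys.add(key);
            return event;
        } finally { lock.unlock(); }
    }

    // Blocking: untimed wait until poll() succeeds, close(), or interrupt.
    T take() {
        lock.lock();
        try {
            T event = poll(); // safe: ReentrantLock is reentrant
            while (event == null && !closed) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
                event = poll();
            }
            return event;
        } finally { lock.unlock(); }
    }

    // Releases a key so its next event can be handed out.
    void done(K key) {
        lock.lock();
        try {
            inflightKeys.remove(key);
            condition.signalAll();
        } finally { lock.unlock(); }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
            condition.signalAll();
        } finally { lock.unlock(); }
    }

    // Picks a random key that has queued events and is not in flight.
    private K randomKey() {
        List<K> candidates = new ArrayList<>();
        for (K key : queues.keySet()) {
            if (!inflightKeys.contains(key)) candidates.add(key);
        }
        if (candidates.isEmpty()) return null;
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }
}
```

Building `take()` on top of `poll()` keeps the key-selection logic in one place, and no timeout value has to be threaded through either method.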
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

