philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136454999
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -437,29 +456,24 @@ public void wakeup() { */ @Override public void commitSync(final Duration timeout) { - final CommitApplicationEvent commitEvent = new CommitApplicationEvent(subscriptions.allConsumed()); - eventHandler.add(commitEvent); - - final CompletableFuture<Void> commitFuture = commitEvent.future(); - try { - commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (final TimeoutException e) { - throw new org.apache.kafka.common.errors.TimeoutException( - "timeout"); - } catch (final Exception e) { - // handle exception here - throw new RuntimeException(e); - } + commitSync(subscriptions.allConsumed(), timeout); } @Override public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) { - throw new KafkaException("method not implemented"); + commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs)); } @Override public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) { - throw new KafkaException("method not implemented"); + CompletableFuture<Void> commitFuture = commit(offsets); + try { + commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { Review Comment: Ditto above: translate the exceptions thrown by the future into their Kafka equivalents. java.lang.InterruptedException -> org.apache.kafka.common.errors.InterruptException. java.util.concurrent.TimeoutException -> org.apache.kafka.common.errors.TimeoutException. java.util.concurrent.ExecutionException -> org.apache.kafka.common.KafkaException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org