Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax closed pull request #14951: KAFKA-15662: Add support for clientInstanceIds in Kafka Stream URL: https://github.com/apache/kafka/pull/14951 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14951: URL: https://github.com/apache/kafka/pull/14951#issuecomment-2272474949 Closing this PR. We are heading into 4.0 for which we plan to remove EOSv1, so maybe we won't need this any longer.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
github-actions[bot] commented on PR #14951: URL: https://github.com/apache/kafka/pull/14951#issuecomment-1982287149 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax closed pull request #14864: KAFKA-15662: Add support for clientInstanceIds in Kafka Stream URL: https://github.com/apache/kafka/pull/14864
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14864: URL: https://github.com/apache/kafka/pull/14864#issuecomment-1862393058 We split this PR into multiple smaller ones. Closing.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14936: URL: https://github.com/apache/kafka/pull/14936#issuecomment-1852434603 @stanislavkozlovski There is no 3.7 branch yet, so I merged this one. It must go into the 3.7 release (it has been ready for days, but Jenkins did not cooperate...). If your cut does not include it, I'll cherry-pick it to 3.7 later.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax merged PR #14936: URL: https://github.com/apache/kafka/pull/14936
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax merged PR #14948: URL: https://github.com/apache/kafka/pull/14948
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14936: URL: https://github.com/apache/kafka/pull/14936#discussion_r1421112931
## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -231,6 +240,69 @@ private void restoreTasks(final long now) {
         }
     }
+    private void maybeGetClientInstanceIds() {
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!clientInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        // if the state-updated thread has active work:
+                        //    we pass in a timeout of zero into each `clientInstanceId()` call
+                        //    to just trigger the "get instance id" background RPC;
+                        //    we don't want to block the state updater thread that can do useful work in the meantime
+                        // otherwise, we pass in 100ms to avoid busy waiting
+                        clientInstanceIdFuture.complete(
+                            restoreConsumer.clientInstanceId(
+                                allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO
Review Comment:
Good catch!
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax merged PR #14935: URL: https://github.com/apache/kafka/pull/14935
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14936: URL: https://github.com/apache/kafka/pull/14936#discussion_r1420236155
## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -231,6 +240,69 @@ private void restoreTasks(final long now) {
         }
     }
+    private void maybeGetClientInstanceIds() {
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!clientInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        // if the state-updated thread has active work:
+                        //    we pass in a timeout of zero into each `clientInstanceId()` call
+                        //    to just trigger the "get instance id" background RPC;
+                        //    we don't want to block the state updater thread that can do useful work in the meantime
+                        // otherwise, we pass in 100ms to avoid busy waiting
+                        clientInstanceIdFuture.complete(
+                            restoreConsumer.clientInstanceId(
+                                allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO
Review Comment:
I think this will always be false here, because of `fetchDeadlineClientInstanceId`
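The timeout choice in the quoted diff, and one possible reading of the remark about it, can be sketched in isolation. This is a minimal sketch and not the Kafka code: `InstanceIdTimeouts`, `tasksPending`, and `timeoutForFetch` are hypothetical names, and the body of `allWorkDone()` is an assumption (that a pending instance-id fetch itself counts as outstanding work), which the thread does not confirm.

```java
import java.time.Duration;

// Hypothetical stand-in for the state updater's idle check and timeout choice.
final class InstanceIdTimeouts {
    long fetchDeadlineClientInstanceId = -1L; // -1 means "no fetch requested"
    boolean tasksPending = false;

    // ASSUMPTION: "all work done" also requires that no instance-id fetch is pending.
    boolean allWorkDone() {
        return !tasksPending && fetchDeadlineClientInstanceId == -1L;
    }

    // Mirrors the ternary in the quoted diff: 100 ms when idle, zero otherwise.
    Duration timeoutForFetch() {
        return allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO;
    }
}
```

Under that assumption, the 100 ms branch cannot be taken while a fetch deadline is set, because the idle check is false for exactly as long as the fetch is pending.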
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14936: URL: https://github.com/apache/kafka/pull/14936#discussion_r1419743348
## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -772,6 +817,26 @@ public Set getStandbyTasks() {
         );
     }
+    @Override
+    public KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) {
+        if (stateUpdaterThread.restoreConsumerInstanceIdFuture != null) {
+            return stateUpdaterThread.restoreConsumerInstanceIdFuture;
Review Comment:
I rewrote this and pushed it into the state-updater thread directly (and let it align with other code). Hope this does the trick.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1419615776
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
Review Comment:
Sounds good. Yeah, a comment will help here.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14922: URL: https://github.com/apache/kafka/pull/14922#issuecomment-1845908617 Thanks. Will address the comment in a follow up PR and merge this as-is.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax merged PR #14922: URL: https://github.com/apache/kafka/pull/14922
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1419283998
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(
+                        new TimeoutException("Could not retrieve main consumer client instance id.")
+                    );
+                }
+            }
+        }
+    }
+
+    private void maybeResetFetchDeadline() {
Review Comment:
It's not worth it with this PR, but when we add other clients it will be. Keeping for now.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1419283420
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(
Review Comment:
Good catch!
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1419280528
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
Review Comment:
ISE is thrown if telemetry is disabled on the client. In general, we assume that it's enabled for all clients or disabled for all clients, but it could also be that it's disabled for _some_ clients only. For the last case, we want to swallow the error, which is what happens here (in general, we say a single error on any client fails the call to `KafkaStreams#clientInstanceIds()`, but for this case we need to make an exception). Let me know if you agree or disagree. Will add a comment. (Seems `disabledError` as var name is not descriptive enough.)
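The error-handling rule described in this comment can be sketched outside of Kafka. This is a minimal sketch under stated assumptions, not the merged code: `java.util.concurrent.CompletableFuture` stands in for Kafka's `KafkaFutureImpl`, `java.util.UUID` for Kafka's `Uuid`, the checked `java.util.concurrent.TimeoutException` for Kafka's own timeout exception, and `InstanceIdSource` and `InstanceIdPoller` are hypothetical names introduced only for illustration.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a client's `clientInstanceId(Duration)` call.
interface InstanceIdSource {
    UUID clientInstanceId(Duration timeout) throws TimeoutException;
}

final class InstanceIdPoller {
    // Makes one non-blocking attempt to resolve the future.
    static void tryComplete(final InstanceIdSource client, final CompletableFuture<UUID> future) {
        if (future.isDone()) {
            return;
        }
        try {
            future.complete(client.clientInstanceId(Duration.ZERO));
        } catch (final IllegalStateException telemetryDisabled) {
            // telemetry is disabled on this client: report "no id" (null)
            // instead of failing the overall request
            future.complete(null);
        } catch (final TimeoutException notReadyYet) {
            // the background RPC has not finished; leave the future pending
            // so the next loop iteration can retry
        } catch (final Exception error) {
            // any other error fails the future
            future.completeExceptionally(error);
        }
    }
}
```

The design choice is that a disabled client yields a successfully completed future holding `null`, so a caller collecting ids from several clients can tell "disabled" apart from "failed".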
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14951: URL: https://github.com/apache/kafka/pull/14951#discussion_r1418947853
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -716,6 +734,126 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+                final Map activeTasks = taskManager.activeTaskMap();
+
+                // setup task futures if necessary
+                if (!producerInstanceIdFuture.isDone()) {
+                    if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                        // only active tasks have a producer (standby tasks don't)
+                        // producers are set up during task creation and thus all active tasks have a valid producer
+                        for (final Map.Entry task : activeTasks.entrySet()) {
+                            final TaskId taskId = task.getKey();
+                            KafkaFutureImpl future = taskProducersInstanceIdsFuture.get(taskId);
+                            if (future == null || future.isCompletedExceptionally()) {
+                                future = new KafkaFutureImpl<>();
+                                ((StreamTask) task.getValue()).producerInstanceId = future;
+                                taskProducersInstanceIdsFuture.put(taskId, future);
+                            }
+                        }
+                        if (stateUpdaterEnabled) {
Review Comment:
Dropping some ideas on how to implement this with the state updater:
- Add a `clientInstanceIds` method to the `Task` interface, and also implement it in `ReadOnlyTask`.
- NOOP for standby tasks and if EOS_ALPHA is not enabled.
- Since the producer is thread safe, you should then be able to call that function whether it's owned by the state updater or the stream thread.
- Use `tasks.activeTasks` or something similar to get a collection of _all_ tasks, including state updater tasks.
- Do it like it's defined here.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1418773892
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(
+                        new TimeoutException("Could not retrieve main consumer client instance id.")
+                    );
+                }
+            }
+        }
+    }
+
+    private void maybeResetFetchDeadline() {
Review Comment:
nit: not sure the function is worth it, you seem to always call it when `reset` is true.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(
Review Comment:
Reset the fetch deadline as well? Otherwise we'll keep allocating this `TimeoutException` every time we go around the loop.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
Review Comment:
Why are we ignoring ISE?

## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ##
@@ -1813,17 +1816,29 @@ public ClientInstanceIds clientInstanceIds(final Duration timeout) {
             throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
         }
+        long remainingTimeMs = timeout.toMillis();
Review Comment:
This could be a good use case
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1418730263
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +727,48 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
Review Comment:
hey, when I wrote that comment we still had a list of futures! Now it's not a problem anymore, agreed
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
wcarlson5 commented on code in PR #14935: URL: https://github.com/apache/kafka/pull/14935#discussion_r1418058902
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
         return stateLock;
     }
+    public Map> consumerClientInstanceIds(final Duration timeout) {
Review Comment:
and here

## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ##
@@ -454,4 +485,19 @@ public void shutdown() {
     public Map consumerMetrics() {
         return Collections.unmodifiableMap(globalConsumer.metrics());
     }
+
+    public KafkaFuture globalConsumerInstanceId(final Duration timeout) {
Review Comment:
Same comment here if you don't mind
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
wcarlson5 commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1418054625
## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ##
@@ -1802,7 +1805,7 @@ protected int processStreamThread(final Consumer consumer) {
      * @throws TimeoutException Indicates that a request timed out.
      * @throws StreamsException For any other error that might occur.
      */
-    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+    public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout) {
Review Comment:
I think this should take care of most of the threading issues, but it does make it easy to introduce bugs in the future.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
         return stateLock;
     }
+    public Map> consumerClientInstanceIds(final Duration timeout) {
Review Comment:
Can we add a comment here that this isn't thread safe? Just for the next person who tries to use it and introduces a nasty race condition. The fact that it returns futures might make people think it is.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +727,48 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
Review Comment:
That is actually okay, as each time the call is made it overwrites `mainConsumerInstanceIdFuture` anyway, so there is only ever one future to complete. Not that this isn't an issue, but with the caller being synchronized it won't be for this feature.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1417761864
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -1477,6 +1533,29 @@ public Object getStateLock() {
         return stateLock;
     }
+    public Map> consumerClientInstanceIds(final Duration timeout) {
+        final Map> result = new HashMap<>();
+
+        synchronized (fetchDeadlines) {
+            boolean addDeadline = false;
Review Comment:
This PR itself does not need it yet (but we need it later when we add support for restore consumer and producer)
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14936: URL: https://github.com/apache/kafka/pull/14936#discussion_r1417082199
## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -79,6 +84,10 @@ private class StateUpdaterThread extends Thread {
         private long totalCheckpointLatency = 0L;
+        private volatile long fetchDeadline = -1L;
Review Comment:
Same, rename please

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -231,6 +242,36 @@ private void restoreTasks(final long now) {
         }
     }
+    private void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the state updater thread that can do useful work in the meantime
+
+        if (fetchDeadline != -1) {
+            if (!restoreConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadline >= time.milliseconds()) {
+                    try {
+                        restoreConsumerClientInstanceId = restoreConsumer.clientInstanceId(Duration.ZERO);
+                        restoreConsumerInstanceIdFuture.complete(restoreConsumerClientInstanceId);
+                        fetchDeadline = -1L;
+                    } catch (final IllegalStateException disabledError) {
+                        restoreConsumerInstanceIdFuture.complete(null);
+                        fetchDeadline = -1L;
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        restoreConsumerInstanceIdFuture.completeExceptionally(error);
+                        fetchDeadline = -1L;
+                    }
+                } else {
+                    restoreConsumerInstanceIdFuture.completeExceptionally(
+                        new TimeoutException("Could not retrieve main consumer client instance id.")
Review Comment:
that's the restore consumer client instance id

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -772,6 +817,26 @@ public Set getStandbyTasks() {
         );
     }
+    @Override
+    public KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) {
+        if (stateUpdaterThread.restoreConsumerInstanceIdFuture != null) {
+            return stateUpdaterThread.restoreConsumerInstanceIdFuture;
Review Comment:
I still want to update the fetch deadline before returning, right? Otherwise I'll time out too early. Also, what if the existing future is completed with a timeout exception already? How about this:
- Update the `fetchDeadline` first in this function (max of new and current deadline).
- Do not complete the future with a timeout exception if the deadline expires; instead just set the future to null and the deadline to -1.
- Use a bounded-time `get` on the future in the application thread.
- When the current future completed with an error, also set the future to null.
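The four-step proposal above can be sketched as a small, Kafka-free class. This is a minimal sketch under stated assumptions, not what was merged: `SingleDeadlineRequest`, `request`, and `shouldPoll` are hypothetical names, `CompletableFuture<UUID>` stands in for `KafkaFutureImpl<Uuid>`, and time is passed in explicitly as milliseconds so the logic can be exercised without a clock.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical holder for one shared deadline and one shared future.
final class SingleDeadlineRequest {
    private long fetchDeadlineMs = -1L;       // -1 means "nothing requested"
    private CompletableFuture<UUID> future;   // null until first requested

    // Application thread: extend the deadline first, then hand out the shared future.
    synchronized CompletableFuture<UUID> request(final long nowMs, final long timeoutMs) {
        fetchDeadlineMs = Math.max(fetchDeadlineMs, nowMs + timeoutMs);
        // a future that failed earlier is replaced instead of being handed out again
        if (future == null || future.isCompletedExceptionally()) {
            future = new CompletableFuture<>();
        }
        return future;
    }

    // Worker thread, once per loop iteration: should the instance-id RPC be attempted?
    synchronized boolean shouldPoll(final long nowMs) {
        if (fetchDeadlineMs == -1L || future == null || future.isDone()) {
            fetchDeadlineMs = -1L;
            return false;
        }
        if (nowMs > fetchDeadlineMs) {
            // deadline expired: stop polling but do NOT fail the future;
            // callers time out on their own bounded get
            fetchDeadlineMs = -1L;
            future = null;
            return false;
        }
        return true;
    }
}
```

In this sketch the application thread would wait with something like `future.get(timeoutMs, TimeUnit.MILLISECONDS)`, so each caller enforces its own timeout while the worker thread only tracks the latest deadline.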
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on PR #14922: URL: https://github.com/apache/kafka/pull/14922#issuecomment-1842617271 Ah, I think you are using fetch deadlines to avoid calling the RPC forever. That makes sense. But then, we can probably still just keep a single deadline (the max of all calls) and a single future, and let the application thread use a time-bounded get, and we avoid some issues I pointed out above.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1417077988
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -713,6 +727,48 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
Review Comment:
I think there is a risk here that if the client calls `clientInstanceIds` faster than we are going around the poll loop, we will have an ever-growing number of futures to complete, and in each iteration we can only complete one. We may also run out of time this way.
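The growth concern can be made concrete with a toy model. This is only an illustration of the risk as stated in the comment, not a reproduction of the PR's code: `BacklogDemo` and `backlog` are hypothetical names, and the model assumes a fixed number of requests per loop pass with exactly one pending future completed per pass.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Toy model: requests arrive faster than the loop can complete them.
final class BacklogDemo {
    // Returns how many futures are still pending after `iterations` loop passes,
    // when `callsPerIteration` new requests arrive per pass and one is completed per pass.
    static int backlog(final int iterations, final int callsPerIteration) {
        final Deque<CompletableFuture<String>> pending = new ArrayDeque<>();
        for (int i = 0; i < iterations; i++) {
            for (int c = 0; c < callsPerIteration; c++) {
                pending.add(new CompletableFuture<>());
            }
            // the loop completes at most one pending future per pass
            final CompletableFuture<String> head = pending.poll();
            if (head != null) {
                head.complete("instance-id");
            }
        }
        return pending.size();
    }
}
```

With one request per pass the queue stays empty, while three requests per pass leave two more futures pending after every pass, which is why the thread converges on a single shared future instead of a list.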
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14935: URL: https://github.com/apache/kafka/pull/14935#discussion_r1417064532 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -65,6 +69,9 @@ public class GlobalStreamThread extends Thread { private final AtomicLong cacheSize; private volatile StreamsException startupException; private java.util.function.Consumer streamsUncaughtExceptionHandler; +private volatile Uuid globalConsumerClientInstanceId = null; +private volatile long fetchDeadline = -1; Review Comment: Same, give this a name that is related to clientInstanceId
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1417036886 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -1477,6 +1533,29 @@ public Object getStateLock() { return stateLock; } +public Map> consumerClientInstanceIds(final Duration timeout) { +final Map> result = new HashMap<>(); + +synchronized (fetchDeadlines) { +boolean addDeadline = false; Review Comment: Why do we need this boolean? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -335,6 +342,11 @@ public boolean isStartingRunningOrPartitionAssigned() { private final boolean stateUpdaterEnabled; private final boolean processingThreadsEnabled; +private volatile Uuid mainConsumerClientInstanceId = null; + +private final List fetchDeadlines = new LinkedList<>(); Review Comment: Maybe rename this. Fetch deadlines for what? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -713,6 +727,48 @@ boolean runLoop() { return true; } +// visible for testing +void maybeGetClientInstanceIds() { +synchronized (fetchDeadlines) { +// we pass in a timeout of zero into each `clientInstanceId()` call +// to just trigger the "get instance id" background RPC; +// we don't want to block the stream thread that can do useful work in the meantime + +if (!fetchDeadlines.isEmpty()) { +if (fetchDeadlines.get(0) >= time.milliseconds()) { +try { +mainConsumerClientInstanceId = mainConsumer.clientInstanceId(Duration.ZERO); +mainConsumerInstanceIdFutures.forEach(f -> f.complete(mainConsumerClientInstanceId)); Review Comment: It seems I can remove all the fetch deadlines here.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax merged PR #14908: URL: https://github.com/apache/kafka/pull/14908
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14908: URL: https://github.com/apache/kafka/pull/14908#issuecomment-1841550699 Build issues:
- Second run: `JDK 8 and Scala 2.12`
- Third run: `JDK 17 and Scala 2.13`

Failing KS tests pass locally (so they seem to be flaky only). Others are flaky or not related to this PR. Merging.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
wcarlson5 commented on PR #14908: URL: https://github.com/apache/kafka/pull/14908#issuecomment-1841271853 Looks like there is still a build issue :(
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14864: URL: https://github.com/apache/kafka/pull/14864#issuecomment-1837748113 Split out some parts to make reviewing simpler, as this PR is growing too large: https://github.com/apache/kafka/pull/14908
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1413296164 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java: ## @@ -136,8 +135,8 @@ StreamsProducer streamsProducerForTask(final TaskId taskId) { } StreamsProducer threadProducer() { -if (processingMode != EXACTLY_ONCE_V2) { -throw new IllegalStateException("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + processingMode); +if (processingMode == EXACTLY_ONCE_ALPHA) { Review Comment: Yes. When this code was added, there was no reason to call this method for ALOS and to get the producer out -- it was required only for EOSv2, for error handling. Thus, the check was very restrictive.
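The relaxed check described above can be illustrated with a small stand-alone sketch. The enum, the class name `ThreadProducerGuard`, the returned string, and the exception message are all hypothetical; the diff above is cut off before the body of the new branch, so throwing `IllegalStateException` there is an assumption carried over from the removed lines.

```java
// Hypothetical sketch: reject only the mode that has no thread-level producer.
class ThreadProducerGuard {
    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA, EXACTLY_ONCE_V2 }

    static String threadProducer(final ProcessingMode mode) {
        // Old check: mode != EXACTLY_ONCE_V2 (rejected at-least-once as well).
        // New check: only the per-task-producer mode is rejected.
        if (mode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
            throw new IllegalStateException(
                "No thread-level producer available; the processing mode was " + mode);
        }
        return "thread-producer"; // stand-in for the StreamsProducer instance
    }

    public static void main(final String[] args) {
        System.out.println(threadProducer(ProcessingMode.AT_LEAST_ONCE));
        System.out.println(threadProducer(ProcessingMode.EXACTLY_ONCE_V2));
    }
}
```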
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
wcarlson5 commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1412696799 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -713,6 +735,97 @@ boolean runLoop() { return true; } +// visible for testing +void maybeGetClientInstanceIds() { +// we pass in a timeout of zero into each `clientInstanceId()` call +// to just trigger the "get instance id" background RPC; +// we don't want to block the stream thread that can do useful work in the meantime Review Comment: good comments :) ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -1477,6 +1590,67 @@ public Object getStateLock() { return stateLock; } +public Map> consumerClientInstanceIds(final Duration timeout) { +boolean setDeadline = false; + +final Map> result = new HashMap<>(); + +KafkaFutureImpl future = new KafkaFutureImpl<>(); +if (mainConsumerClientInstanceId != null) { +future.complete(mainConsumerClientInstanceId); +} else { +mainConsumerInstanceIdFuture = future; +setDeadline = true; +} +result.put(getName() + "-consumer", future); + +future = new KafkaFutureImpl<>(); +if (restoreConsumerClientInstanceId != null) { +future.complete(restoreConsumerClientInstanceId); +} else { +restoreConsumerInstanceIdFuture = future; +setDeadline = true; +} +result.put(getName() + "-restore-consumer", future); + +if (setDeadline) { +fetchDeadline = time.milliseconds() + timeout.toMillis(); +} + +return result; +} + +public KafkaFuture>> producersClientInstanceIds(final Duration timeout) { +final KafkaFutureImpl>> result = new KafkaFutureImpl<>(); + +if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) { +//for (final TaskId taskId : taskManager.activeTaskIds()) { +//future = new KafkaFutureImpl<>(); +//if (taskProducersClientInstanceIds.get(taskId) != null) { +// future.complete(taskProducersClientInstanceIds.get(taskId)); +//} else { 
+//taskProducersInstanceIdsFuture.put(taskId, future); +//setDeadline = true; +//} +//result.put(getName() + "-" + taskId + "-producer", future); +//}; +} else { +final KafkaFutureImpl producerFuture = new KafkaFutureImpl<>(); +if (threadProducerClientInstanceId != null) { +producerFuture.complete(threadProducerClientInstanceId); +} else { +threadProducerInstanceIdFuture = producerFuture; +if (fetchDeadline == -1) { +fetchDeadline = time.milliseconds() + timeout.toMillis(); +} +} + +result.complete(Collections.singletonMap(getName() + "-producer", producerFuture)); Review Comment: Could we be lazy and just treat any unavailable tasks as if they were set up with telemetry disabled on the client itself? Not ideal, but it should keep things simple. Otherwise we might time out all the time with restoring tasks. ## streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.streams.ClientInstanceIds; + +import java.util.HashMap; +import java.util.Map; + +public class ClientInstanceIdsImpl implements ClientInstanceIds { +private final Map consumerInstanceIds = new HashMap<>(); +private final Map producerInstanceIds = new HashMap<>(); +private Uuid adminInstanceId; + +public void addConsumerInstanceId(final String key, final Uuid instanceId) { +consumerInstanceIds.put(key, instanceId); +} + +public void addProducerInstanceId(fi
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1412011082 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return The internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalArgumentException If {@code timeout} is negative. Review Comment: Added this (in alignment with `consumer/producer/admin$clientInstanceId()`) -- the KIP needs to be updated accordingly. ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return The internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalArgumentException If {@code timeout} is negative. + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + * @throws StreamsException For any other error that might occur. 
+ */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (timeout.isNegative()) { +throw new IllegalArgumentException("The timeout cannot be negative."); +} +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +// (1) fan-out calls to threads + +// StreamThread for main/restore consumers and producer(s) +final Map> consumerFutures = new HashMap<>(); +final Map>>> producerFutures = new HashMap<>(); +for (final StreamThread streamThread : threads) { + consumerFutures.putAll(streamThread.consumerClientInstanceIds(timeout)); +producerFutures.put(streamThread.getName(), streamThread.producersClientInstanceIds(timeout)); +} +// GlobalThread +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); +} + +// (2) get admin client instance id in a blocking fashion, while Stream/GlobalThreads work in parallel +try { + clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout)); +} catch (final IllegalStateException telemetryDisabledError) { Review Comment: This is new, based on other PRs from KIP-714 -- `adminClient.clientInstanceId` throws if telemetry is disabled -- for this case, we might not want to throw, but return a "partial" result... ## streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.streams.ClientInstanceIds; + +import java.util.HashMap; +import java.util.Map; + +public class ClientInstanceIdsImpl implements ClientInstanceIds { +private final Map consumerInstanceIds = new HashMap<>(); +private final Map producerInstanceIds = new HashMap<>(); +private Uuid adminInstanceId; + +public void addConsumerInstanceId(final String key, final Uuid instanceId) { +consumerInstanceIds.put(key, instanceId); +} + +public void addProducerInstanceId(final String key, final Uuid instanceId) { +producerInstanceIds.put(key, instanceId); +} + +p
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1412009752 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,74 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +final Map> streamThreadFutures = new HashMap<>(); +for (final StreamThread streamThread : threads) { + streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout)); +} + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); +} + +try { + clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout)); +} catch (final TimeoutException timeoutException) { +log.warn("Could not get admin client-instance-id due to timeout."); +} + +for (final Map.Entry> streamThreadFuture : streamThreadFutures.entrySet()) { +try { +clientInstanceIds.addConsumerInstanceId( +streamThreadFuture.getKey(), +streamThreadFuture.getValue().get() +); +} catch (final ExecutionException exception) { +if (exception.getCause() instanceof TimeoutException) { +log.warn("Could not get global consumer client-instance-id due to timeout."); +} else { 
+log.error("Could not get global consumer client-instance-id", exception); +} +} catch (final InterruptedException error) { +log.error("Could not get global consumer client-instance-id", error); +} +} + +if (globalThreadFuture != null) { +try { + clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), globalThreadFuture.get()); +} catch (final ExecutionException exception) { +if (exception.getCause() instanceof TimeoutException) { +log.warn("Could not get global consumer client-instance-id due to timeout."); +} else { +log.error("Could not get global consumer client-instance-id", exception); +} +} catch (final InterruptedException error) { +log.error("Could not get global consumer client-instance-id", error); +} +} + +return clientInstanceIds; Review Comment: Does it buy us much? -- I would actually prefer somewhat better error handling and better error messages. Did a larger rewrite of this. Let me know what you think.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
lucasbru commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1410975879 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,74 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +final Map> streamThreadFutures = new HashMap<>(); +for (final StreamThread streamThread : threads) { + streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout)); +} + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); +} + +try { + clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout)); +} catch (final TimeoutException timeoutException) { +log.warn("Could not get admin client-instance-id due to timeout."); +} + +for (final Map.Entry> streamThreadFuture : streamThreadFutures.entrySet()) { +try { +clientInstanceIds.addConsumerInstanceId( +streamThreadFuture.getKey(), +streamThreadFuture.getValue().get() +); +} catch (final ExecutionException exception) { +if (exception.getCause() instanceof TimeoutException) { +log.warn("Could not get global consumer client-instance-id due to timeout."); +} else { 
+log.error("Could not get global consumer client-instance-id", exception); +} +} catch (final InterruptedException error) { +log.error("Could not get global consumer client-instance-id", error); +} +} + +if (globalThreadFuture != null) { +try { + clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), globalThreadFuture.get()); +} catch (final ExecutionException exception) { +if (exception.getCause() instanceof TimeoutException) { +log.warn("Could not get global consumer client-instance-id due to timeout."); +} else { +log.error("Could not get global consumer client-instance-id", exception); +} +} catch (final InterruptedException error) { +log.error("Could not get global consumer client-instance-id", error); +} +} + +return clientInstanceIds; Review Comment: You could consider using `KafkaFuture.allOf(...).get(duration)` for the futures of all clients, to apply the same timeout to all futures. It will throw on the first exception that is returned. It will return void, but you can inspect the original futures for the results if `get` doesn't throw.
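The suggestion above, waiting on one combined future with a single timeout and then reading the individual results, might look roughly like the sketch below. It uses the JDK's `CompletableFuture.allOf` as a stand-in for `KafkaFuture.allOf`; the class and method names are hypothetical. Note one caveat: the JDK combinator waits for all inputs before completing, so its behaviour on an early failure may differ from what the comment describes for `KafkaFuture`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: one timeout applied across all per-client futures.
class AllOfWithTimeout {
    static Map<String, String> collect(final Map<String, CompletableFuture<String>> futures,
                                       final long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        // The combined future carries no values; it only signals "all inputs are done".
        CompletableFuture
            .allOf(futures.values().toArray(new CompletableFuture[0]))
            .get(timeoutMs, TimeUnit.MILLISECONDS);

        // Every input has completed successfully here, so join() does not block or throw.
        final Map<String, String> result = new LinkedHashMap<>();
        for (final Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
            result.put(entry.getKey(), entry.getValue().join());
        }
        return result;
    }

    public static void main(final String[] args) throws Exception {
        final Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        futures.put("thread-1-consumer", CompletableFuture.completedFuture("id-a"));
        futures.put("thread-1-restore-consumer", CompletableFuture.completedFuture("id-b"));
        System.out.println(collect(futures, 100L));
    }
}
```

The trade-off raised in the reply still applies: a single combined wait is compact, but it reports only one failure and gives less room for per-client error messages.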
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1410260721 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -454,4 +480,19 @@ public void shutdown() { public Map consumerMetrics() { return Collections.unmodifiableMap(globalConsumer.metrics()); } + +public KafkaFuture globalConsumerInstanceId(final Duration timeout) { Review Comment: Frankly, given that `fetchDeadline` might be modified (and pushed into the future) by a second call to `KafkaStreams#clientInstanceIds(...)` while the first was not completed yet, it seems we would need `synchronized` in addition?
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1410258175 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); Review Comment: I like to think so :)
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14864: URL: https://github.com/apache/kafka/pull/14864#issuecomment-1833242672 @AndrewJSchofield -- updated this PR to cover more cases. Still not complete, but more review input is welcome.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409822776 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -454,4 +480,19 @@ public void shutdown() { public Map consumerMetrics() { return Collections.unmodifiableMap(globalConsumer.metrics()); } + +public KafkaFuture globalConsumerInstanceId(final Duration timeout) { Review Comment: Yes, I saw the `volatile`. I agree that the ordering is sufficient to make it safe.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821677 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); + clientInstanceIdFuture.complete(globalConsumerClientInstanceId); +fetchDeadline = -1; +} catch (final TimeoutException swallow) { +// swallow Review Comment: Got it.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821009 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); Review Comment: Comments :)
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409819732 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); Review Comment: OK, it sounds like you have a plan.
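The plan referred to in this exchange is a fan-out: the caller hands the lookup to a background thread, gets a future back immediately, performs its own blocking call in the meantime, and only then waits on the future with a bound. A minimal sketch of that shape, assuming hypothetical names (`FanOutSketch`, `blockingAdminLookup`) and using a plain executor and `CompletableFuture` in place of the `GlobalStreamThread` and `KafkaFuture`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "fan out first, block later".
class FanOutSketch {
    // Stand-in for a blocking call such as an admin client lookup.
    static String blockingAdminLookup() {
        return "admin-id";
    }

    static String[] fetchBoth(final long timeoutMs) throws Exception {
        final ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            // (1) Hand the work to the background thread; this returns immediately.
            final CompletableFuture<String> globalFuture =
                CompletableFuture.supplyAsync(() -> "global-consumer-id", background);

            // (2) Do the blocking call on the caller thread while (1) runs in parallel.
            final String adminId = blockingAdminLookup();

            // (3) Collect the background result with a bounded wait.
            final String globalId = globalFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            return new String[] {adminId, globalId};
        } finally {
            background.shutdownNow();
        }
    }

    public static void main(final String[] args) throws Exception {
        final String[] ids = fetchBoth(1000L);
        System.out.println(ids[0] + ", " + ids[1]);
    }
}
```

Because the lookups overlap, giving each of them the same timeout keeps the total wait close to one timeout rather than the sum of all of them, which is the argument made for passing `timeout` through unchanged.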
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1409794850 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); + clientInstanceIdFuture.complete(globalConsumerClientInstanceId); +fetchDeadline = -1; +} catch (final TimeoutException swallow) { +// swallow +} catch (final Exception error) { + clientInstanceIdFuture.completeExceptionally(error); +fetchDeadline = -1; Review Comment: Yes. If the user calls `KafkaStreams#clientInstanceIds()` again, we would set a new fetch deadline -- if `fetchDeadline == -1` it means "nothing to be done", i.e., no call to `KafkaStreams#clientInstanceIds()` was made or is in flight waiting for completion. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); + clientInstanceIdFuture.complete(globalConsumerClientInstanceId); +fetchDeadline = -1; +} catch (final TimeoutException swallow) { +// swallow Review Comment: Yes, this happens in the `else` of `if (fetchDeadline > time.milliseconds()) {` (from above) further below. ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. 
+ * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); Review Comment: How strict we can obey the given `timeout` is somewhat tricky, given that we need to call `clientInstanceId()` for each client we have. -- The idea was to basically "fan-out" all these calls and to them in parallel (note that `globalConsumerInstanceId` will return immediately and not block, but hand the execution from the user thread to the `GlobalStreamThread`; that's why we return a Future) -- thus it should be ok to provide the same timeout to each call (as all of them are done in parallel)? If you have any good suggestion how it could be done better, let me know. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -310,6 +317,25 @@ public void run() { cache.resize(size); } stateConsumer.pollAndUpdate(); + +if (fetchDeadline != -1) { +if (fetchDeadline > time.milliseconds()) { +try { +globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO); Review Comment: Yes, but this was intentionally. The `GlobalStreamThread` does this call "on the side", and thus the idea is to just call it with no timeout to just trigger the background RPC and not block the thread from doing its actually work at all. 
-- There won't be a busy wait, because the global thread will do other useful work in the meantime befor
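For readers following the thread, here is a minimal, self-contained sketch of the deadline-based, non-blocking fetch discussed in the comments above. It is not the PR's code: `InstanceIdSource`, `DeadlinePollingWorker`, `requestInstanceId`, and `maybeFetch` are hypothetical names, `java.util.UUID` and `CompletableFuture` stand in for Kafka's `Uuid` and `KafkaFuture`, and the checked `java.util.concurrent.TimeoutException` stands in for Kafka's own timeout exception. Two further assumptions: a second request made while one is in flight reuses the pending future, and an expired deadline completes the future exceptionally in the `else` branch.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a client whose instance id only becomes available later.
interface InstanceIdSource {
    UUID clientInstanceId(Duration timeout) throws TimeoutException;
}

// Hypothetical worker illustrating the "fetch on the side" idea from the review.
class DeadlinePollingWorker {
    private final InstanceIdSource source;
    private long fetchDeadline = -1L; // -1 means "nothing to be done"
    private CompletableFuture<UUID> future = new CompletableFuture<>();

    DeadlinePollingWorker(final InstanceIdSource source) {
        this.source = source;
    }

    // Called from the user thread: returns immediately and leaves the work to the loop.
    // Assumption: a request made while another is in flight reuses the pending future.
    synchronized CompletableFuture<UUID> requestInstanceId(final Duration timeout, final long nowMs) {
        if (fetchDeadline == -1L) {
            future = new CompletableFuture<>();
            fetchDeadline = nowMs + timeout.toMillis();
        }
        return future;
    }

    // Called once per loop iteration, after the worker's regular work.
    synchronized void maybeFetch(final long nowMs) {
        if (fetchDeadline == -1L) {
            return;
        }
        if (fetchDeadline > nowMs) {
            try {
                // Zero timeout: never block the loop, just check whether the id is ready.
                future.complete(source.clientInstanceId(Duration.ZERO));
                fetchDeadline = -1L;
            } catch (final TimeoutException swallow) {
                // Not ready yet; try again on the next iteration.
            } catch (final RuntimeException error) {
                future.completeExceptionally(error);
                fetchDeadline = -1L;
            }
        } else {
            // Assumption: an expired deadline is reported through the future.
            future.completeExceptionally(new TimeoutException("instance id not available in time"));
            fetchDeadline = -1L;
        }
    }
}

class DeadlinePollingDemo {
    public static void main(final String[] args) {
        final UUID id = UUID.randomUUID();
        final int[] calls = {0};
        // The id becomes available on the third attempt.
        final InstanceIdSource source = timeout -> {
            if (++calls[0] < 3) {
                throw new TimeoutException("not ready");
            }
            return id;
        };
        final DeadlinePollingWorker worker = new DeadlinePollingWorker(source);
        final CompletableFuture<UUID> result = worker.requestInstanceId(Duration.ofMillis(100), 0L);
        for (long now = 1L; !result.isDone(); now++) {
            worker.maybeFetch(now); // the worker's other work would happen around this call
        }
        System.out.println("fetched after " + calls[0] + " attempts: " + id.equals(result.join()));
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic. A real loop would presumably read it from a clock such as the `time.milliseconds()` call in the diff.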
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409030872

## streams/src/main/java/org/apache/kafka/streams/ClientInstanceIds.java: ##

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * Encapsulates the {@code client instance id} used for metrics collection by
+ * producers, consumers, and the admin client used by Kafka Streams.
+ */
+public interface ClientInstanceIds {

Review Comment:
I updated the KIP and changed this from `class` to `interface`.

## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ##

@@ -1791,6 +1794,56 @@ protected int processStreamThread(final Consumer consumer) {
     return copy.size();
 }
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.

Review Comment:
I updated the KIP and changed this to `IllegalStateException` (it does not make sense to throw a subclass of `InvalidStateStoreException`, and other methods on `KafkaStreams` also use `IllegalStateException`).
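As an illustration of the two decisions in this message (a public interface backed by an internal implementation, and `IllegalStateException` when the application is not running), here is a small sketch. Every type in it is hypothetical: `InstanceIds`, `InstanceIdsImpl`, `AppState`, and `GuardedApp` are simplified stand-ins, `java.util.UUID` replaces Kafka's `Uuid`, and only the admin id is modelled even though the Javadoc in the diff also mentions producers and consumers.

```java
import java.util.UUID;

// Hypothetical, simplified stand-in for the public interface: read-only accessors.
interface InstanceIds {
    UUID adminInstanceId();
}

// Hypothetical internal implementation; the setter stays out of the public API.
class InstanceIdsImpl implements InstanceIds {
    private UUID adminInstanceId;

    void setAdminInstanceId(final UUID adminInstanceId) {
        this.adminInstanceId = adminInstanceId;
    }

    @Override
    public UUID adminInstanceId() {
        return adminInstanceId;
    }
}

// Hypothetical lifecycle states, loosely modelled on the state checks in the diff.
enum AppState { CREATED, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

class GuardedApp {
    private AppState state = AppState.CREATED;
    private final UUID adminId = UUID.randomUUID();

    void start() {
        state = AppState.RUNNING;
    }

    void close() {
        state = AppState.NOT_RUNNING;
    }

    // Callers only see the interface; the guard rejects calls outside the running state.
    InstanceIds clientInstanceIds() {
        if (state == AppState.CREATED) {
            throw new IllegalStateException("Not started yet, retry after calling start().");
        }
        if (state == AppState.PENDING_SHUTDOWN || state == AppState.NOT_RUNNING) {
            throw new IllegalStateException("Stopped (" + state + ").");
        }
        final InstanceIdsImpl ids = new InstanceIdsImpl();
        ids.setAdminInstanceId(adminId);
        return ids;
    }
}

class GuardedAppDemo {
    public static void main(final String[] args) {
        final GuardedApp app = new GuardedApp();
        try {
            app.clientInstanceIds();
        } catch (final IllegalStateException expected) {
            System.out.println("before start: " + expected.getMessage());
        }
        app.start();
        System.out.println("admin id present: " + (app.clientInstanceIds().adminInstanceId() != null));
    }
}
```

Returning the interface type means the mutable implementation can change without touching callers, which is one plausible reading of the `class` to `interface` change.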
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409185585

## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ##

@@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer consumer) {
     return copy.size();
 }
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+    if (state().hasNotStarted()) {
+        throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+    }
+    if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+        throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+    }
+
+    final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+    KafkaFuture globalThreadFuture = null;
+    if (globalStreamThread != null) {
+        globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
+    }
+
+    try {
+        clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+    } catch (final TimeoutException timeoutException) {
+        log.warn("Could not get admin client-instance-id due to timeout.");

Review Comment:
I'd use `client instance id` personally.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ##

@@ -310,6 +317,25 @@ public void run() {
         cache.resize(size);
     }
     stateConsumer.pollAndUpdate();
+
+    if (fetchDeadline != -1) {
+        if (fetchDeadline > time.milliseconds()) {
+            try {
+                globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO);
+                clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+                fetchDeadline = -1;
+            } catch (final TimeoutException swallow) {
+                // swallow
+            } catch (final Exception error) {
+                clientInstanceIdFuture.completeExceptionally(error);
+                fetchDeadline = -1;

Review Comment:
This essentially resets the state so that it could, in principle, try again in the future. Just an observation again. Maybe that's what you want to do.

## streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java: ##

@@ -296,6 +302,63 @@ public Set partitions() {
     assertFalse(new File(baseDirectoryName + File.separator + "testAppId" + File.separator + "global").exists());
 }
+@Test
+public void shouldGetGlobalConsumerClientInstanceId() throws Exception {
+    initializeConsumer();
+    startAndSwallowError();
+
+    final Uuid instanceId = Uuid.randomUuid();
+    mockConsumer.setClientInstanceId(instanceId, Duration.ofMillis(1L));
+
+    try {
+        final KafkaFuture future = globalStreamThread.globalConsumerInstanceId(Duration.ofMillis(2L));
+        final Uuid result = future.get();
+
+        assertThat(result, equalTo(instanceId));
+    } finally {
+        globalStreamThread.shutdown();
+        globalStreamThread.join();
+

Review Comment:
Extraneous blank line :)

## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ##

@@ -454,4 +480,19 @@ public void shutdown() {
 public Map consumerMetrics() {
     return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture globalConsumerInstanceId(final Duration timeout) {

Review Comment:
What are the threading rules here? Might we end up with the `clientInstanceId` and `fetchDeadline` being changed surprisingly by multiple calls to this method?

## streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java: ##

@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, softw