Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2024-03-06 Thread via GitHub


github-actions[bot] commented on PR #14951:
URL: https://github.com/apache/kafka/pull/14951#issuecomment-1982287149

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-19 Thread via GitHub


mjsax closed pull request #14864: KAFKA-15662: Add support for 
clientInstanceIds in Kafka Stream
URL: https://github.com/apache/kafka/pull/14864





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-19 Thread via GitHub


mjsax commented on PR #14864:
URL: https://github.com/apache/kafka/pull/14864#issuecomment-1862393058

   We split this PR into multiple smaller ones. Closing.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-12 Thread via GitHub


mjsax commented on PR #14936:
URL: https://github.com/apache/kafka/pull/14936#issuecomment-1852434603

   @stanislavkozlovski There is no 3.7 branch yet, so I merged this one. It 
must go into the 3.7 release (it has been ready for days, but Jenkins did not 
cooperate...). If your cut does not include it, I'll cherry-pick it to 3.7 later.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-12 Thread via GitHub


mjsax merged PR #14936:
URL: https://github.com/apache/kafka/pull/14936





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-11 Thread via GitHub


mjsax merged PR #14948:
URL: https://github.com/apache/kafka/pull/14948





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-08 Thread via GitHub


mjsax commented on code in PR #14936:
URL: https://github.com/apache/kafka/pull/14936#discussion_r1421112931


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -231,6 +240,69 @@ private void restoreTasks(final long now) {
 }
 }
 
+        private void maybeGetClientInstanceIds() {
+            if (fetchDeadlineClientInstanceId != -1) {
+                if (!clientInstanceIdFuture.isDone()) {
+                    if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                        try {
+                            // if the state-updated thread has active work:
+                            //    we pass in a timeout of zero into each `clientInstanceId()` call
+                            //    to just trigger the "get instance id" background RPC;
+                            //    we don't want to block the state updater thread that can do useful work in the meantime
+                            // otherwise, we pass in 100ms to avoid busy waiting
+                            clientInstanceIdFuture.complete(
+                                restoreConsumer.clientInstanceId(
+                                    allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO

Review Comment:
   Good catch!






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-08 Thread via GitHub


mjsax merged PR #14935:
URL: https://github.com/apache/kafka/pull/14935





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-08 Thread via GitHub


lucasbru commented on code in PR #14936:
URL: https://github.com/apache/kafka/pull/14936#discussion_r1420236155


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -231,6 +240,69 @@ private void restoreTasks(final long now) {
 }
 }
 
+        private void maybeGetClientInstanceIds() {
+            if (fetchDeadlineClientInstanceId != -1) {
+                if (!clientInstanceIdFuture.isDone()) {
+                    if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                        try {
+                            // if the state-updated thread has active work:
+                            //    we pass in a timeout of zero into each `clientInstanceId()` call
+                            //    to just trigger the "get instance id" background RPC;
+                            //    we don't want to block the state updater thread that can do useful work in the meantime
+                            // otherwise, we pass in 100ms to avoid busy waiting
+                            clientInstanceIdFuture.complete(
+                                restoreConsumer.clientInstanceId(
+                                    allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO

Review Comment:
   I think this will always be false here, because of 
`fetchDeadlineClientInstanceId`






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


mjsax commented on code in PR #14936:
URL: https://github.com/apache/kafka/pull/14936#discussion_r1419743348


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -772,6 +817,26 @@ public Set getStandbyTasks() {
 );
 }
 
+    @Override
+    public KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) {
+        if (stateUpdaterThread.restoreConsumerInstanceIdFuture != null) {
+            return stateUpdaterThread.restoreConsumerInstanceIdFuture;

Review Comment:
   I rewrote this and pushed it into the state-updater thread directly (and let 
it align with the other code). Hope this does the trick.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


lucasbru commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1419615776


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +723,44 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);

Review Comment:
   Sounds good. Yeah a comment will help here






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


mjsax commented on PR #14922:
URL: https://github.com/apache/kafka/pull/14922#issuecomment-1845908617

   Thanks. Will address the comment in a follow up PR and merge this as-is.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


mjsax merged PR #14922:
URL: https://github.com/apache/kafka/pull/14922





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


mjsax commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1419283998


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +723,44 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(
+                        new TimeoutException("Could not retrieve main consumer client instance id.")
+                    );
+                }
+            }
+        }
+    }
+
+    private void maybeResetFetchDeadline() {

Review Comment:
   It's not worth it for this PR, but it will be once we add the other clients. 
Keeping it for now.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


mjsax commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1419283420


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +723,44 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(

Review Comment:
   Good catch!






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


mjsax commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1419280528


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +723,44 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);

Review Comment:
   ISE is thrown if telemetry is disabled on the client. In general, we assume 
that it's either enabled for all clients or disabled for all clients, but it 
could also be disabled for only _some_ clients. For that last case, we want to 
swallow the error, which is what happens here (in general, we say that a single 
error on any client fails the call to `KafkaStreams#clientInstanceIds()`, but 
for this case we need to make an exception).
   
   Let me know if you agree or disagree. Will add a comment. (Seems 
`disabledError` as var name is not descriptive enough).
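
   A minimal sketch of the outcome mapping described above, written against 
plain JDK types so it runs without Kafka on the classpath. `InstanceIdPoller`, 
`pollOnce`, and `NotReadyYetException` are hypothetical names used only for 
illustration; `NotReadyYetException` stands in for Kafka's unchecked 
`TimeoutException`, and `CompletableFuture` stands in for `KafkaFutureImpl`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for Kafka's unchecked TimeoutException.
final class NotReadyYetException extends RuntimeException { }

final class InstanceIdPoller {
    // Makes one non-blocking attempt to fetch the id.
    // Returns true if the future was completed, i.e. polling can stop.
    static boolean pollOnce(final Supplier<String> fetchInstanceId,
                            final CompletableFuture<String> future) {
        try {
            future.complete(fetchInstanceId.get());
            return true;
        } catch (final IllegalStateException telemetryDisabled) {
            // telemetry is disabled on this client: report "no id" instead of failing
            future.complete(null);
            return true;
        } catch (final NotReadyYetException retryLater) {
            // id not available yet: leave the future open and retry on the next loop
            return false;
        } catch (final RuntimeException error) {
            // any other error fails the future
            future.completeExceptionally(error);
            return true;
        }
    }
}
```

   The point of the sketch is only that a disabled client yields a normally 
completed future holding `null`, while every other failure still surfaces as an 
exceptional completion.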






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


lucasbru commented on code in PR #14951:
URL: https://github.com/apache/kafka/pull/14951#discussion_r1418947853


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -716,6 +734,126 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+                final Map activeTasks = taskManager.activeTaskMap();
+
+                // setup task futures if necessary
+                if (!producerInstanceIdFuture.isDone()) {
+                    if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                        // only active tasks have a producer (standby tasks don't)
+                        // producers are set up during task creation and thus all active tasks have a valid producer
+                        for (final Map.Entry task : activeTasks.entrySet()) {
+                            final TaskId taskId = task.getKey();
+                            KafkaFutureImpl future = taskProducersInstanceIdsFuture.get(taskId);
+                            if (future == null || future.isCompletedExceptionally()) {
+                                future = new KafkaFutureImpl<>();
+                                ((StreamTask) task.getValue()).producerInstanceId = future;
+                                taskProducersInstanceIdsFuture.put(taskId, future);
+                            }
+                        }
+                        if (stateUpdaterEnabled) {

Review Comment:
   Dropping some ideas on how to implement this with the state updater:
   
   - Add a `clientInstanceIds` method to the `Task` interface, and also 
implement it in `ReadOnlyTask`.
   - NOOP for standby tasks and if EOS_ALPHA is not enabled.
   - Since the producer is thread safe, you should then be able to call that 
function whether it's owned by the state updater or the stream thread.
   - Use `tasks.activeTasks` or something similar to get a collection of _all_ 
tasks, including state updater tasks.
   - Do it like it's defined here.
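
   A rough sketch of the first three ideas, using plain JDK types. Everything 
here is hypothetical: `SketchTask` is a trimmed-down stand-in for the Streams 
`Task` interface, the method name `producerInstanceId` and its return type are 
assumptions, and "NOOP" is assumed to mean an already completed, empty result.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, trimmed-down stand-in for the Streams `Task` interface.
interface SketchTask {
    CompletableFuture<Optional<String>> producerInstanceId();
}

// Active task: owns a producer, so it can report an id.
final class SketchActiveTask implements SketchTask {
    private final Supplier<String> producerIdLookup; // assumed thread-safe lookup

    SketchActiveTask(final Supplier<String> producerIdLookup) {
        this.producerIdLookup = producerIdLookup;
    }

    @Override
    public CompletableFuture<Optional<String>> producerInstanceId() {
        return CompletableFuture.completedFuture(Optional.ofNullable(producerIdLookup.get()));
    }
}

// Standby task: has no producer, so the call is a no-op with an empty result.
final class SketchStandbyTask implements SketchTask {
    @Override
    public CompletableFuture<Optional<String>> producerInstanceId() {
        return CompletableFuture.completedFuture(Optional.empty());
    }
}

// Read-only wrapper: forwards the call instead of rejecting it,
// relying on the producer lookup being safe to call from any thread.
final class SketchReadOnlyTask implements SketchTask {
    private final SketchTask delegate;

    SketchReadOnlyTask(final SketchTask delegate) {
        this.delegate = delegate;
    }

    @Override
    public CompletableFuture<Optional<String>> producerInstanceId() {
        return delegate.producerInstanceId();
    }
}
```

   With this shape the caller can iterate over all tasks uniformly, regardless 
of which thread currently owns them.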






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


lucasbru commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1418773892


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +723,44 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(
+                        new TimeoutException("Could not retrieve main consumer client instance id.")
+                    );
+                }
+            }
+        }
+    }
+
+    private void maybeResetFetchDeadline() {

Review Comment:
   nit: not sure the function is worth it, you seem to always call it when 
`reset` is true.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +723,44 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
+                        maybeResetFetchDeadline();
+                    } catch (final TimeoutException swallow) {
+                        // swallow
+                    } catch (final Exception error) {
+                        mainConsumerInstanceIdFuture.completeExceptionally(error);
+                        maybeResetFetchDeadline();
+                    }
+                } else {
+                    mainConsumerInstanceIdFuture.completeExceptionally(

Review Comment:
   Reset the fetch deadline as well? Otherwise we'll keep allocating this 
`TimeoutException` every time we go around the loop.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +723,44 @@ boolean runLoop() {
 return true;
 }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);

Review Comment:
   Why are we ignoring the ISE?



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1813,17 +1816,29 @@ public ClientInstanceIds clientInstanceIds(final Duration timeout) {
             throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
         }
 
+        long remainingTimeMs = timeout.toMillis();

Review Comment:
   This could be a good use 

Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-07 Thread via GitHub


lucasbru commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1418730263


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +727,48 @@ boolean runLoop() {
 return true;
 }
 
+// visible for testing
+void maybeGetClientInstanceIds() {

Review Comment:
   hey, when I wrote that comment we still had a list of futures! Now it's not 
a problem anymore, agreed






[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


mjsax opened a new pull request, #14951:
URL: https://github.com/apache/kafka/pull/14951

   Stacked on other PRs. Still not completed. But early feedback is welcome.





[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


mjsax opened a new pull request, #14948:
URL: https://github.com/apache/kafka/pull/14948

   Part of KIP-714.
   
   Adds support for exposing the producer client instance id. This PR only 
covers the thread producer with the state updater disabled.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


wcarlson5 commented on code in PR #14935:
URL: https://github.com/apache/kafka/pull/14935#discussion_r1418058902


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
 return stateLock;
 }
 
+    public Map> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   and here



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -454,4 +485,19 @@ public void shutdown() {
 public Map consumerMetrics() {
 return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   Same comment here if you don't mind






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


wcarlson5 commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1418054625


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1802,7 +1805,7 @@ protected int processStreamThread(final Consumer consumer) {
      * @throws TimeoutException Indicates that a request timed out.
      * @throws StreamsException For any other error that might occur.
      */
-    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+    public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout) {

Review Comment:
   I think this should take care of most of the threading issues, but it does 
make it easy to introduce bugs in the future.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
 return stateLock;
 }
 
+    public Map> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   Can we add a comment here that this isn't thread safe? Just for the next 
person who tries to use it and introduces a nasty race condition. The fact that 
it returns futures might make people think it is.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +727,48 @@ boolean runLoop() {
 return true;
 }
 
+// visible for testing
+void maybeGetClientInstanceIds() {

Review Comment:
   That is actually okay, as each time the call is made it overwrites 
`mainConsumerInstanceIdFuture` anyway, so there is only ever one future to 
complete.
   
   Not that this isn't an issue, but with the caller being synchronized it 
won't be one for this feature.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


mjsax commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1417761864


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1533,29 @@ public Object getStateLock() {
 return stateLock;
 }
 
+    public Map> consumerClientInstanceIds(final Duration timeout) {
+        final Map> result = new HashMap<>();
+
+        synchronized (fetchDeadlines) {
+            boolean addDeadline = false;

Review Comment:
   This PR itself does not need it yet (but we need it later when we add 
support for restore consumer and producer)






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14936:
URL: https://github.com/apache/kafka/pull/14936#discussion_r1417082199


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -79,6 +84,10 @@ private class StateUpdaterThread extends Thread {
 
 private long totalCheckpointLatency = 0L;
 
+private volatile long fetchDeadline = -1L;

Review Comment:
   Same, rename please



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -231,6 +242,36 @@ private void restoreTasks(final long now) {
 }
 }
 
+        private void maybeGetClientInstanceIds() {
+            // we pass in a timeout of zero into each `clientInstanceId()` call
+            // to just trigger the "get instance id" background RPC;
+            // we don't want to block the state updater thread that can do useful work in the meantime
+
+            if (fetchDeadline != -1) {
+                if (!restoreConsumerInstanceIdFuture.isDone()) {
+                    if (fetchDeadline >= time.milliseconds()) {
+                        try {
+                            restoreConsumerClientInstanceId = restoreConsumer.clientInstanceId(Duration.ZERO);
+                            restoreConsumerInstanceIdFuture.complete(restoreConsumerClientInstanceId);
+                            fetchDeadline = -1L;
+                        } catch (final IllegalStateException disabledError) {
+                            restoreConsumerInstanceIdFuture.complete(null);
+                            fetchDeadline = -1L;
+                        } catch (final TimeoutException swallow) {
+                            // swallow
+                        } catch (final Exception error) {
+                            restoreConsumerInstanceIdFuture.completeExceptionally(error);
+                            fetchDeadline = -1L;
+                        }
+                    } else {
+                        restoreConsumerInstanceIdFuture.completeExceptionally(
+                            new TimeoutException("Could not retrieve main consumer client instance id.")

Review Comment:
   that's the restore consumer client instance id



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -772,6 +817,26 @@ public Set getStandbyTasks() {
 );
 }
 
+    @Override
+    public KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) {
+        if (stateUpdaterThread.restoreConsumerInstanceIdFuture != null) {
+            return stateUpdaterThread.restoreConsumerInstanceIdFuture;

Review Comment:
   I still want to update the fetch deadline before returning, right? Otherwise 
I'll time out too early.
   
   Also, what if the existing future has already completed with a timeout 
exception?
   
   How about this:
   - Update the `fetchDeadline` first in this function (max of the new and the 
current deadline).
   - Do not complete the future with a timeout exception if the deadline 
expires; instead, just set the future to null and the deadline to -1.
   - Use a bounded-time `get` on the future in the application thread.
   - When the current future has completed with an error, also set the future 
to null.
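
   The steps above could look roughly like the following sketch. It is only an 
illustration under assumptions: `SharedInstanceIdRequest`, `request`, `onLoop`, 
and `deadlineMs` are hypothetical names, `CompletableFuture` stands in for 
`KafkaFutureImpl`, time is passed in explicitly, and a failed future is 
replaced on the next request rather than nulled eagerly.

```java
import java.util.concurrent.CompletableFuture;

final class SharedInstanceIdRequest {
    private long fetchDeadlineMs = -1L;          // -1 means "nothing to fetch"
    private CompletableFuture<String> future;    // single shared future

    // Application thread: extend the deadline and hand out the shared future.
    synchronized CompletableFuture<String> request(final long nowMs, final long timeoutMs) {
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            return future; // id already known, nothing to fetch
        }
        if (future == null || future.isCompletedExceptionally()) {
            future = new CompletableFuture<>(); // start over after expiry or error
        }
        // keep the latest deadline across all callers
        fetchDeadlineMs = Math.max(fetchDeadlineMs, nowMs + timeoutMs);
        return future;
    }

    // Background thread: called once per loop iteration with the fetch result
    // (null if the id is not available yet).
    synchronized void onLoop(final long nowMs, final String idOrNull) {
        if (fetchDeadlineMs == -1L) {
            return;
        }
        if (idOrNull != null) {
            future.complete(idOrNull);
            fetchDeadlineMs = -1L;
        } else if (nowMs > fetchDeadlineMs) {
            // deadline expired: stop trying, but do not fail the future;
            // waiting callers time out on their own bounded get
            future = null;
            fetchDeadlineMs = -1L;
        }
    }

    synchronized long deadlineMs() {
        return fetchDeadlineMs;
    }
}
```

   On the application side, a caller would then wait with something like 
`future.get(timeoutMs, TimeUnit.MILLISECONDS)`, so an expired deadline shows 
up as a timeout in the caller instead of as a failed shared future.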






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on PR #14922:
URL: https://github.com/apache/kafka/pull/14922#issuecomment-1842617271

   Ah, I think you are using fetch deadlines to avoid calling the RPC forever. 
That makes sense. But then, we can probably still just keep a single deadline 
(the max of all calls) and a single future, and let the application thread use 
a time-bounded get, and we avoid some issues I pointed out above.
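
   For illustration only, a time-bounded `get` on the application thread could 
look roughly like this. It uses `CompletableFuture` from the JDK rather than 
`KafkaFuture`, and `BoundedGet`/`getOrNull` are hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedGet {
    // Returns the value, or null if it is not available within timeoutMs.
    static String getOrNull(final CompletableFuture<String> future, final long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException e) {
            // only this caller gives up; the shared future stays pending
            return null;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (final ExecutionException e) {
            throw new IllegalStateException("lookup failed", e.getCause());
        }
    }
}
```

   The point of the sketch is that a timeout is local to the caller: the 
shared future is not completed exceptionally, so a later caller with a longer 
timeout can still succeed.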





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1417077988


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +727,48 @@ boolean runLoop() {
 return true;
 }
 
+// visible for testing
+void maybeGetClientInstanceIds() {

Review Comment:
   I think there is a risk here that if the client calls `clientInstanceIds` 
faster than we are going around the poll loop, we will have an ever-growing 
number of futures to complete, and in each iteration we can only complete one. 
We may also run out of time this way. 






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14935:
URL: https://github.com/apache/kafka/pull/14935#discussion_r1417064532


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -65,6 +69,9 @@ public class GlobalStreamThread extends Thread {
 private final AtomicLong cacheSize;
 private volatile StreamsException startupException;
 private java.util.function.Consumer 
streamsUncaughtExceptionHandler;
+private volatile Uuid globalConsumerClientInstanceId = null;
+private volatile long fetchDeadline = -1;

Review Comment:
   Same, give this a name that is related to clientInstanceId






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1417036886


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1533,29 @@ public Object getStateLock() {
 return stateLock;
 }
 
+public Map> consumerClientInstanceIds(final 
Duration timeout) {
+final Map> result = new HashMap<>();
+
+synchronized (fetchDeadlines) {
+boolean addDeadline = false;

Review Comment:
   Why do we need this boolean?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -335,6 +342,11 @@ public boolean isStartingRunningOrPartitionAssigned() {
 private final boolean stateUpdaterEnabled;
 private final boolean processingThreadsEnabled;
 
+private volatile Uuid mainConsumerClientInstanceId = null;
+
+private final List fetchDeadlines = new LinkedList<>();

Review Comment:
   maybe rename. fetch deadlines for what?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +727,48 @@ boolean runLoop() {
 return true;
 }
 
+// visible for testing
+void maybeGetClientInstanceIds() {
+synchronized (fetchDeadlines) {
+// we pass in a timeout of zero into each `clientInstanceId()` call
+// to just trigger the "get instance id" background RPC;
+// we don't want to block the stream thread that can do useful 
work in the meantime
+
+if (!fetchDeadlines.isEmpty()) {
+if (fetchDeadlines.get(0) >= time.milliseconds()) {
+try {
+mainConsumerClientInstanceId = 
mainConsumer.clientInstanceId(Duration.ZERO);
+mainConsumerInstanceIdFutures.forEach(f -> 
f.complete(mainConsumerClientInstanceId));

Review Comment:
   It seems I can remove all the fetch deadlines here.






[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-05 Thread via GitHub


mjsax opened a new pull request, #14935:
URL: https://github.com/apache/kafka/pull/14935

   Part of KIP-714.
   
   Add support to collect client instance id of the global consumer.
   
   
   +++
   This PR is on top of https://github.com/apache/kafka/pull/14922
   
   Only second commit needs a review.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-05 Thread via GitHub


mjsax merged PR #14908:
URL: https://github.com/apache/kafka/pull/14908





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-05 Thread via GitHub


mjsax commented on PR #14908:
URL: https://github.com/apache/kafka/pull/14908#issuecomment-1841550699

   Build issues:
   - Second run: `JDK 8 and Scala 2.12`
   - Third run: `JDK 17 and Scala 2.13`
   
   Failing KS tests pass locally (so they seem to be only flaky). Others are 
flaky/not related to this PR. Merging.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-05 Thread via GitHub


wcarlson5 commented on PR #14908:
URL: https://github.com/apache/kafka/pull/14908#issuecomment-1841271853

   Looks like there is still a build issue :(





[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-04 Thread via GitHub


mjsax opened a new pull request, #14922:
URL: https://github.com/apache/kafka/pull/14922

   This PR is on top of https://github.com/apache/kafka/pull/14908
   
   Only second commit to be reviewed.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-03 Thread via GitHub


mjsax commented on PR #14864:
URL: https://github.com/apache/kafka/pull/14864#issuecomment-1837748113

   Split out some parts to make reviewing simpler as this PR grows too large: 
https://github.com/apache/kafka/pull/14908





[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-03 Thread via GitHub


mjsax opened a new pull request, #14908:
URL: https://github.com/apache/kafka/pull/14908

   - Part of KIP-714
   - Add new configs and public API for Kafka Streams
   - Implement support to get admin client instance id





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-03 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1413296164


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##
@@ -136,8 +135,8 @@ StreamsProducer streamsProducerForTask(final TaskId taskId) 
{
 }
 
 StreamsProducer threadProducer() {
-if (processingMode != EXACTLY_ONCE_V2) {
-throw new IllegalStateException("Expected EXACTLY_ONCE_V2 to be 
enabled, but the processing mode was " + processingMode);
+if (processingMode == EXACTLY_ONCE_ALPHA) {

Review Comment:
   Yes. When this code was added, there was no reason to call this method for 
ALOS and to get the producer out -- only for EOSv2 it was required for error 
handling. Thus, the check was very restrictive.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-01 Thread via GitHub


wcarlson5 commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412696799


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +735,97 @@ boolean runLoop() {
 return true;
 }
 
+// visible for testing
+void maybeGetClientInstanceIds() {
+// we pass in a timeout of zero into each `clientInstanceId()` call
+// to just trigger the "get instance id" background RPC;
+// we don't want to block the stream thread that can do useful work in 
the meantime

Review Comment:
   good comments :)
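
   To make the zero-timeout idea from those code comments concrete, here is a 
rough sketch of one loop iteration. It is not code from the PR: `IdSource` is 
a hypothetical stand-in for a call like `clientInstanceId(Duration.ZERO)`, and 
`pollOnce` is a made-up name:

```java
import java.util.concurrent.TimeoutException;

final class NonBlockingPoller {
    // Hypothetical stand-in for a zero-timeout lookup that throws while the id is unknown.
    interface IdSource {
        String tryGet() throws TimeoutException;
    }

    // One loop iteration: returns the id if it is available, otherwise null.
    static String pollOnce(final IdSource source, final long deadlineMs, final long nowMs) {
        if (deadlineMs == -1L || nowMs > deadlineMs) {
            return null; // nothing requested, or the deadline has passed
        }
        try {
            return source.tryGet();
        } catch (final TimeoutException notReadyYet) {
            return null; // swallow and retry on the next iteration
        }
    }
}
```

   The thread never waits on the lookup itself; it keeps doing its regular 
work and simply asks again on the next pass until the deadline expires.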



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
 return stateLock;
 }
 
+public Map> consumerClientInstanceIds(final 
Duration timeout) {
+boolean setDeadline = false;
+
+final Map> result = new HashMap<>();
+
+KafkaFutureImpl future = new KafkaFutureImpl<>();
+if (mainConsumerClientInstanceId != null) {
+future.complete(mainConsumerClientInstanceId);
+} else {
+mainConsumerInstanceIdFuture = future;
+setDeadline = true;
+}
+result.put(getName() + "-consumer", future);
+
+future = new KafkaFutureImpl<>();
+if (restoreConsumerClientInstanceId != null) {
+future.complete(restoreConsumerClientInstanceId);
+} else {
+restoreConsumerInstanceIdFuture = future;
+setDeadline = true;
+}
+result.put(getName() + "-restore-consumer", future);
+
+if (setDeadline) {
+fetchDeadline = time.milliseconds() + timeout.toMillis();
+}
+
+return result;
+}
+
+public KafkaFuture>> 
producersClientInstanceIds(final Duration timeout) {
+final KafkaFutureImpl>> result = new 
KafkaFutureImpl<>();
+
+if 
(processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+//for (final TaskId taskId : taskManager.activeTaskIds()) {
+//future = new KafkaFutureImpl<>();
+//if (taskProducersClientInstanceIds.get(taskId) != null) {
+//
future.complete(taskProducersClientInstanceIds.get(taskId));
+//} else {
+//taskProducersInstanceIdsFuture.put(taskId, future);
+//setDeadline = true;
+//}
+//result.put(getName() + "-" + taskId + "-producer", future);
+//};
+} else {
+final KafkaFutureImpl producerFuture = new 
KafkaFutureImpl<>();
+if (threadProducerClientInstanceId != null) {
+producerFuture.complete(threadProducerClientInstanceId);
+} else {
+threadProducerInstanceIdFuture = producerFuture;
+if (fetchDeadline == -1) {
+fetchDeadline = time.milliseconds() + timeout.toMillis();
+}
+}
+
+result.complete(Collections.singletonMap(getName() + "-producer", 
producerFuture));

Review Comment:
   Could we be lazy and just treat any unavailable tasks as if they were set up 
with telemetry disabled on the client itself?
   
   Not ideal but should keep things simple. Otherwise we might time out all the 
time with restoring tasks.



##
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+private final Map consumerInstanceIds = new HashMap<>();
+private final Map producerInstanceIds = new HashMap<>();
+private Uuid adminInstanceId;
+
+public void addConsumerInstanceId(final String key, final Uuid instanceId) 
{
+consumerInstanceIds.put(key, instanceId);
+}
+
+public void 

Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-01 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412011082


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return The internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalArgumentException If {@code timeout} is negative.

Review Comment:
   Added this (in alignment with `consumer/producer/admin$clientInstanceId()`) -- 
KIP needs to be updated accordingly



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return The internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalArgumentException If {@code timeout} is negative.
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ * @throws StreamsException For any other error that might occur.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (timeout.isNegative()) {
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+}
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+// (1) fan-out calls to threads
+
+// StreamThread for main/restore consumers and producer(s)
+final Map> consumerFutures = new HashMap<>();
+final Map>>> 
producerFutures = new HashMap<>();
+for (final StreamThread streamThread : threads) {
+
consumerFutures.putAll(streamThread.consumerClientInstanceIds(timeout));
+producerFutures.put(streamThread.getName(), 
streamThread.producersClientInstanceIds(timeout));
+}
+// GlobalThread
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+}
+
+// (2) get admin client instance id in a blocking fashion, while 
Stream/GlobalThreads work in parallel
+try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+} catch (final IllegalStateException telemetryDisabledError) {

Review Comment:
   This is new, based on other PRs from KIP-714 -- 
`adminClient.clientInstanceId` throws if telemetry is disabled -- for this case, 
we might not want to throw, but return a "partial" result...
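
   A small sketch of what such a "partial" result could mean for the admin 
client id. This is an assumption about the shape, not the PR's code: `lookup` 
is a hypothetical stand-in for `adminClient.clientInstanceId(timeout)`, and the 
id is a `String` instead of a `Uuid`:

```java
import java.util.Optional;
import java.util.function.Supplier;

final class PartialResult {
    // Returns the admin id, or empty if the lookup reports that telemetry is disabled.
    static Optional<String> adminIdOrEmpty(final Supplier<String> lookup) {
        try {
            return Optional.of(lookup.get());
        } catch (final IllegalStateException telemetryDisabled) {
            // treat "disabled" as "no id" instead of failing the whole call
            return Optional.empty();
        }
    }
}
```

   Callers would then have to handle a missing admin id explicitly, which is 
the trade-off of returning a partial result instead of throwing.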



##
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+private final Map consumerInstanceIds = new HashMap<>();
+private final Map producerInstanceIds = new HashMap<>();
+private Uuid adminInstanceId;
+
+public void addConsumerInstanceId(final String key, final Uuid instanceId) 
{
+consumerInstanceIds.put(key, instanceId);
+}
+
+public void addProducerInstanceId(final String key, final Uuid instanceId) 
{
+producerInstanceIds.put(key, instanceId);
+}
+
+

Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-01 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412009752


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,74 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+final Map> streamThreadFutures = new 
HashMap<>();
+for (final StreamThread streamThread : threads) {
+
streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout));
+}
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+}
+
+try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+} catch (final TimeoutException timeoutException) {
+log.warn("Could not get admin client-instance-id due to timeout.");
+}
+
+for (final Map.Entry> streamThreadFuture : 
streamThreadFutures.entrySet()) {
+try {
+clientInstanceIds.addConsumerInstanceId(
+streamThreadFuture.getKey(),
+streamThreadFuture.getValue().get()
+);
+} catch (final ExecutionException exception) {
+if (exception.getCause() instanceof TimeoutException) {
+log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+} else {
+log.error("Could not get global consumer 
client-instance-id", exception);
+}
+} catch (final InterruptedException error) {
+log.error("Could not get global consumer client-instance-id", 
error);
+}
+}
+
+if (globalThreadFuture != null) {
+try {
+
clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), 
globalThreadFuture.get());
+} catch (final ExecutionException exception) {
+if (exception.getCause() instanceof TimeoutException) {
+log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+} else {
+log.error("Could not get global consumer 
client-instance-id", exception);
+}
+} catch (final InterruptedException error) {
+log.error("Could not get global consumer client-instance-id", 
error);
+}
+}
+
+return clientInstanceIds;

Review Comment:
   Does it buy us much? -- I would actually prefer somewhat better error 
handling and better error messages. I did a larger rewrite of this. Let me 
know what you think.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-30 Thread via GitHub


lucasbru commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410975879


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,74 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+final Map> streamThreadFutures = new 
HashMap<>();
+for (final StreamThread streamThread : threads) {
+
streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout));
+}
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+}
+
+try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+} catch (final TimeoutException timeoutException) {
+log.warn("Could not get admin client-instance-id due to timeout.");
+}
+
+for (final Map.Entry> streamThreadFuture : 
streamThreadFutures.entrySet()) {
+try {
+clientInstanceIds.addConsumerInstanceId(
+streamThreadFuture.getKey(),
+streamThreadFuture.getValue().get()
+);
+} catch (final ExecutionException exception) {
+if (exception.getCause() instanceof TimeoutException) {
+log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+} else {
+log.error("Could not get global consumer 
client-instance-id", exception);
+}
+} catch (final InterruptedException error) {
+log.error("Could not get global consumer client-instance-id", 
error);
+}
+}
+
+if (globalThreadFuture != null) {
+try {
+
clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), 
globalThreadFuture.get());
+} catch (final ExecutionException exception) {
+if (exception.getCause() instanceof TimeoutException) {
+log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+} else {
+log.error("Could not get global consumer 
client-instance-id", exception);
+}
+} catch (final InterruptedException error) {
+log.error("Could not get global consumer client-instance-id", 
error);
+}
+}
+
+return clientInstanceIds;

Review Comment:
   You could consider using `KafkaFuture.allOf(...).get(duration)` for the 
futures of all clients, to apply the same timeout to all futures. It will throw 
on the first exception that is returned. It will return void, but you can 
inspect the original futures for the results, if `get` doesn't throw.
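
   As an illustration of the pattern, here is a sketch using the JDK's 
`CompletableFuture.allOf` as an analogue. It is not the `KafkaFuture` API, and 
the two may differ in details such as when a failure is reported. 
`AllOfWithTimeout` and `collect` are made-up names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AllOfWithTimeout {
    // Waits for all futures under one shared timeout; empty if the timeout expires.
    static Optional<List<String>> collect(final List<CompletableFuture<String>> futures,
                                          final long timeoutMs) {
        final CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException e) {
            return Optional.empty();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (final ExecutionException e) {
            throw new IllegalStateException("at least one lookup failed", e.getCause());
        }
        // the combined future carries no value, so read the original futures
        final List<String> ids = new ArrayList<>();
        for (final CompletableFuture<String> future : futures) {
            ids.add(future.join());
        }
        return Optional.of(ids);
    }
}
```

   The single `get` replaces one blocking call per future, so the total wait 
is bounded by the timeout once rather than once per client.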






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410260721


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -454,4 +480,19 @@ public void shutdown() {
 public Map consumerMetrics() {
 return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   Frankly, given that `fetchDeadline` might be modified (and pushed into the 
future) by a second call to `KafkaStreams#clientInstanceIds(...)` while the 
first was not completed yet, it seems we would need `synchronized` in addition?






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410258175


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment:
   I like to think so :) 






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on PR #14864:
URL: https://github.com/apache/kafka/pull/14864#issuecomment-1833242672

   @AndrewJSchofield -- updated this PR to cover more cases. Still not 
complete, but more review input is welcome.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409822776


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -454,4 +480,19 @@ public void shutdown() {
 public Map consumerMetrics() {
 return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   Yes, I saw the `volatile`. I agree that the ordering is sufficient to make 
it safe.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821677


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+fetchDeadline = -1;
+} catch (final TimeoutException swallow) {
+// swallow

Review Comment:
   Got it.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409821009


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);

Review Comment:
   Comments :)






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409819732


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment:
   OK, it sounds like you have a plan.






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409794850


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+fetchDeadline = -1;
+} catch (final TimeoutException swallow) {
+// swallow
+} catch (final Exception error) {
+
clientInstanceIdFuture.completeExceptionally(error);
+fetchDeadline = -1;

Review Comment:
   Yes. If the user calls `KafkaStreams#clientInstanceIds()` again, we would 
set a new fetch deadline -- `fetchDeadline == -1` means "nothing to be 
done", i.e., no call to `KafkaStreams#clientInstanceIds()` was made or is 
in flight waiting for completion.
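
A minimal single-threaded sketch of this sentinel convention follows. `DeadlineSketch`, `request`, `maybeFetch`, and `idle` are hypothetical names; a `Supplier` that returns `null` stands in for `clientInstanceId(Duration.ZERO)` throwing `TimeoutException`, a `String` stands in for the `Uuid`, and threading is ignored entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical stand-in for the global thread's bookkeeping; not the PR's code.
final class DeadlineSketch {
    static final long NOTHING_TO_DO = -1L;

    private long fetchDeadline = NOTHING_TO_DO;
    private CompletableFuture<String> future = new CompletableFuture<>();

    // On behalf of the user: arm a new deadline and hand out a fresh future.
    CompletableFuture<String> request(final long nowMs, final long timeoutMs) {
        future = new CompletableFuture<>();
        fetchDeadline = nowMs + timeoutMs;
        return future;
    }

    // Called once per loop iteration; `attempt` yields null while no id is available.
    void maybeFetch(final long nowMs, final Supplier<String> attempt) {
        if (fetchDeadline == NOTHING_TO_DO) {
            return; // no request made or in flight
        }
        if (fetchDeadline > nowMs) {
            final String id = attempt.get();
            if (id != null) {
                future.complete(id);
                fetchDeadline = NOTHING_TO_DO;
            }
            // Otherwise keep the deadline and retry on the next iteration.
        } else {
            future.completeExceptionally(new TimeoutException("deadline passed"));
            fetchDeadline = NOTHING_TO_DO;
        }
    }

    boolean idle() {
        return fetchDeadline == NOTHING_TO_DO;
    }
}
```

After either outcome the deadline returns to `-1`, so a later `request` starts from a clean state.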



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+fetchDeadline = -1;
+} catch (final TimeoutException swallow) {
+// swallow

Review Comment:
   Yes, this happens further below, in the `else` branch of 
`if (fetchDeadline > time.milliseconds()) {` (from above).



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment:
   How strictly we can obey the given `timeout` is somewhat tricky, given that we 
need to call `clientInstanceId()` for each client we have. -- The idea was to 
basically "fan out" all these calls and do them in parallel (note that 
`globalConsumerInstanceId` will return immediately and not block, but hand the 
execution from the user thread to the `GlobalStreamThread`; that's why we 
return a Future) -- thus it should be ok to provide the same timeout to each 
call (as all of them are done in parallel)?
   
   If you have a good suggestion for how it could be done better, let me know.
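
One possible shape for such a fan-out, as a hedged sketch rather than the PR's implementation: start every request first, then wait on each future against a single shared deadline so that the total wait stays bounded by `timeout`. `FanOutSketch` and `collect` are hypothetical names, each `Supplier` stands in for a non-blocking call such as `globalConsumerInstanceId(timeout)`, and `String` stands in for `Uuid`.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Streams.
final class FanOutSketch {

    static Map<String, String> collect(
            final Map<String, Supplier<CompletableFuture<String>>> clients,
            final Duration timeout) {
        final long deadlineNs = System.nanoTime() + timeout.toNanos();

        // Step 1: start every request up front; the starters must not block.
        final Map<String, CompletableFuture<String>> inFlight = new LinkedHashMap<>();
        clients.forEach((name, starter) -> inFlight.put(name, starter.get()));

        // Step 2: wait for each future against the one shared deadline.
        final Map<String, String> ids = new LinkedHashMap<>();
        for (final Map.Entry<String, CompletableFuture<String>> entry : inFlight.entrySet()) {
            final long remainingNs = Math.max(0L, deadlineNs - System.nanoTime());
            try {
                ids.put(entry.getKey(), entry.getValue().get(remainingNs, TimeUnit.NANOSECONDS));
            } catch (final TimeoutException timedOut) {
                // This client did not answer in time; leave it out of the result.
            } catch (final InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(interrupted);
            } catch (final ExecutionException failed) {
                throw new IllegalStateException(failed.getCause());
            }
        }
        return ids;
    }
}
```

Whether a timed-out client should be omitted, as here, or should fail the whole call is a design choice this sketch does not settle.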



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);

Review Comment:
   Yes, but this was intentional. The `GlobalStreamThread` does this call "on 
the side", and thus the idea is to call it with no timeout to just trigger 
the background RPC and not block the thread from doing its actual work at 
all. -- There won't be a busy wait, because the global thread will do other 
useful work in the meantime 

Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409030872


##
streams/src/main/java/org/apache/kafka/streams/ClientInstanceIds.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * Encapsulates the {@code client instance id} used for metrics collection by
+ * producers, consumers, and the admin client used by Kafka Streams.
+ */
+public interface ClientInstanceIds {

Review Comment:
   I updated the KIP and changed this from `class` to `interface`.



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,56 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.

Review Comment:
   I updated the KIP and changed this to `IllegalStateException` (it does not 
make sense to throw a subclass of `InvalidStateStoreException`, and other 
methods on `KafkaStreams` also use `IllegalStateException`).






Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409185585


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+}
+
+try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+} catch (final TimeoutException timeoutException) {
+log.warn("Could not get admin client-instance-id due to timeout.");

Review Comment:
   I'd use `client instance id` personally.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -310,6 +317,25 @@ public void run() {
 cache.resize(size);
 }
 stateConsumer.pollAndUpdate();
+
+if (fetchDeadline != -1) {
+if (fetchDeadline > time.milliseconds()) {
+try {
+globalConsumerClientInstanceId = 
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+fetchDeadline = -1;
+} catch (final TimeoutException swallow) {
+// swallow
+} catch (final Exception error) {
+
clientInstanceIdFuture.completeExceptionally(error);
+fetchDeadline = -1;

Review Comment:
   This essentially resets the state so that it could, in principle, try again in 
the future. Just an observation again. Maybe that's what you want to do.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java:
##
@@ -296,6 +302,63 @@ public Set partitions() {
 assertFalse(new File(baseDirectoryName + File.separator + "testAppId" 
+ File.separator + "global").exists());
 }
 
+@Test
+public void shouldGetGlobalConsumerClientInstanceId() throws Exception {
+initializeConsumer();
+startAndSwallowError();
+
+final Uuid instanceId = Uuid.randomUuid();
+mockConsumer.setClientInstanceId(instanceId, Duration.ofMillis(1L));
+
+try {
+final KafkaFuture future = 
globalStreamThread.globalConsumerInstanceId(Duration.ofMillis(2L));
+final Uuid result = future.get();
+
+assertThat(result, equalTo(instanceId));
+} finally {
+globalStreamThread.shutdown();
+globalStreamThread.join();
+

Review Comment:
   Extraneous blank line :)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -454,4 +480,19 @@ public void shutdown() {
 public Map consumerMetrics() {
 return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   What are the threading rules here? Might we end up with the 
`clientInstanceId` and `fetchDeadline` being changed surprisingly by multiple 
calls to this method?



##
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, 

[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-11-29 Thread via GitHub


mjsax opened a new pull request, #14864:
URL: https://github.com/apache/kafka/pull/14864

   This PR is still WIP, but I wanted to open it early to collect initial 
feedback.
   
   So far, I only added support for global-consumer client-instance-id.
   
   Part of KIP-714

