guozhangwang commented on a change in pull request #11813:
URL: https://github.com/apache/kafka/pull/11813#discussion_r815537908
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -100,7 +100,7 @@ public void start(final NamedTopology initialTopology) {
/**
* Start up Streams with a collection of initial NamedTopologies (may be empty)
*/
- public void start(final Collection<NamedTopology> initialTopologies) {
+ public synchronized void start(final Collection<NamedTopology> initialTopologies) {
Review comment:
It took me some time to understand why we want to synchronize here, as
at first sight it was a bit unclear to me:
/* `->` means caller -> callee */
inherited.start: synchronized, public ->
    addNamedTopology: unsynchronized, public, register topology metadata ->
        completedFutureForUnstartedApp: synchronized, private, check state
removeNamedTopology: unsynchronized, public, unregister topology metadata ->
    completedFutureForUnstartedApp: synchronized, private, check state
Registering/unregistering topology metadata is synchronized, and
parent.start would modify the state.
I think I understand now that it's because `addNamedTopology` is not
synchronized, plus when we have multiple named topologies we want to keep the
state unchanged while adding them one by one. Is that the case? If so, it may
be better to add this reasoning to the javadoc above.
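To make the reasoning above concrete, here is a minimal sketch and not the actual wrapper code. `StartSyncSketch`, its `State` enum, the `String` topology names, and the use of `CompletableFuture` in place of `KafkaFutureImpl` are all simplifications assumed for illustration. It relies on Java monitors being reentrant, so a synchronized `start` can call the unsynchronized `addNamedTopology`, which in turn calls the synchronized state check, while other threads are kept out of the synchronized methods for the whole loop.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the wrapper discussed in the review.
public class StartSyncSketch {
    enum State { CREATED, RUNNING }

    private State state = State.CREATED;
    // Stand-in for the topology metadata, which the review says is synchronized.
    private final List<String> registered = Collections.synchronizedList(new ArrayList<>());

    // Unsynchronized and public: registers metadata, then checks state under the lock.
    public CompletableFuture<Void> addNamedTopology(final String name) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        registered.add(name);
        completedFutureForUnstartedApp(future);
        return future;
    }

    // Synchronized and private: completes the future only while still in CREATED.
    private synchronized boolean completedFutureForUnstartedApp(final CompletableFuture<Void> future) {
        if (state == State.CREATED && !future.isDone()) {
            future.complete(null);
            return true;
        }
        return false;
    }

    // Synchronized: the monitor is held across the whole loop, so no other
    // synchronized method on this object can change the state between two adds.
    public synchronized List<CompletableFuture<Void>> start(final Collection<String> initialTopologies) {
        final List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (final String name : initialTopologies) {
            futures.add(addNamedTopology(name)); // reentrant acquisition of the same monitor
        }
        state = State.RUNNING; // stand-in for the state change done by the parent start
        return futures;
    }

    public synchronized State state() {
        return state;
    }
}
```

In this sketch every topology passed to `start` sees the CREATED state, while one added after `start` returns does not get its future completed by the state check.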
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult
removeNamedTopology(final String topologyToRemo
topologyMetadata.unregisterTopology(removeTopologyFuture,
topologyToRemove);
- if (resetOffsets) {
+ if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing
topology") && resetOffsets) {
log.info("Resetting offsets for the following partitions of {}
removed NamedTopology {}: {}",
removeTopologyFuture.isCompletedExceptionally() ?
"unsuccessfully" : "successfully",
topologyToRemove, partitionsToReset
);
- if (!partitionsToReset.isEmpty()) {
- removeTopologyFuture.whenComplete((v, throwable) -> {
- if (throwable != null) {
- removeTopologyFuture.completeExceptionally(throwable);
- }
- DeleteConsumerGroupOffsetsResult deleteOffsetsResult =
null;
- while (deleteOffsetsResult == null) {
- try {
- deleteOffsetsResult =
adminClient.deleteConsumerGroupOffsets(
-
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
partitionsToReset);
- deleteOffsetsResult.all().get();
- } catch (final InterruptedException ex) {
+ resetOffsets(removeTopologyFuture, partitionsToReset);
+ }
+ return new RemoveNamedTopologyResult(removeTopologyFuture);
+ }
+
+ /**
+ * @return true iff the application is still in CREATED and the future
was completed
+ */
+ private synchronized boolean completedFutureForUnstartedApp(final
KafkaFutureImpl<Void> updateTopologyFuture,
+ final String
operation) {
+ if (state == State.CREATED && !updateTopologyFuture.isDone()) {
+ updateTopologyFuture.complete(null);
+ log.info("Completed {} since application has not been started",
operation);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void>
removeTopologyFuture,
Review comment:
I'm assuming this just extracts the inlined function, so I skipped it
and did not compare line by line :)
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult
removeNamedTopology(final String topologyToRemo
topologyMetadata.unregisterTopology(removeTopologyFuture,
topologyToRemove);
- if (resetOffsets) {
+ if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing
topology") && resetOffsets) {
Review comment:
nit: how about putting `resetOffsets` as the first condition, so that if
it's false we skip the synchronized function (not sure whether the JIT would
really be able to optimize it this way)?
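A small sketch of what the reordering would change, under stated assumptions: `ShortCircuitSketch` and its methods are hypothetical stand-ins, and the counter exists only to make lock acquisitions observable. Since `&&` short-circuits, the synchronized method is not entered at all when the left operand is false.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in used only to compare the two operand orders.
public class ShortCircuitSketch {
    private final AtomicInteger lockedCalls = new AtomicInteger();

    // Stand-in for completedFutureForUnstartedApp: takes the monitor and
    // reports "not completed here", as it would for a started application.
    private synchronized boolean completedForUnstartedApp() {
        lockedCalls.incrementAndGet();
        return false;
    }

    // Order used in the diff: the synchronized call always runs.
    public boolean originalOrder(final boolean resetOffsets) {
        return !completedForUnstartedApp() && resetOffsets;
    }

    // Order suggested in the nit: the synchronized call is skipped when
    // resetOffsets is false.
    public boolean suggestedOrder(final boolean resetOffsets) {
        return resetOffsets && !completedForUnstartedApp();
    }

    public int lockedCalls() {
        return lockedCalls.get();
    }
}
```

One caveat visible in the diff itself: `completedFutureForUnstartedApp` also completes the future when the application is still in CREATED, so with the suggested order that side effect would be skipped whenever `resetOffsets` is false. The sketch does not settle whether that is acceptable.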