lucasbru commented on code in PR #18233:
URL: https://github.com/apache/kafka/pull/18233#discussion_r1952422436
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -168,6 +169,15 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final
KStream<K, V1> lhs,
);
}
+ if (userProvidedBaseStoreName == null) {
+ addInternalResourceName(thisWindowStore);
+ addInternalResourceName(otherWindowStore);
+ if (outerJoinWindowStore.isPresent()) {
+ addInternalResourceName(outerJoinWindowStore.get());
+ }
+
+ }
Review Comment:
```suggestion
}
}
```
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1051,76 +1063,15 @@ public <VTable, VOut> KStream<K, VOut> leftJoin(final
KTable<K, VTable> table,
final KStreamImpl<K, V> thisStreamRepartitioned =
repartitionForJoin(
name != null ? name : this.name,
joinedInternal.keySerde(),
- joinedInternal.leftValueSerde()
+ joinedInternal.leftValueSerde(),
+ name != null
);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner,
joinedInternal, true);
} else {
return doStreamTableJoin(table, joiner, joinedInternal, true);
}
}
- @SuppressWarnings({"unchecked", "resource"})
Review Comment:
Why did you move this method? It seems Matthias moved it on purpose and you
are reverting his change here?
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -592,6 +592,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable
pushing of internal client metrics for (main, restore, and global) consumers,
producers, and admin clients." +
" The cluster must have a client metrics subscription which
corresponds to a client.";
+ /** {@code ensure.explicit.internal.resource.naming} */
+ public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG
= "ensure.explicit.internal.resource.naming";
+ public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC =
"Whether to enforce explicit naming for all internal resources of the topology,
including internal " +
+ " topics (e.g., changelog and repartition topics) and their associated
state stores." +
+ " When enabled, the application will refuse to start if any internal
resource has an auto-generated name.";
/** {@code log.summary.interval.ms} */
Review Comment:
nit: missing newline above javadoc
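
For context, a minimal sketch of how an application might opt in to the new setting. The key string is copied from the diff above. The class name, the helper method, and the assumption that the value is boolean-like are illustrative only, and real code would normally reference the `StreamsConfig` constant rather than a literal.

```java
import java.util.Properties;

public class ExplicitNamingConfigSketch {
    // Key string taken from the diff; the constant name here is hypothetical.
    static final String ENSURE_EXPLICIT_NAMING = "ensure.explicit.internal.resource.naming";

    // Builds properties that request strict naming (assumes a boolean-like value).
    static Properties strictNamingProps() {
        final Properties props = new Properties();
        props.put(ENSURE_EXPLICIT_NAMING, "true");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(strictNamingProps().getProperty(ENSURE_EXPLICIT_NAMING));
    }
}
```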
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -592,6 +592,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable
pushing of internal client metrics for (main, restore, and global) consumers,
producers, and admin clients." +
" The cluster must have a client metrics subscription which
corresponds to a client.";
+ /** {@code ensure.explicit.internal.resource.naming} */
+ public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG
= "ensure.explicit.internal.resource.naming";
+ public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC =
"Whether to enforce explicit naming for all internal resources of the topology,
including internal " +
Review Comment:
```suggestion
public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC
= "Whether to enforce explicit naming for all internal resources of the
topology, including internal" +
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2289,4 +2298,45 @@ public <KIn, VIn, KOut, VOut>
WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra
processorWrapper.wrapProcessorSupplier(name, processorSupplier)
);
}
+
+ public void addImplicitInternalNames(final InternalResourcesNaming
internalResourcesNaming) {
+ implicitInternalNames.add(internalResourcesNaming);
+ }
+
+ public void checkUnprovidedNames() {
+ if (!implicitInternalNames.isEmpty()) {
+ final StringBuilder result = new StringBuilder();
+ final List<String> changelogTopics = new ArrayList<>();
+ final List<String> stateStores = new ArrayList<>();
+ final List<String> repartitionTopics = new ArrayList<>();
+ for (final InternalResourcesNaming internalResourcesNaming :
implicitInternalNames) {
+ if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
+
changelogTopics.add(internalResourcesNaming.changelogTopic());
+ }
+ if (!Utils.isBlank(internalResourcesNaming.stateStore())) {
+ stateStores.add(internalResourcesNaming.stateStore());
+ }
+ if
(!Utils.isBlank(internalResourcesNaming.repartitionTopic())) {
+
repartitionTopics.add(internalResourcesNaming.repartitionTopic());
+ }
+ }
+ if (!changelogTopics.isEmpty()) {
+ result.append(String.format("Following changelog topic(s) has
not been named: %s%n", String.join(", ", changelogTopics)));
+ }
+ if (!stateStores.isEmpty()) {
+ result.append(String.format("Following state store(s) has not
been named: %s%n", String.join(", ", stateStores)));
+ }
+ if (!repartitionTopics.isEmpty()) {
+ result.append(String.format("Following repartition topic(s)
has not been named: %s%n", String.join(", ", repartitionTopics)));
+ }
+ if (ensureExplicitInternalResourceNaming) {
+ throw new TopologyException(result.toString());
+ } else {
+ log.warn("Enforce explicit naming for all internal resources
is set to false. If you want" +
Review Comment:
I'm not entirely happy with this log message, since every user trying a
basic Kafka Streams example will be confronted with it. But I see it was
part of the KIP and the proposal was accepted. I added a note to the VOTE
thread pointing out that this log message was part of the KIP, which I had
missed. I'm okay with going forward with the KIP as accepted if nobody
shares my concerns.
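
To make the behavior under discussion concrete, here is a simplified sketch of the warn-or-throw decision in `checkUnprovidedNames`. It is not the PR's code: the class and method names are hypothetical, `IllegalStateException` stands in for `TopologyException`, and the report is returned to the caller instead of being passed to `log.warn`.

```java
import java.util.List;

public class UnprovidedNamesSketch {
    // Formats one report line, or an empty string when nothing is unnamed.
    static String describe(final String kind, final List<String> names) {
        return names.isEmpty()
            ? ""
            : String.format("Following %s have not been named: %s%n", kind, String.join(", ", names));
    }

    // Throws when enforcement is on; otherwise returns the text a caller would log as a warning.
    static String check(final boolean enforce,
                        final List<String> changelogTopics,
                        final List<String> stateStores,
                        final List<String> repartitionTopics) {
        final String report = describe("changelog topic(s)", changelogTopics)
            + describe("state store(s)", stateStores)
            + describe("repartition topic(s)", repartitionTopics);
        if (report.isEmpty()) {
            return "";
        }
        if (enforce) {
            // Stand-in for TopologyException in this sketch.
            throw new IllegalStateException(report);
        }
        return report;
    }

    public static void main(final String[] args) {
        System.out.print(check(false, List.of(), List.of("store-a"), List.of()));
    }
}
```

With enforcement off, any topology that has at least one auto-named resource produces a non-empty report, which is the case the comment above is concerned about.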
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -64,12 +66,15 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
+
Review Comment:
```suggestion
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -384,7 +384,7 @@ public synchronized <K, V> GlobalKTable<K, V>
globalTable(final String topic,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materializedInternal =
new MaterializedInternal<>(
- Materialized.with(consumedInternal.keySerde(),
consumedInternal.valueSerde()),
+ Materialized.<K, V, KeyValueStore<Bytes,
byte[]>>with(consumedInternal.keySerde(),
consumedInternal.valueSerde()).withLoggingDisabled(),
Review Comment:
Ah, yes. Makes sense.
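
As a side note on the diff above, the explicit type arguments on `Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(...)` are probably needed because of the chained `.withLoggingDisabled()` call. The sketch below shows the general Java inference rule with a hypothetical `Options` class, not the Kafka API: a type parameter that no argument mentions can only be inferred from the assignment target, and a method-call receiver has no target type.

```java
import java.util.List;

public class TypeWitnessSketch {
    // Hypothetical builder whose type parameter S is not mentioned by any argument.
    static final class Options<K, S> {
        final K key;
        boolean loggingEnabled = true;

        private Options(final K key) {
            this.key = key;
        }

        static <K, S> Options<K, S> with(final K key) {
            return new Options<>(key);
        }

        Options<K, S> withLoggingDisabled() {
            loggingEnabled = false;
            return this;
        }
    }

    static Options<String, List<Integer>> build() {
        // Unchained, S is inferred from the assignment target:
        //   Options<String, List<Integer>> o = Options.with("k");
        // Chained, S falls back to Object and the assignment does not compile:
        //   Options<String, List<Integer>> o = Options.with("k").withLoggingDisabled();
        // An explicit type witness restores the intended type:
        return Options.<String, List<Integer>>with("k").withLoggingDisabled();
    }

    public static void main(final String[] args) {
        System.out.println(build().loggingEnabled);
    }
}
```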