lucasbru commented on code in PR #18233:
URL: https://github.com/apache/kafka/pull/18233#discussion_r1965533413
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2289,4 +2298,45 @@ public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra
             processorWrapper.wrapProcessorSupplier(name, processorSupplier)
         );
     }
+
+    public void addImplicitInternalNames(final InternalResourcesNaming internalResourcesNaming) {
+        implicitInternalNames.add(internalResourcesNaming);
+    }
+
+    public void checkUnprovidedNames() {
+        if (!implicitInternalNames.isEmpty()) {
+            final StringBuilder result = new StringBuilder();
+            final List<String> changelogTopics = new ArrayList<>();
+            final List<String> stateStores = new ArrayList<>();
+            final List<String> repartitionTopics = new ArrayList<>();
+            for (final InternalResourcesNaming internalResourcesNaming : implicitInternalNames) {
+                if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
+                    changelogTopics.add(internalResourcesNaming.changelogTopic());
+                }
+                if (!Utils.isBlank(internalResourcesNaming.stateStore())) {
+                    stateStores.add(internalResourcesNaming.stateStore());
+                }
+                if (!Utils.isBlank(internalResourcesNaming.repartitionTopic())) {
+                    repartitionTopics.add(internalResourcesNaming.repartitionTopic());
+                }
+            }
+            if (!changelogTopics.isEmpty()) {
+                result.append(String.format("Following changelog topic(s) has not been named: %s%n", String.join(", ", changelogTopics)));
+            }
+            if (!stateStores.isEmpty()) {
+                result.append(String.format("Following state store(s) has not been named: %s%n", String.join(", ", stateStores)));
+            }
+            if (!repartitionTopics.isEmpty()) {
+                result.append(String.format("Following repartition topic(s) has not been named: %s%n", String.join(", ", repartitionTopics)));
+            }
+            if (ensureExplicitInternalResourceNaming) {
+                throw new TopologyException(result.toString());
+            } else {
+                log.warn("Enforce explicit naming for all internal resources is set to false. If you want" +
Review Comment:
Explicit naming for internal resources is currently disabled. If you want to
enforce user-defined names for all internal resources, set
"ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG" to true. Note: Changing
internal resource names may require a full streams application reset for an
already deployed application. Consult the documentation on naming operators for
more details.
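
For illustration only, here is a minimal standalone sketch of how the wording above could be combined with the collected resource names. The class `NamingWarningSketch`, the helpers `describe` and `buildWarning`, and the resource names in `main` are hypothetical and not part of the PR; placing the notice after the list of unnamed resources is also an assumption, since the quoted hunk is cut off before the full `log.warn` call.

```java
import java.util.List;

public class NamingWarningSketch {

    // Wording copied from the review comment; the config name is quoted as written there.
    static final String DISABLED_NOTICE =
        "Explicit naming for internal resources is currently disabled. If you want to enforce "
            + "user-defined names for all internal resources, set "
            + "\"ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG\" to true. Note: Changing internal "
            + "resource names may require a full streams application reset for an already deployed "
            + "application. Consult the documentation on naming operators for more details.";

    // Hypothetical helper: emits one line per resource kind, or nothing if the list is empty.
    // The per-kind message text mirrors the format strings in the quoted diff.
    static String describe(final String kind, final List<String> names) {
        if (names.isEmpty()) {
            return "";
        }
        return String.format("Following %s has not been named: %s%n", kind, String.join(", ", names));
    }

    // Assumption: the unnamed resources are listed first, followed by the notice.
    static String buildWarning(final List<String> changelogTopics,
                               final List<String> stateStores,
                               final List<String> repartitionTopics) {
        return describe("changelog topic(s)", changelogTopics)
            + describe("state store(s)", stateStores)
            + describe("repartition topic(s)", repartitionTopics)
            + DISABLED_NOTICE;
    }

    public static void main(final String[] args) {
        // Made-up resource names, purely for demonstration.
        System.out.println(buildWarning(
            List.of("example-changelog"),
            List.of("example-store"),
            List.of()));
    }
}
```

In the actual method the resulting string would presumably be passed to `log.warn` in the `else` branch; the sketch prints it instead so that it runs without a logger.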