This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new f7642748f80 Add group.protocol config handling for streams (#18033)
f7642748f80 is described below
commit f7642748f800ef441379c42fb1772522a9e8c130
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Dec 4 13:25:09 2024 +0100
Add group.protocol config handling for streams (#18033)
- accept new streams group.protocol value
- do not forward to consumer clients
- log message that not for production
---
.../org/apache/kafka/streams/StreamsConfig.java | 22 +++++++++++++++---
.../apache/kafka/streams/StreamsConfigTest.java | 26 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 476e582933e..a3200376ace 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -841,6 +841,8 @@ public class StreamsConfig extends AbstractConfig {
public static final String
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG =
"windowstore.changelog.additional.retention.ms";
private static final String
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows
maintainMs to ensure data is not deleted from the log prematurely. Allows for
clock drift. Default is 1 day";
+ private static final String[] IGNORED_UNPREFIXED_CONSUMER_CONFIGS =
+ new String[] {ConsumerConfig.GROUP_PROTOCOL_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
ConsumerConfig.GROUP_PROTOCOL_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
@@ -1490,6 +1492,11 @@ public class StreamsConfig extends AbstractConfig {
}
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
verifyClientTelemetryConfigs();
+
+ if (doLog &&
getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT)))
{
+ log.warn("The streams rebalance protocol is still in development
and should not be used in production. "
+ + "Please set group.protocol=classic (default) in all
production use cases.");
+ }
}
private void verifyEOSTransactionTimeoutCompatibility() {
@@ -1610,7 +1617,8 @@ public class StreamsConfig extends AbstractConfig {
}
private Map<String, Object> getCommonConsumerConfigs() {
- final Map<String, Object> clientProvidedProps =
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+ final Map<String, Object> clientProvidedProps =
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames(),
+ IGNORED_UNPREFIXED_CONSUMER_CONFIGS);
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
@@ -1923,12 +1931,20 @@ public class StreamsConfig extends AbstractConfig {
}
private Map<String, Object> getClientPropsWithPrefix(final String prefix,
- final Set<String>
configNames) {
- final Map<String, Object> props = clientProps(configNames,
originals());
+ final Set<String>
configNames,
+ final String[]
ignoreFromGlobal) {
+ final Set<String> unprefixedConfigs = new HashSet<>(configNames);
+ Arrays.asList(ignoreFromGlobal).forEach(unprefixedConfigs::remove);
+ final Map<String, Object> props = clientProps(unprefixedConfigs,
originals());
props.putAll(originalsWithPrefix(prefix));
return props;
}
+ private Map<String, Object> getClientPropsWithPrefix(final String prefix,
+ final Set<String>
configNames) {
+ return getClientPropsWithPrefix(prefix, configNames, new String[0]);
+ }
+
/**
* Get a map of custom configs by removing from the originals all the
Streams, Consumer, Producer, and AdminClient configs.
* Prefixed properties are also removed because they are already added by
{@link #getClientPropsWithPrefix(String, Set)}.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 4467e252b92..c59352a027b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1524,6 +1524,32 @@ public class StreamsConfigTest {
}
}
+ @Test
+ public void shouldHaveGroupConfigClassicDefault() {
+ streamsConfig = new StreamsConfig(props);
+ assertEquals("classic",
streamsConfig.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG));
+ }
+
+ @Test
+ public void shouldLogWarningWhenStreamsProtocolIsUsed() {
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ logCaptureAppender.setClassLogger(StreamsConfig.class, Level.WARN);
+ streamsConfig = new StreamsConfig(props);
+ assertTrue(logCaptureAppender.getMessages().contains("The streams
rebalance protocol is still in development and "
+ + "should not be used in production. Please set
group.protocol=classic (default) in all production use cases."));
+ }
+ assertEquals("streams",
streamsConfig.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG));
+ }
+
+ @Test
+ public void shouldNotApplyGroupConfigToConsumers() {
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ streamsConfig = new StreamsConfig(props);
+ assertEquals("classic", streamsConfig.getMainConsumerConfigs("a", "b",
threadIdx).get("group.protocol"));
+ assertEquals("classic",
streamsConfig.getRestoreConsumerConfigs(clientId).get("group.protocol"));
+ }
+
@SuppressWarnings("deprecation")
@Test
public void shouldUseOldProductionExceptionHandlerWhenOnlyOldConfigIsSet()
{