mjsax commented on code in PR #17422:
URL: https://github.com/apache/kafka/pull/17422#discussion_r1792596154
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
}
}
+ private void verifyClientTelemetryConfigs() {
+ final String mainConsumerMetricsPushKey =
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final String adminClientMetricsPushKey =
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final boolean streamTelemetryEnabled =
Review Comment:
Can we just use `getBoolean(ENABLE_METRICS_PUSH_CONFIG)` ?
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
}
}
+ private void verifyClientTelemetryConfigs() {
+ final String mainConsumerMetricsPushKey =
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final String adminClientMetricsPushKey =
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final boolean streamTelemetryEnabled =
+ !originals().containsKey(ENABLE_METRICS_PUSH_CONFIG) ||
(boolean) Objects.requireNonNull(
+ parseType(ENABLE_METRICS_PUSH_CONFIG,
originals().get(ENABLE_METRICS_PUSH_CONFIG), Type.BOOLEAN),
+ "Can't parse " + ENABLE_METRICS_PUSH_CONFIG + "
because it's null");
+
+ final boolean mainConsumerMetricsDisabled =
+ !originals().containsKey(mainConsumerMetricsPushKey) ||
(boolean) Objects.requireNonNull(
Review Comment:
Why `!originals().containsKey(mainConsumerMetricsPushKey) ||` ? If there is
no `main.consumer.enable.metric.push` that it should default to `true` what
mean enabled?
Also, if there is no `main.consumer.enable.metric.push` there could still be
`consumer.enable.metric.push` and I think we need to check this case if
`main.consumer.xxx` is not set.
Would it simplify the code, if we use `originalsWithPrefix(...)` helper
method?
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1323,6 +1327,51 @@ public void
shouldDisableMetricCollectionOnMainConsumerOnly() {
);
}
+ @Test
+ public void
shouldThrowConfigExceptionWhenAdminClientMetricsDisabledStreamsMetricsPushEnabled()
{
+
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG),
false);
+
+ final Exception exception = assertThrows(ConfigException.class, () ->
new StreamsConfig(props));
+
+ assertThat(
+ exception.getMessage(),
+ containsString("KafkaStreams has metrics push enabled" +
+ " but the admin client metrics push is disabled.
Enable " +
+ " metrics push for the admin client")
+ );
+ assertNull(
Review Comment:
Seems this in unrelated to this test? If we want to verify this, might be
best to add new test case?
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
}
}
+ private void verifyClientTelemetryConfigs() {
+ final String mainConsumerMetricsPushKey =
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final String adminClientMetricsPushKey =
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final boolean streamTelemetryEnabled =
+ !originals().containsKey(ENABLE_METRICS_PUSH_CONFIG) ||
(boolean) Objects.requireNonNull(
+ parseType(ENABLE_METRICS_PUSH_CONFIG,
originals().get(ENABLE_METRICS_PUSH_CONFIG), Type.BOOLEAN),
+ "Can't parse " + ENABLE_METRICS_PUSH_CONFIG + "
because it's null");
+
+ final boolean mainConsumerMetricsDisabled =
+ !originals().containsKey(mainConsumerMetricsPushKey) ||
(boolean) Objects.requireNonNull(
+ parseType(mainConsumerMetricsPushKey,
originals().get(mainConsumerMetricsPushKey), Type.BOOLEAN),
+ "Can't parse " + mainConsumerMetricsPushKey + " because it's
null");
+
+ final boolean adminMetricsEnabled =
+ !originals().containsKey(adminClientMetricsPushKey) ||
(boolean) Objects.requireNonNull(
+ parseType(adminClientMetricsPushKey,
originals().get(adminClientMetricsPushKey), Type.BOOLEAN),
Review Comment:
Would it simplify the code, if we use `originalsWithPrefix(...)` helper
method?
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
}
}
+ private void verifyClientTelemetryConfigs() {
+ final String mainConsumerMetricsPushKey =
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final String adminClientMetricsPushKey =
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+ final boolean streamTelemetryEnabled =
+ !originals().containsKey(ENABLE_METRICS_PUSH_CONFIG) ||
(boolean) Objects.requireNonNull(
+ parseType(ENABLE_METRICS_PUSH_CONFIG,
originals().get(ENABLE_METRICS_PUSH_CONFIG), Type.BOOLEAN),
+ "Can't parse " + ENABLE_METRICS_PUSH_CONFIG + "
because it's null");
+
+ final boolean mainConsumerMetricsDisabled =
+ !originals().containsKey(mainConsumerMetricsPushKey) ||
(boolean) Objects.requireNonNull(
+ parseType(mainConsumerMetricsPushKey,
originals().get(mainConsumerMetricsPushKey), Type.BOOLEAN),
+ "Can't parse " + mainConsumerMetricsPushKey + " because it's
null");
+
+ final boolean adminMetricsEnabled =
+ !originals().containsKey(adminClientMetricsPushKey) ||
(boolean) Objects.requireNonNull(
+ parseType(adminClientMetricsPushKey,
originals().get(adminClientMetricsPushKey), Type.BOOLEAN),
+ "Can't parse " + adminClientMetricsPushKey + " because
it's null");
+
+ final String baseMetricsMisconfigurationMessage = "KafkaStreams has
metrics push enabled" +
+ " but the %s metrics push is disabled. Enable " +
+ " metrics push for the %s";
+
+ if (streamTelemetryEnabled) {
+ if (!mainConsumerMetricsDisabled && !adminMetricsEnabled) {
Review Comment:
Given that you use `!...Disabled` and `!...Enabled` would it make sense to
flip the variables from "disabled -> enable" and "enable -> disabled" to avoid
the negation here (and below)?
Also, I think we should throw if `streamTelemetryEnabled &&
mainConsumerMetricsDisabled` not if "main consumer enable" -- it seem the code
works correctly though, as we compute `mainConsumerMetricsDisabled` incorrectly
above, and thus there is two error which cancel out each other?
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1323,6 +1327,51 @@ public void
shouldDisableMetricCollectionOnMainConsumerOnly() {
);
}
+ @Test
+ public void
shouldThrowConfigExceptionWhenAdminClientMetricsDisabledStreamsMetricsPushEnabled()
{
+
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG),
false);
+
+ final Exception exception = assertThrows(ConfigException.class, () ->
new StreamsConfig(props));
+
+ assertThat(
+ exception.getMessage(),
+ containsString("KafkaStreams has metrics push enabled" +
+ " but the admin client metrics push is disabled.
Enable " +
+ " metrics push for the admin client")
+ );
+ assertNull(
+ streamsConfig.getRestoreConsumerConfigs("clientId")
+ .get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
+ );
+ assertNull(
+ streamsConfig.getGlobalConsumerConfigs("clientId")
+ .get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
+ );
+ }
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenAdminClientAndMainConsumerMetricsDisabledStreamsMetricsPushEnabled()
{
+
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG),
false);
+
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG),
false);
+
+ final Exception exception = assertThrows(ConfigException.class, () ->
new StreamsConfig(props));
+
+ assertThat(
+ exception.getMessage(),
+ containsString("KafkaStreams has metrics push enabled" +
+ " but the main consumer and admin client metrics push
is disabled. Enable " +
+ " metrics push for the main consumer and the admin
client")
+ );
+ assertNull(
Review Comment:
as above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]