cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2039706952
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -432,19 +428,8 @@ private List<String> getTaskIdsAsStrings(final
KafkaStreams streams) {
}
private static Stream<Arguments> singleAndMultiTaskParameters() {
- return Stream.of(Arguments.of("simple", true),
- Arguments.of("simple", false),
- Arguments.of("complex", true),
- Arguments.of("complex", false));
- }
-
- private static Stream<Arguments> multiTaskParameters() {
- return Stream.of(Arguments.of(true),
- Arguments.of(false));
- }
-
- private Properties props(final boolean stateUpdaterEnabled) {
- return
props(mkObjectProperties(mkMap(mkEntry(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED,
stateUpdaterEnabled))));
+ return Stream.of(Arguments.of("simple"),
+ Arguments.of("complex"));
}
private Properties props(final Properties extraProperties) {
Review Comment:
Since this method is exclusively called with `null` now, you can remove the
parameter.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -264,9 +261,9 @@ public void shouldPushMetricsToBroker(final String
recordingLevel) throws Except
@ParameterizedTest
@MethodSource("singleAndMultiTaskParameters")
Review Comment:
This can probably be transformed to `@ValueSource(strings = {"simple",
"complex"})`
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java:
##########
@@ -161,7 +154,10 @@ private Properties props(final Properties extraProperties)
{
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
- streamsConfiguration.putAll(extraProperties);
+
+ if (extraProperties != null) {
+ streamsConfiguration.putAll(extraProperties);
+ }
Review Comment:
Instead of passing `null`, could you please do the following:
```java
private Properties props(final Properties extraProperties) {
Properties streamsConfiguration = props();
streamsConfiguration.putAll(extraProperties);
return streamsConfiguration;
}
private Properties props() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(appId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
streamsConfigurations.add(streamsConfiguration);
return streamsConfiguration;
}
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -981,7 +933,7 @@ public void shouldCreateStandbyTaskDuringAssignment() {
}
@Test
- public void
shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled()
{
+ public void
shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdater() {
Review Comment:
```suggestion
public void
shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInit() {
```
Here and elsewhere.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -364,8 +361,8 @@ public void onChange(final Thread thread,
@ParameterizedTest
@MethodSource("data")
Review Comment:
You could change this to `@ValueSource(booleans = {true, false})`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -899,6 +840,7 @@ private StreamTask convertStandbyToActive(final StandbyTask
standbyTask, final S
return activeTaskCreator.createActiveTaskFromStandby(standbyTask,
partitions, mainConsumer);
}
+ // This can also be removed, but what about the test cases?
Review Comment:
If the production code is removed, the test cases are not needed anymore either.
If you have comments or questions, please add them to the PR instead of
writing inline comments in the code. Inline comments are easy to overlook, and
then we end up with comments in the codebase that nobody understands.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]