mjsax commented on code in PR #17710:
URL: https://github.com/apache/kafka/pull/17710#discussion_r1831960504
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -248,18 +251,54 @@ private static StreamsConfig createConfig(final String
enforcedProcessingValue)
}
private static StreamsConfig createConfig(final String eosConfig, final
String enforcedProcessingValue) {
- return createConfig(eosConfig, enforcedProcessingValue,
LogAndFailExceptionHandler.class.getName(),
LogAndFailProcessingExceptionHandler.class.getName());
+ return createConfig(
+ eosConfig,
+ enforcedProcessingValue,
+ LogAndFailExceptionHandler.class,
+ LogAndFailProcessingExceptionHandler.class,
+ FailOnInvalidTimestamp.class
+ );
+ }
+
+ private static StreamsConfig createConfig(final Class<? extends
DeserializationExceptionHandler> deserializationExceptionHandler) {
+ return createConfig(
+ AT_LEAST_ONCE,
+ "0",
+ deserializationExceptionHandler,
+ LogAndFailProcessingExceptionHandler.class,
+ FailOnInvalidTimestamp.class
+ );
+ }
+
+ private static StreamsConfig createConfigWithTsExtractor(final Class<?
extends TimestampExtractor> timestampExtractor) {
+ return createConfig(
+ AT_LEAST_ONCE,
+ "0",
+ LogAndFailExceptionHandler.class,
+ LogAndFailProcessingExceptionHandler.class,
+ timestampExtractor
+ );
}
- private static StreamsConfig createConfig(final String eosConfig, final
String enforcedProcessingValue, final String deserializationExceptionHandler) {
- return createConfig(eosConfig, enforcedProcessingValue,
deserializationExceptionHandler,
LogAndFailProcessingExceptionHandler.class.getName());
+ private static StreamsConfig createConfig(
+ final String enforcedProcessingValue,
+ final Class<? extends ProcessingExceptionHandler>
processingExceptionHandler
+ ) {
+ return createConfig(
+ AT_LEAST_ONCE,
+ enforcedProcessingValue,
+ LogAndFailExceptionHandler.class,
+ processingExceptionHandler,
+ FailOnInvalidTimestamp.class
+ );
}
private static StreamsConfig createConfig(
final String eosConfig,
final String enforcedProcessingValue,
- final String deserializationExceptionHandler,
- final String processingExceptionHandler) {
+ final Class<? extends DeserializationExceptionHandler>
deserializationExceptionHandler,
+ final Class<? extends ProcessingExceptionHandler>
processingExceptionHandler,
+ final Class<? extends TimestampExtractor> timestampExtractor) {
Review Comment:
Adding new `TimestampExtractor` parameter here, and cleanup the overload of
`createConfig` a little bit, to avoid passing in unnecessary parameters (create
a little noise on the PR below, but code is much cleaner now)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]