ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619985405
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?>
child) {
public void init(final InternalProcessorContext<Void, Void> context) {
super.init(context);
this.context = context;
- keySerializer = prepareKeySerializer(keySerializer, context,
this.name());
- valSerializer = prepareValueSerializer(valSerializer, context,
this.name());
+ try {
+ keySerializer = prepareKeySerializer(keySerializer, context,
this.name());
+ } catch (final ConfigException e) {
+ throw new ConfigException(String.format("Failed to initialize key
serdes for sink node %s", name()));
+ } catch (final StreamsException e) {
Review Comment:
Can't say what it's original purpose was but StreamsException has definitely
morphed into a catch-all for exceptions throughout Streams. It's definitely not
exclusive to the state of a task though (that would be
ProcessorStateException). The nice thing about StreamsException is you can add
other useful metadata such as the taskId where the error originated, so I
always prefer to just throw the StreamsException. We also know for a fact that
StreamsException will be caught and handled properly as it gets bubbled up. So
I'd go for merging this into a single `catch RuntimeException` block, then wrap
it in a StreamsException.
And don't forget to add the task id too! 😄 (you can get it from the
processor context)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]