https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
I asked this question ~10 days ago, and it now occurs to me that I asked it
in the wrong place. So trying here:
I am using Kafka Streams 2.6.0 with Scala in a number of existing
applications. I'm devising a scheme to maximize uptime/robustness: on every
exception, log it and then either discard the record or send it to a
dead-letter topic. I want to do this *without* explicitly adding
Try/try-catch blocks all over the applications.
My idea was to replace FunctionsCompatConversions with my own
GuardedFunctionsCompatConversions and add Try-s there, e.g.
replace
    implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
      def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => p(key, value)
    }
with
    implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
      def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
        setLogContext(value, key)
        Try(p(key, value)) match {
          case Success(_) =>
          case Failure(ex) =>
            Error(s"asForeachAction Failed $ex when handle ($key, $value)")
        }
      }
    }
or
    def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
with
    def asPredicate: Predicate[K, V] = (key: K, value: V) => {
      setLogContext(value, key)
      Try(p(key, value)) match {
        case Success(s) => s
        case Failure(ex) =>
          Error(s"asPredicate Failed $ex when handle ($key, $value)")
          false
      }
    }
and so on. This way:
1. All the application-provided code is guarded (predicates, reducers,
   Serde, ...), and one can't "forget" to try/catch.
2. Upon any error/exception, one can log the message at hand, giving
   insight into what the fix should be.
3. This logging can be disabled centrally, e.g. in production.
4. One can centrally opt for the guarded version when troubleshooting,
   while using the stock conversions by default.
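
To make points 3 and 4 concrete, the central switch could look roughly like
the sketch below. It is plain Scala with no Kafka dependency. `Guard`,
`enabled` and `onFailure` are hypothetical names, not part of Kafka Streams,
and the stderr reporter only stands in for whatever logger or dead-letter
producer the applications really use.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical central guard; none of these names come from Kafka Streams.
object Guard {
  // Flip to false to fall back to the unguarded behaviour everywhere.
  @volatile var enabled: Boolean = true

  // Where failures are reported. Assumption: replace this with a real
  // logger or a dead-letter producer; the default just prints to stderr.
  @volatile var onFailure: (String, Any, Any, Throwable) => Unit =
    (name, key, value, ex) =>
      System.err.println(s"$name failed with $ex while handling ($key, $value)")

  // Wraps a two-argument function so that a non-fatal exception is
  // reported and `fallback` is returned instead of the exception escaping.
  def apply[K, V, R](name: String, fallback: => R)(f: (K, V) => R): (K, V) => R =
    (key: K, value: V) =>
      if (!enabled) f(key, value)
      else
        Try(f(key, value)) match {
          case Success(result) => result
          case Failure(ex) =>
            onFailure(name, key, value, ex)
            fallback
        }
}

// Usage: a predicate that throws on a null value is treated as "false".
val isLong: (String, String) => Boolean =
  Guard[String, String, Boolean]("isLong", fallback = false)((_, v) => v.length > 3)
```

Because `enabled` is read on every call, the switch takes effect without
rebuilding the topology. Note that `Try` only catches non-fatal exceptions,
so errors such as OutOfMemoryError still propagate.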
Unfortunately I failed to find the right way to do this. I created the
GuardedFunctionsCompatConversions object, but could neither extend/override
the stock one nor get mine imported into the relevant
KTable/KStream/KGroupedStream/... classes.
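
For context: Scala resolves implicit conversions at compile time, so an
import in application code cannot change which conversions the
already-compiled DSL wrapper classes use. One possible fallback, shown here
only as a sketch, is to add guarded variants of the DSL methods through an
implicit class. `MiniStream`, `GuardedSyntax` and `guardedFilter` are
hypothetical; `MiniStream` is a stand-in so the example runs without Kafka
on the classpath, and is not a Kafka Streams type.

```scala
import scala.util.Try

// Stand-in for a DSL stream type; NOT part of Kafka Streams.
final case class MiniStream[K, V](records: List[(K, V)]) {
  def filter(p: (K, V) => Boolean): MiniStream[K, V] =
    MiniStream(records.filter { case (k, v) => p(k, v) })
}

object GuardedSyntax {
  // Adds guardedFilter alongside the existing filter method.
  implicit class GuardedOps[K, V](s: MiniStream[K, V]) {
    // A predicate that throws a non-fatal exception counts as "false",
    // so the offending record is dropped instead of stopping the stream.
    def guardedFilter(p: (K, V) => Boolean): MiniStream[K, V] =
      s.filter((k, v) => Try(p(k, v)).getOrElse(false))
  }
}

import GuardedSyntax._

val in  = MiniStream(List("a" -> "1", "b" -> "x", "c" -> "22"))
// "x" cannot be parsed as an Int; the guarded predicate drops that record.
val out = in.guardedFilter((_, v) => v.toInt > 5)
```

The trade-off is that application code has to call the guarded method names
explicitly, so this does not by itself rule out "forgetting" the guard.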
Is this a common requirement? I expect it is. Is there a right way to get
there?