Hi John,
Thank you for your reply.
The "built-in" FunctionsCompatConversions is declared as "*private*[scala] object
FunctionsCompatConversions".
Also, it is explicitly imported in KStream/KGroupedStream/KTable etc.
I guess I don't understand implicits well enough, but:
- I can't extend a "*private*[scala] object FunctionsCompatConversions",
right?
- You wrote:
*You can always copy/paste its logic into your implementation.*
*Note also that you will have to make sure that in your topology building
code, you have to import your implicit converter, and you cannot import
ours.*
So I must ask: how?
1. I don't directly import FunctionsCompatConversions. Naturally, I
import KStream/KGroupedStream/KTable etc., and they import it.
(See the example below, taken from KStream.scala.)
2. Do I need to change anything in the apache-kafka-streams code?
I assume no.
3. Can I import the Guarded class once, e.g. in a "common" jar
created w/ sbt-assembly?
Or should I explicitly import an alternative
FunctionsCompatConversions?
- If so, before or after importing KStream/KGroupedStream/KTable etc.?
import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
FlatValueMapperFromFunction,
FlatValueMapperWithKeyFromFunction,
ForeachActionFromFunction,
KeyValueMapperFromFunction,
MapperFromFunction,
PredicateFromFunction,
TransformerSupplierAsJava,
ValueMapperFromFunction,
ValueMapperWithKeyFromFunction,
ValueTransformerSupplierAsJava,
ValueTransformerSupplierWithKeyAsJava
}
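To make the question concrete: as far as I can tell, Scala resolves implicit conversions at compile time, at the call site inside KStream.scala, so an import in my own code would not change which conversion the library uses. If that is right, the alternative is to wrap each function explicitly before handing it to the DSL. Below is a minimal sketch of that idea, not a definitive implementation. The names Guarded, logError, and GuardedDemo are hypothetical and not part of kafka-streams-scala, and the logging is a stand-in.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper object; none of these names exist in kafka-streams-scala.
object Guarded {
  // Stand-in for real logging (assumption): replace with your own logger.
  def logError(msg: String): Unit = Console.err.println(msg)

  // Wraps a predicate so that a non-fatal exception is logged and treated as "false".
  def predicate[K, V](p: (K, V) => Boolean): (K, V) => Boolean =
    (key, value) =>
      Try(p(key, value)) match {
        case Success(result) => result
        case Failure(ex) =>
          logError(s"predicate failed with $ex when handling ($key, $value)")
          false
      }

  // Wraps a foreach action so that a non-fatal exception is logged and swallowed.
  def foreach[K, V](f: (K, V) => Unit): (K, V) => Unit =
    (key, value) =>
      Try(f(key, value)) match {
        case Success(_) => ()
        case Failure(ex) =>
          logError(s"foreach failed with $ex when handling ($key, $value)")
      }
}

object GuardedDemo extends App {
  // Throws a NullPointerException when the value is null.
  val hasContent: (String, String) => Boolean = (_, v) => v.length > 0

  val safe = Guarded.predicate(hasContent)
  println(safe("k1", "hello")) // true
  println(safe("k2", null))    // logs the failure to stderr, then false
}
```

In a topology this would presumably be used as stream.filter(Guarded.predicate(myPredicate)) or stream.foreach(Guarded.foreach(myAction)), since the Scala DSL accepts plain Scala functions there. The drawback is that the wrapping is explicit at every call site, which is exactly what I was hoping to avoid.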
On Sun, May 9, 2021 at 5:30 PM John Roesler <[email protected]> wrote:
> Hi Rama,
>
> There has been discussion on and off for building something like this into
> the library, and I think it’s a good idea, but we haven’t had anyone draft
> a proposal.
>
> My bias with Scala is actually to avoid using implicits for nontrivial
> logic like this. It’s convenient, but it also makes the code hard to
> understand, especially for newcomers to your codebase.
>
> If you do want to stick with implicits, I’m not sure what the problem
> might be. If you can’t extend the library conversion, you can always
> copy/paste its logic into your implementation.
>
> Note also that you will have to make sure that in your topology building
> code, you have to import your implicit converter, and you cannot import
> ours.
>
> I hope this helps!
> -John
>
> On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
> > https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
> >
> > I asked this question ~10 days ago, and it now occurs to me that I asked it
> > in the wrong place. So trying here:
> >
> >
> > I am using Kafka Streams 2.6.0 with Scala in an existing bunch of applications.
> >
> > I'm devising a scheme to maximize uptime/robustness: on every
> > exception, log + (discard or send-to-dead-letter-topic). I want to do this
> > *without* explicitly adding Try/try-catch blocks all over the applications.
> >
> > I had this idea, to replace FunctionsCompatConversions with my own
> > GuardedFunctionsCompactConversions, and add Try-s there, e.g.
> >
> > replace
> >
> > implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> >   def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => p(key, value)
> > }
> >
> > with
> >
> > implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> >   def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
> >     setLogContext(value, key)
> >     Try(p(key, value)) match {
> >       case Success(_) =>
> >       case Failure(ex) => Error(s"asForeachAction Failed $ex when handle ($key, $value)")
> >     }
> >   }
> > }
> >
> > or
> >
> > def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
> >
> > with
> >
> > def asPredicate: Predicate[K, V] = (key: K, value: V) => {
> >   setLogContext(value, key)
> >   Try(p(key, value)) match {
> >     case Success(s) => s
> >     case Failure(ex) =>
> >       Error(s"asPredicate Failed $ex when handle ($key, $value)")
> >       false
> >   }
> > }
> >
> > etc. This way -
> >
> > 1. All the application-provided code is guarded (predicates, reducers,
> >    Serde, ...), and one can't "forget" to try/catch
> > 2. Upon any error/exception, can log the message at hand, providing insight
> >    as to what the fix should be
> > 3. Centrally able to disable this logging in production etc.
> > 4. Centrally able to opt for the Guarded version if troubleshooting, while
> >    using the compact ones by default
> >
> > Unfortunately I failed to find the right path to do this. I created the
> > GuardedFunctionsCompactConversions object, but could not extend/override
> > the compact one, nor get it imported into the proper
> > KTable/KStream/KGroupedStream/... classes.
> >
> > Is this a common requirement? I expect it is. Is there a right way to get
> > there?
> >
>