Hope this helps. I copied your code into a sample application and got it to
compile with all the implicits resolving. I think the trick is that Serdes
provides two implicit windowed serdes (timeWindowedSerde and
sessionWindowedSerde), and both match Serde[Windowed[String]]. You just need
to hide one of them in the import. See if that fits with what you are doing.
I also noticed that the types were not resolving when calling builder.stream,
so I put [String, String] on that call. Here is a gist, which formats better.

https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735

import java.time.Duration
import java.util.Properties

import org.apache.kafka.common.config.Config
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}

class SampleStream {
  def createTopology(conf: Config, properties: Properties): Topology = {

    implicit val produced: Produced[Windowed[String], Long] =
      Produced.`with`[Windowed[String], Long]

    implicit val grouped: Grouped[String, String] =
      Grouped.`with`[String, String]

    implicit val consumed: Consumed[String, String] =
      Consumed.`with`[String, String]

    implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
      Materialized.`with`[String, Long, ByteArraySessionStore]

    val builder: StreamsBuilder = new StreamsBuilder()

    builder
      .stream[String, String]("streams-plaintext-input")
      .groupBy((_, word) => word)
      .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
      .count()
      .toStream
      .to("streams-pipe-output")

    builder.build()
  }
}
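
In case it helps to see the mechanism in isolation, here is a minimal sketch
in plain Scala with no Kafka dependency. Serde, Windowed, Serdes, Demo and
keySerdeName below are stand-ins I made up for illustration; they only mimic
the shape of the kafka-streams-scala implicits, not their behavior. It shows
the two options discussed in this thread: hiding one candidate in the import,
or passing the instance explicitly in the second argument list.

```scala
// Hypothetical stand-ins for Kafka's Serde and Windowed (not the real API).
trait Serde[T] { def name: String }
final case class Windowed[T](key: T)

object Serdes {
  implicit val stringSerde: Serde[String] =
    new Serde[String] { val name = "string" }

  // Two implicit methods with the same result type: importing both makes
  // any search for Serde[Windowed[T]] ambiguous.
  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"timeWindowed(${inner.name})" }

  implicit def sessionWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"sessionWindowed(${inner.name})" }
}

object Demo {
  // Hide timeWindowedSerde and import everything else, so only
  // sessionWindowedSerde is eligible as an implicit here.
  import Serdes.{timeWindowedSerde => _, _}

  // Same shape as a method with an implicit second argument list.
  def keySerdeName[K](implicit serde: Serde[K]): String = serde.name

  // Option 1: let the compiler resolve the single remaining candidate.
  val viaImplicit: String = keySerdeName[Windowed[String]]
  // → "sessionWindowed(string)"

  // Option 2: skip implicit search and pass the instance explicitly.
  val viaExplicit: String =
    keySerdeName[Windowed[String]](Serdes.timeWindowedSerde[String])
  // → "timeWindowed(string)"
}
```

Changing the import in Demo to a plain `import Serdes._` should bring back
the "ambiguous implicit values" error on viaImplicit, while the explicit call
keeps compiling.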




On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Eric,
>
> Sure thing. Assuming the definition of ‘produced’ you had tried in your
> code, it’s just:
>
> ...
> .toStream.to("streams-pipe-output")(produced)
>
> As far as the JSON serde goes, I think that I wrote an example of using
> Jackson to implement a serde in Confluent’s kafka-streams-examples repo.
> I’m not sure what other/better examples might be out there.
>
> Hope this helps,
> John
>
> On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > Not sure what you mean by "pass it explicitly". The definition of 'to' is
> > given below. Can we pass it explicitly in this case? If yes, can you
> > please show me how?
> >
> > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> >   inner.to(topic, produced)
> >
> >
> > Also not sure how to use a self documenting format like JSON. Any
> > examples to share?
> >
> >
> > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Hi Eric,
> > >
> > > Ah, that’s a bummer. The correct serde is the session windowed serde,
> > > as I can see you know. I’m afraid I’m a bit rusty on implicit
> > > resolution rules, so I can’t be much help there.
> > >
> > > But my general recommendation for implicits is that when things get
> > > weird, just don’t use them at all. For example, you can just explicitly
> > > pass the Produced in the second arg list of ‘to’.
> > >
> > > One other tip is that the serialized form produced by those serdes is
> > > kind of specialized and might not be the most convenient for your use.
> > > If this is just a POC, I’d suggest mapping the keys to strings, so they
> > > are human-readable. If this is a production use case, then you might
> > > want to use a more self-documenting format like JSON or Avro. Just my
> > > two cents.
> > >
> > > I hope this helps!
> > > -John
> > >
> > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > I keep getting an '*ambiguous implicit values*' message in the
> > > > following code. I tried several things (as can be seen from a couple
> > > > of lines I've commented out). Any ideas on how to fix this? This is in
> > > > *Scala*.
> > > >
> > > >  def createTopology(conf: Config, properties: Properties): Topology = {
> > > > //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
> > > > //    implicit val produced: Produced[Windowed[String], Long] =
> > > > //      Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > >       Produced.`with`[Windowed[String], Long]
> > > >     implicit val consumed: Consumed[String, String] =
> > > >       Consumed.`with`[String, String]
> > > >
> > > >     val builder: StreamsBuilder = new StreamsBuilder()
> > > >     builder.stream("streams-plaintext-input")
> > > >         .groupBy((_, word) => word)
> > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > > >         .count()
> > > >         .toStream.to("streams-pipe-output")
> > > >
> > > >     builder.build()
> > > >
> > > >   }
> > > >
> > > > *Compiler Errors:*
> > > >
> > > > Error:(52, 78) ambiguous implicit values:
> > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
> > > > Unspecified value parameters keySerde, valueSerde.
> > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > > >
> > >
> >
>
