FYI, my goal was only to get it to compile, since implicit resolution
happens at compile time. To actually run it, you would still need to plug in
the properties, start the stream, add a shutdown hook, etc.
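
For anyone following along without the Kafka dependency, the import trick from the quoted message below can be reproduced in plain Scala. This is only a sketch: Codec, Win, Codecs and HidingDemo are hypothetical stand-ins for Serde, Windowed and the Serdes object, not part of any Kafka API.

```scala
// Stand-ins for the Kafka types; every name in this block is hypothetical.
trait Codec[A] { def name: String }

final case class Win[A](value: A)

object Codecs {
  implicit val stringCodec: Codec[String] =
    new Codec[String] { val name = "string" }

  // Two implicit methods that can both produce a Codec[Win[T]],
  // mirroring timeWindowedSerde and sessionWindowedSerde in the thread.
  implicit def timeWinCodec[T](implicit inner: Codec[T]): Codec[Win[T]] =
    new Codec[Win[T]] { val name = s"time(${inner.name})" }

  implicit def sessionWinCodec[T](implicit inner: Codec[T]): Codec[Win[T]] =
    new Codec[Win[T]] { val name = s"session(${inner.name})" }
}

object HidingDemo {
  // Renaming timeWinCodec to _ hides it; the trailing _ imports the rest.
  import Codecs.{timeWinCodec => _, _}

  // With a plain `import Codecs._` this line would be rejected with
  // "ambiguous implicit values"; with the hiding import only one
  // candidate is left, so it compiles.
  val resolved: Codec[Win[String]] = implicitly[Codec[Win[String]]]

  // Passing the instance explicitly works regardless of what is imported.
  val explicit: Codec[Win[String]] =
    Codecs.sessionWinCodec[String](Codecs.stringCodec)
}
```

Both HidingDemo.resolved.name and HidingDemo.explicit.name evaluate to "session(string)", so hiding one implicit and passing the value by hand end up at the same instance.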

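The runtime pieces mentioned above (properties, start, shutdown hook) could look roughly like the sketch below. StreamRunner and its method names are made up for illustration, the application id and broker address are whatever fits your environment, and the 10-second close timeout is an arbitrary choice.

```scala
import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

// Hypothetical helper, not part of the linked project.
object StreamRunner {
  // The two settings a Kafka Streams application cannot start without.
  def baseProperties(applicationId: String, bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props
  }

  // Registers a shutdown hook that closes the streams instance,
  // then starts the topology and returns the running instance.
  def run(topology: Topology, props: Properties): KafkaStreams = {
    val streams = new KafkaStreams(topology, props)
    sys.addShutdownHook {
      // Arbitrary timeout; the Boolean result is deliberately ignored.
      streams.close(Duration.ofSeconds(10))
      ()
    }
    streams.start()
    streams
  }
}
```

You would pass the Topology returned by createTopology to StreamRunner.run, together with properties built by baseProperties plus any serde or state-directory settings you need.
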
On Fri, Nov 20, 2020 at 12:56 AM Daniel Hinojosa <
dhinoj...@evolutionnext.com> wrote:

> Done.  I am using SBT for the build tool.  I created an application and
> put it here
> https://github.com/dhinojosa/kafka-streams-scala
>
> The implicit for SessionWindowedSerde comes from the import line below. It
> hides timeWindowedSerde (by renaming it to _), which leaves
> sessionWindowedSerde as the only candidate. The final underscore imports
> everything else from the Serdes object.
>
> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>
> Everything should be in the project, which now uses Scala 2.11.12 and
> Kafka 2.4.0. The only thing you may want to change is the SBT version in
> project/build.properties. :)
>
>
> On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <mailinglist...@gmail.com>
> wrote:
>
>> Daniel,
>>
>> I copied your code from here:
>> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
>>
>> Still getting the same error message. Actually, I am not sure why it would
>> work since you don't have implicits for SessionWindowedSerde. By the way,
>> which versions of Scala & Kafka did you use? Maybe that's the issue. I am
>> using the following versions:
>>
>> *Scala: 2.11.12*
>> *Kafka: 2.4.0*
>>
>> Can you please try with these versions & let me know if it still works for
>> you? Thanks for your help.
>>
>>
>>
>>
>>
>>
>> On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <
>> liam.cla...@adscale.co.nz> wrote:
>>
>> > That said John, nothing wrong with being explicit in code. :)
>> >
>> > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org>
>> wrote:
>> >
>> > > Oh, nice. Thanks, Daniel!
>> > >
>> > > That’s much nicer than my ham-handed approach.
>> > >
>> > > Thanks,
>> > > John
>> > >
>> > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
>> > > > Hope this helps. I tried copying your code into a sample application
>> > > > and got it to compile with all the implicits resolving. I think the
>> > > > trick is that there are two implicit Serdes for windowed keys, so you
>> > > > just need to block one of them in the imports. See if that fits with
>> > > > what you are doing. Also, I noticed that the types were not resolving
>> > > > when calling builder.stream, so I added [String, String] to that call.
>> > > > Here is a gist, which is formatted better.
>> > > >
>> > > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
>> > > >
>> > > > import java.time.Duration
>> > > > import java.util.Properties
>> > > >
>> > > > import org.apache.kafka.common.config.Config
>> > > > import org.apache.kafka.streams.Topology
>> > > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
>> > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>> > > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
>> > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
>> > > >
>> > > > class SampleStream {
>> > > >   def createTopology(conf: Config, properties: Properties): Topology = {
>> > > >
>> > > >     implicit val produced: Produced[Windowed[String], Long] =
>> > > >       Produced.`with`[Windowed[String], Long]
>> > > >
>> > > >     implicit val grouped: Grouped[String, String] =
>> > > >       Grouped.`with`[String, String]
>> > > >
>> > > >     implicit val consumed: Consumed[String, String] =
>> > > >       Consumed.`with`[String, String]
>> > > >
>> > > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
>> > > >       Materialized.`with`[String, Long, ByteArraySessionStore]
>> > > >
>> > > >     val builder: StreamsBuilder = new StreamsBuilder()
>> > > >
>> > > >     builder
>> > > >       .stream[String, String]("streams-plaintext-input")
>> > > >       .groupBy((_, word) => word)
>> > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>> > > >       .count()
>> > > >       .toStream
>> > > >       .to("streams-pipe-output")
>> > > >
>> > > >     builder.build()
>> > > >   }
>> > > > }
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org>
>> > > wrote:
>> > > >
>> > > > > Hi Eric,
>> > > > >
>> > > > > Sure thing. Assuming the definition of ‘produced’ you had tried in
>> > > > > your code, it’s just:
>> > > > >
>> > > > > ...
>> > > > > .toStream.to("streams-pipe-output")(produced)
>> > > > >
>> > > > > As far as the JSON serde goes, I think that I wrote an example of
>> > > > > using Jackson to implement a serde in Confluent’s
>> > > > > kafka-streams-examples repo. I’m not sure what other/better examples
>> > > > > might be out there.
>> > > > >
>> > > > > Hope this helps,
>> > > > > John
>> > > > >
>> > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
>> > > > > > Not sure what you mean by "pass it explicitly". The definition of
>> > > > > > 'to' is given below. Can we pass it explicitly in this case? If
>> > > > > > yes, can you please show me how?
>> > > > > >
>> > > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
>> > > > > >   inner.to(topic, produced)
>> > > > > >
>> > > > > >
>> > > > > > Also, I am not sure how to use a self-documenting format like JSON.
>> > > > > > Any examples to share?
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <
>> vvcep...@apache.org>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Eric,
>> > > > > > >
>> > > > > > > Ah, that’s a bummer. The correct serde is the session windowed
>> > > > > > > serde, as I can see you know. I’m afraid I’m a bit rusty on
>> > > > > > > implicit resolution rules, so I can’t be much help there.
>> > > > > > >
>> > > > > > > But my general recommendation for implicits is that when things
>> > > > > > > get weird, just don’t use them at all. For example, you can just
>> > > > > > > explicitly pass the Produced in the second arg list of ‘to’.
>> > > > > > >
>> > > > > > > One other tip is that the serialized form produced by those
>> > > > > > > serdes is kind of specialized and might not be the most
>> > > > > > > convenient for your use. If this is just a POC, I’d suggest
>> > > > > > > mapping the keys to strings, so they are human-readable. If this
>> > > > > > > is a production use case, then you might want to use a more
>> > > > > > > self-documenting format like JSON or Avro. Just my two cents.
>> > > > > > >
>> > > > > > > I hope this helps!
>> > > > > > > -John
>> > > > > > >
>> > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
>> > > > > > > > I keep getting an '*ambiguous implicit values*' message in the
>> > > > > > > > following code. I tried several things (as can be seen from a
>> > > > > > > > couple of lines I've commented out). Any ideas on how to fix
>> > > > > > > > this? This is in *Scala*.
>> > > > > > > >
>> > > > > > > >  def createTopology(conf: Config, properties: Properties): Topology = {
>> > > > > > > > //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
>> > > > > > > > //    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
>> > > > > > > >       Produced.`with`[Windowed[String], Long]
>> > > > > > > >     implicit val consumed: Consumed[String, String] =
>> > > > > > > >       Consumed.`with`[String, String]
>> > > > > > > >
>> > > > > > > >     val builder: StreamsBuilder = new StreamsBuilder()
>> > > > > > > >     builder.stream("streams-plaintext-input")
>> > > > > > > >         .groupBy((_, word) => word)
>> > > > > > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>> > > > > > > >         .count()
>> > > > > > > >         .toStream.to("streams-pipe-output")
>> > > > > > > >
>> > > > > > > >     builder.build()
>> > > > > > > >
>> > > > > > > >   }
>> > > > > > > >
>> > > > > > > > *Compiler Errors:*
>> > > > > > > >
>> > > > > > > > Error:(52, 78) ambiguous implicit values:
>> > > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
>> > > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
>> > > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>> > > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>> > > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
>> > > > > > > > Unspecified value parameters keySerde, valueSerde.
>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
