Daniel,

Thanks again. I accidentally sent the previous message before I had finished
typing it. Anyway, I have a lot to learn about these 'implicits', but I
generally get the idea.
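
For anyone reading along, the import trick can be seen in isolation in a
minimal sketch like the one below. It assumes nothing from Kafka: Serde,
Windowed and DemoSerdes are made-up stand-ins, not the real classes, and only
the `{timeWindowedSerde => _, _}` import form mirrors the one quoted further
down in this thread.

```scala
// Hypothetical stand-ins for illustration only; not the Kafka Streams types.
trait Serde[T] { def name: String }
final case class Windowed[T](key: T)

object DemoSerdes {
  implicit val stringSerde: Serde[String] =
    new Serde[String] { val name = "string" }

  // Two implicit methods that both produce a Serde[Windowed[T]].
  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"timeWindowed(${inner.name})" }

  implicit def sessionWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"sessionWindowed(${inner.name})" }
}

object ImportHidingDemo {
  // Hide timeWindowedSerde; the trailing underscore imports everything else.
  import DemoSerdes.{timeWindowedSerde => _, _}

  // Only sessionWindowedSerde is left as a candidate, so this resolves.
  def resolvedName: String = implicitly[Serde[Windowed[String]]].name

  def main(args: Array[String]): Unit =
    println(resolvedName) // prints sessionWindowed(string)
}
```

With a plain `import DemoSerdes._` instead, both windowed implicits are in
scope and the compiler reports ambiguous implicit values, which is the same
kind of error as in my original post.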

It's compiling fine now. It's sending out "blank" messages but that's the
next challenge.

Thanks again.




On Fri, Nov 20, 2020 at 12:40 PM Eric Beabes <mailinglist...@gmail.com>
wrote:

> Wow. This is amazing Daniel. THANKS A LOT!
>
> On Fri, Nov 20, 2020 at 9:44 AM Daniel Hinojosa <
> dhinoj...@evolutionnext.com> wrote:
>
>> By the way, it turned out to be even cleaner than that. On a hunch this
>> morning I figured there had to be something simpler, so I checked the
>> documentation, and there it was.
>>
>> You can just add these two imports and build the stream without declaring
>> any implicit bindings yourself:
>>
>> import org.apache.kafka.streams.scala.ImplicitConversions._
>> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>>
>> Check the code in the repository; I just pushed an update. Let me know what
>> you think.
>>
>> Danno!
>>
>>
>>
>> On Fri, Nov 20, 2020 at 12:58 AM Daniel Hinojosa <
>> dhinoj...@evolutionnext.com> wrote:
>>
>> > FYI, my goal was to get it to compile, since that is where
>> > implicit resolution takes place. Obviously, you would need to plug in
>> > the properties, start the stream, add a shutdown hook, etc.
>> >
>> > On Fri, Nov 20, 2020 at 12:56 AM Daniel Hinojosa <
>> > dhinoj...@evolutionnext.com> wrote:
>> >
>> >> Done.  I am using SBT for the build tool.  I created an application and
>> >> put it here
>> >> https://github.com/dhinojosa/kafka-streams-scala
>> >>
>> >> The implicit for SessionWindowedSerde comes from the line below. It works
>> >> because I am hiding timeWindowedSerde, which leaves only
>> >> sessionWindowedSerde in scope; the last underscore imports everything
>> >> else from the Serdes object.
>> >>
>> >> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>> >>
>> >> Everything should be in the project; it is now on Scala 2.11.12 and Kafka
>> >> 2.4.0.  The only thing you may want to change is the SBT version in
>> >> project/build.properties. :)
>> >>
>> >>
>> >> On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <mailinglist...@gmail.com>
>> >> wrote:
>> >>
>> >>> Daniel,
>> >>>
>> >>> I copied your code from here:
>> >>> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
>> >>>
>> >>> Still getting the same error message. Actually, I am not sure why it
>> >>> would work, since you don't have implicits for SessionWindowedSerde. By
>> >>> the way, which versions of Scala & Kafka did you use? Maybe that's the
>> >>> issue. I am using the following versions:
>> >>>
>> >>> *Scala: 2.11.12*
>> >>> *Kafka: 2.4.0*
>> >>>
>> >>> Can you please try with these versions & let me know if it still works
>> >>> for you? Thanks for your help.
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <
>> >>> liam.cla...@adscale.co.nz> wrote:
>> >>>
>> >>> > That said John, nothing wrong with being explicit in code. :)
>> >>> >
>> >>> > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org>
>> >>> > wrote:
>> >>> >
>> >>> > > Oh, nice. Thanks, Daniel!
>> >>> > >
>> >>> > > That’s much nicer than my ham-handed approach.
>> >>> > >
>> >>> > > Thanks,
>> >>> > > John
>> >>> > >
>> >>> > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
>> >>> > > > Hope this helps. I tried copying your code into a sample application
>> >>> > > > and got it to compile with the implicits all resolving. I think the
>> >>> > > > trick was that there were two implicit windowed Serdes that both
>> >>> > > > matched; you just need to block one of them from the imports. See if
>> >>> > > > that fits with what you are doing. Oh, also, I noticed that the types
>> >>> > > > were not resolving when calling builder.stream, so I put
>> >>> > > > [String, String] on that call. Here is a gist, which formats better.
>> >>> > > >
>> >>> > > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
>> >>> > > >
>> >>> > > > import java.time.Duration
>> >>> > > > import java.util.Properties
>> >>> > > >
>> >>> > > > import org.apache.kafka.common.config.Config
>> >>> > > > import org.apache.kafka.streams.Topology
>> >>> > > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
>> >>> > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>> >>> > > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
>> >>> > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
>> >>> > > >
>> >>> > > > class SampleStream {
>> >>> > > >   def createTopology(conf: Config, properties: Properties): Topology = {
>> >>> > > >
>> >>> > > >     implicit val produced: Produced[Windowed[String], Long] =
>> >>> > > >       Produced.`with`[Windowed[String], Long]
>> >>> > > >
>> >>> > > >     implicit val grouped: Grouped[String, String] =
>> >>> > > >       Grouped.`with`[String, String]
>> >>> > > >
>> >>> > > >     implicit val consumed: Consumed[String, String] =
>> >>> > > >       Consumed.`with`[String, String]
>> >>> > > >
>> >>> > > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
>> >>> > > >       Materialized.`with`[String, Long, ByteArraySessionStore]
>> >>> > > >
>> >>> > > >     val builder: StreamsBuilder = new StreamsBuilder()
>> >>> > > >
>> >>> > > >     builder
>> >>> > > >       .stream[String, String]("streams-plaintext-input")
>> >>> > > >       .groupBy((_, word) => word)
>> >>> > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>> >>> > > >       .count()
>> >>> > > >       .toStream
>> >>> > > >       .to("streams-pipe-output")
>> >>> > > >
>> >>> > > >     builder.build()
>> >>> > > >   }
>> >>> > > > }
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org>
>> >>> > > > wrote:
>> >>> > > >
>> >>> > > > > Hi Eric,
>> >>> > > > >
>> >>> > > > > Sure thing. Assuming the definition of ‘produced’ you had tried in
>> >>> > > > > your code, it’s just:
>> >>> > > > >
>> >>> > > > > ...
>> >>> > > > > .toStream.to("streams-pipe-output")(produced)
>> >>> > > > >
>> >>> > > > > As far as the JSON serde goes, I think that I wrote an example of
>> >>> > > > > using Jackson to implement a serde in Confluent’s
>> >>> > > > > kafka-streams-examples repo. I’m not sure what other/better
>> >>> > > > > examples might be out there.
>> >>> > > > >
>> >>> > > > > Hope this helps,
>> >>> > > > > John
>> >>> > > > >
>> >>> > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
>> >>> > > > > > Not sure what you mean by "pass it explicitly". The definition of
>> >>> > > > > > 'to' is given below. Can we pass it explicitly in this case? If
>> >>> > > > > > yes, can you please show me how?
>> >>> > > > > >
>> >>> > > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
>> >>> > > > > >   inner.to(topic, produced)
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > Also not sure how to use a self-documenting format like JSON. Any
>> >>> > > > > > examples to share?
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org>
>> >>> > > > > > wrote:
>> >>> > > > > >
>> >>> > > > > > > Hi Eric,
>> >>> > > > > > >
>> >>> > > > > > > Ah, that’s a bummer. The correct serde is the session windowed
>> >>> > > > > > > serde, as I can see you know. I’m afraid I’m a bit rusty on
>> >>> > > > > > > implicit resolution rules, so I can’t be much help there.
>> >>> > > > > > >
>> >>> > > > > > > But my general recommendation for implicits is that when things
>> >>> > > > > > > get weird, just don’t use them at all. For example, you can just
>> >>> > > > > > > explicitly pass the Produced in the second arg list of ‘to’.
>> >>> > > > > > >
>> >>> > > > > > > One other tip is that the serialized form produced by those
>> >>> > > > > > > serdes is kind of specialized and might not be the most
>> >>> > > > > > > convenient for your use. If this is just a POC, I’d suggest
>> >>> > > > > > > mapping the keys to strings, so they are human-readable. If this
>> >>> > > > > > > is a production use case, then you might want to use a more
>> >>> > > > > > > self-documenting format like JSON or Avro. Just my two cents.
>> >>> > > > > > >
>> >>> > > > > > > I hope this helps!
>> >>> > > > > > > -John
>> >>> > > > > > >
>> >>> > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
>> >>> > > > > > > > I keep getting an '*ambiguous implicit values*' message in the
>> >>> > > > > > > > following code. I tried several things (as can be seen from a
>> >>> > > > > > > > couple of lines I've commented out). Any ideas on how to fix
>> >>> > > > > > > > this? This is in *Scala*.
>> >>> > > > > > > >
>> >>> > > > > > > >   def createTopology(conf: Config, properties: Properties): Topology = {
>> >>> > > > > > > > //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
>> >>> > > > > > > > //    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
>> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>> >>> > > > > > > >     implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]
>> >>> > > > > > > >
>> >>> > > > > > > >     val builder: StreamsBuilder = new StreamsBuilder()
>> >>> > > > > > > >     builder.stream("streams-plaintext-input")
>> >>> > > > > > > >         .groupBy((_, word) => word)
>> >>> > > > > > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>> >>> > > > > > > >         .count()
>> >>> > > > > > > >         .toStream.to("streams-pipe-output")
>> >>> > > > > > > >
>> >>> > > > > > > >     builder.build()
>> >>> > > > > > > >
>> >>> > > > > > > >   }
>> >>> > > > > > > >
>> >>> > > > > > > > *Compiler Errors:*
>> >>> > > > > > > >
>> >>> > > > > > > > Error:(52, 78) ambiguous implicit values:
>> >>> > > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
>> >>> > > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
>> >>> > > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>> >>> > > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>> >>> > > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
>> >>> > > > > > > > Unspecified value parameters keySerde, valueSerde.
>> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>> >>> > > > > > > >
>> >>> > > > > > >
>> >>> > > > > >
>> >>> > > > >
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>>
>
