By the way, it turned out to be even cleaner than that.  I had a hunch this
morning that there had to be something simpler, so I checked the
documentation, and there it was.

You can just add these two imports and build the stream without declaring
the implicit Produced, Grouped, Consumed, and Materialized values by hand:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
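
If it helps to see why hiding a single name is enough, here is a minimal
sketch with no Kafka dependency. Serde, Windowed, and MySerdes below are
hypothetical stand-ins made up for illustration, not the real Kafka classes;
only the import syntax is the same as in the line above. With a plain
`import MySerdes._`, the lookup in `resolved` should fail with the same kind
of "ambiguous implicit values" error.

```scala
// Hypothetical stand-ins for illustration only (not the Kafka classes).
trait Serde[T] { def name: String }
final class Windowed[T]

object MySerdes {
  implicit val stringSerde: Serde[String] =
    new Serde[String] { val name = "string" }

  // Two implicit candidates that both yield a Serde[Windowed[T]], mirroring
  // timeWindowedSerde and sessionWindowedSerde from the compiler error.
  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"timeWindowed(${inner.name})" }

  implicit def sessionWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"sessionWindowed(${inner.name})" }
}

object Demo {
  // `timeWindowedSerde => _` hides that one name; the trailing `_` imports
  // everything else, so only sessionWindowedSerde remains a candidate.
  import MySerdes.{timeWindowedSerde => _, _}

  def resolved: String = implicitly[Serde[Windowed[String]]].name

  def main(args: Array[String]): Unit =
    println(resolved) // prints sessionWindowed(string)
}
```

The same idea applies in reverse: hiding sessionWindowedSerde instead would
leave the time-windowed candidate as the only match.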

I just pushed the change, so check the code in the repository and let me
know what you think.

Danno!



On Fri, Nov 20, 2020 at 12:58 AM Daniel Hinojosa <
dhinoj...@evolutionnext.com> wrote:

> FYI, my goal was to get it to compile, since that is where
> implicit resolution takes place. Obviously, you would need to plug in the
> properties, start the stream, add a shutdown hook, etc.
>
> On Fri, Nov 20, 2020 at 12:56 AM Daniel Hinojosa <
> dhinoj...@evolutionnext.com> wrote:
>
>> Done.  I am using SBT for the build tool.  I created an application and
>> put it here
>> https://github.com/dhinojosa/kafka-streams-scala
>>
>> The implicit for SessionWindowedSerde comes from the import line below.
>> Renaming timeWindowedSerde to _ hides it, which leaves
>> sessionWindowedSerde as the only windowed serde in scope; the last
>> underscore imports everything else from the Serdes object.
>>
>> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>>
>> Everything should be in the project; it now uses Scala 2.11.12 and Kafka
>> 2.4.0.  The only thing you may want to change is the SBT version in
>> project/build.properties. :)
>>
>>
>> On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <mailinglist...@gmail.com>
>> wrote:
>>
>>> Daniel,
>>>
>>> I copied your code from here:
>>> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
>>>
>>> Still getting the same error message. Actually, I am not sure why it
>>> would work since you don't have implicits for SessionWindowedSerde. By
>>> the way, which versions of Scala & Kafka did you use? Maybe that's the
>>> issue. I am using the following versions:
>>>
>>> *Scala: 2.11.12*
>>> *Kafka: 2.4.0*
>>>
>>> Can you please try with these versions & let me know if it still works
>>> for you? Thanks for your help.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <
>>> liam.cla...@adscale.co.nz> wrote:
>>>
>>> > That said John, nothing wrong with being explicit in code. :)
>>> >
>>> > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org>
>>> wrote:
>>> >
>>> > > Oh, nice. Thanks, Daniel!
>>> > >
>>> > > That’s much nicer than my ham-handed approach.
>>> > >
>>> > > Thanks,
>>> > > John
>>> > >
>>> > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
>>> > > > Hope this helps. I tried copying your code into a sample application
>>> > > > and got it to compile with the implicits all resolving. I think the
>>> > > > trick was that there were two implicit implementations for windowed
>>> > > > Serdes, and you just need to block one of them in the imports.  See
>>> > > > if that fits with what you are doing.  Oh, also, I noticed that the
>>> > > > types were not resolving when calling builder.stream, so I put
>>> > > > [String, String] on the call.  Here is a gist, which formats better.
>>> > > >
>>> > > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
>>> > > >
>>> > > > import java.time.Duration
>>> > > > import java.util.Properties
>>> > > >
>>> > > > import org.apache.kafka.common.config.Config
>>> > > > import org.apache.kafka.streams.Topology
>>> > > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
>>> > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>>> > > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
>>> > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
>>> > > >
>>> > > > class SampleStream {
>>> > > >   def createTopology(conf: Config, properties: Properties): Topology = {
>>> > > >
>>> > > >     implicit val produced: Produced[Windowed[String], Long] =
>>> > > >       Produced.`with`[Windowed[String], Long]
>>> > > >
>>> > > >     implicit val grouped: Grouped[String, String] =
>>> > > >       Grouped.`with`[String, String]
>>> > > >
>>> > > >     implicit val consumed: Consumed[String, String] =
>>> > > >       Consumed.`with`[String, String]
>>> > > >
>>> > > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
>>> > > >       Materialized.`with`[String, Long, ByteArraySessionStore]
>>> > > >
>>> > > >     val builder: StreamsBuilder = new StreamsBuilder()
>>> > > >
>>> > > >     builder
>>> > > >       .stream[String, String]("streams-plaintext-input")
>>> > > >       .groupBy((_, word) => word)
>>> > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>>> > > >       .count()
>>> > > >       .toStream
>>> > > >       .to("streams-pipe-output")
>>> > > >
>>> > > >     builder.build()
>>> > > >   }
>>> > > > }
>>> > > >
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org>
>>> > > wrote:
>>> > > >
>>> > > > > Hi Eric,
>>> > > > >
>>> > > > > Sure thing. Assuming the definition of ‘produced’ you had tried
>>> > > > > in your code, it’s just:
>>> > > > >
>>> > > > > ...
>>> > > > > .toStream.to("streams-pipe-output")(produced)
>>> > > > >
>>> > > > > As far as the json serde goes, I think that I wrote an example of
>>> > > > > using Jackson to implement a serde in Confluent’s
>>> > > > > kafka-streams-examples repo. I’m not sure what other/better
>>> > > > > examples might be out there.
>>> > > > >
>>> > > > > Hope this helps,
>>> > > > > John
>>> > > > >
>>> > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
>>> > > > > > Not sure what you mean by "pass it explicitly". The definition
>>> > > > > > of 'to' is given below. Can we pass it explicitly in this case?
>>> > > > > > If yes, can you please show me how?
>>> > > > > >
>>> > > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
>>> > > > > >   inner.to(topic, produced)
>>> > > > > >
>>> > > > > >
>>> > > > > > Also not sure how to use a self-documenting format like JSON.
>>> > > > > > Any examples to share?
>>> > > > > >
>>> > > > > >
>>> > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <
>>> vvcep...@apache.org>
>>> > > > > wrote:
>>> > > > > >
>>> > > > > > > Hi Eric,
>>> > > > > > >
>>> > > > > > > Ah, that’s a bummer. The correct serde is the session windowed
>>> > > > > > > serde, as I can see you know. I’m afraid I’m a bit rusty on
>>> > > > > > > implicit resolution rules, so I can’t be much help there.
>>> > > > > > >
>>> > > > > > > But my general recommendation for implicits is that when things
>>> > > > > > > get weird, just don’t use them at all. For example, you can just
>>> > > > > > > explicitly pass the Produced in the second arg list of ‘to’.
>>> > > > > > >
>>> > > > > > > One other tip is that the serialized form produced by those
>>> > > > > > > serdes is kind of specialized and might not be the most
>>> > > > > > > convenient for your use. If this is just a POC, I’d suggest
>>> > > > > > > mapping the keys to strings, so they are human-readable. If this
>>> > > > > > > is a production use case, then you might want to use a more
>>> > > > > > > self-documenting format like JSON or AVRO. Just my two cents.
>>> > > > > > >
>>> > > > > > > I hope this helps!
>>> > > > > > > -John
>>> > > > > > >
>>> > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
>>> > > > > > > > I keep getting '*ambiguous implicit values*' message in the
>>> > > > > > > > following code. I tried several things (as can be seen from a
>>> > > > > > > > couple of lines I've commented out). Any ideas on how to fix
>>> > > > > > > > this? This is in *Scala*.
>>> > > > > > > >
>>> > > > > > > >   def createTopology(conf: Config, properties: Properties): Topology = {
>>> > > > > > > >     // implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
>>> > > > > > > >     // implicit val produced: Produced[Windowed[String], Long] =
>>> > > > > > > >     //   Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
>>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
>>> > > > > > > >       Produced.`with`[Windowed[String], Long]
>>> > > > > > > >     implicit val consumed: Consumed[String, String] =
>>> > > > > > > >       Consumed.`with`[String, String]
>>> > > > > > > >
>>> > > > > > > >     val builder: StreamsBuilder = new StreamsBuilder()
>>> > > > > > > >     builder.stream("streams-plaintext-input")
>>> > > > > > > >         .groupBy((_, word) => word)
>>> > > > > > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>>> > > > > > > >         .count()
>>> > > > > > > >         .toStream.to("streams-pipe-output")
>>> > > > > > > >
>>> > > > > > > >     builder.build()
>>> > > > > > > >
>>> > > > > > > >   }
>>> > > > > > > >
>>> > > > > > > > *Compiler Errors:*
>>> > > > > > > >
>>> > > > > > > > Error:(52, 78) ambiguous implicit values:
>>> > > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
>>> > > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
>>> > > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>>> > > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>>> > > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
>>> > > > > > > > Unspecified value parameters keySerde, valueSerde.
>>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
>>> > > > > > > >
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
