Daniel, Thanks again. Accidentally sent the previous message before finishing typing the whole thing. Anyway, I've so much to learn about these 'implicits' but I generally get the idea.
It's compiling fine now. It's sending out "blank" messages but that's the next challenge. Thanks again. On Fri, Nov 20, 2020 at 12:40 PM Eric Beabes <[email protected]> wrote: > Wow. This is amazing Daniel. THANKS A LOT! > > On Fri, Nov 20, 2020 at 9:44 AM Daniel Hinojosa < > [email protected]> wrote: > >> By the way. It was even cleaner than that. I decided on a hunch this >> morning that there has to be something simpler and cleaner Checked the >> documentation and there it was. >> >> You can just import these two statements and just do the stream without >> implicit bindings: >> >> import org.apache.kafka.streams.scala.ImplicitConversions._ >> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _} >> >> Check the code on the repository I just did a push. Let me know what you >> think. >> >> Danno! >> >> >> >> On Fri, Nov 20, 2020 at 12:58 AM Daniel Hinojosa < >> [email protected]> wrote: >> >> > FYI, my goal was to get it to compile, since that is where >> > implicit resolution takes place. Obviously, you would need to plug in >> the >> > properties, start the stream, add a shutdown hook, etc. >> > >> > On Fri, Nov 20, 2020 at 12:56 AM Daniel Hinojosa < >> > [email protected]> wrote: >> > >> >> Done. I am using SBT for the build tool. I created an application and >> >> put it here >> >> https://github.com/dhinojosa/kafka-streams-scala >> >> >> >> The implicits for SessionWindowedSerde from this line below. The reason >> >> is I am turning off the timeWindowSerde, and that just leaves the >> >> SessionWindowSerde, the last underscore is to include the rest from >> this >> >> package. >> >> >> >> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, >> _} >> >> >> >> Everything should be in the project, it is now a 2.11.12 and Kafka >> >> 2.4.0. The only thing you may want to change is the SBT version in >> >> project/build.properties. :) >> >> >> >> >> >> On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <[email protected] >> > >> >> wrote: >> >> >> >>> Daniel, >> >>> >> >>> I copied your code from here: >> >>> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735 >> >>> >> >>> Still getting the same error message. Actually, I am not sure why it >> >>> would >> >>> work since you don't have implicits for SessionWindowedSerde. By the >> way, >> >>> which versions of Scala & Kafka did you use? Maybe that's the issue. >> I am >> >>> using following versions: >> >>> >> >>> *Scala: 2.11.12* >> >>> *Kafka: 2.4.0* >> >>> >> >>> Can you please try with these versions & let me know if it still works >> >>> for >> >>> you? Thanks for your help. >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson < >> >>> [email protected]> wrote: >> >>> >> >>> > That said John, nothing wrong with being explicit in code. :) >> >>> > >> >>> > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <[email protected]> >> >>> wrote: >> >>> > >> >>> > > Oh, nice. Thanks, Daniel! >> >>> > > >> >>> > > That’s much nicer than my ham-handed approach. >> >>> > > >> >>> > > Thanks, >> >>> > > John >> >>> > > >> >>> > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote: >> >>> > > > Hope this helps, I tried copying your code into a sample >> >>> application. I >> >>> > > got >> >>> > > > it to compile with the implicits all resolving. I think the >> trick >> >>> was >> >>> > > there >> >>> > > > were two implementations for Windowing Serdes. You just need to >> >>> block >> >>> > > one >> >>> > > > from the imports. See if that fits with what you are doing. Oh >> >>> also, >> >>> > I >> >>> > > > noticed that the types were not resolving when calling >> >>> builder.stream, >> >>> > > so I >> >>> > > > put [String, String] in the builder. Here is a gist, which >> formats >> >>> > > better. >> >>> > > > >> >>> > > > >> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735 >> >>> > > > >> >>> > > > import java.time.Duration >> >>> > > > import java.util.Properties >> >>> > > > >> >>> > > > import org.apache.kafka.common.config.Config >> >>> > > > import org.apache.kafka.streams.Topology >> >>> > > > import org.apache.kafka.streams.kstream.{SessionWindows, >> Windowed} >> >>> > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde >> => >> >>> _, >> >>> > _} >> >>> > > > import org.apache.kafka.streams.scala.kstream.{Consumed, >> Grouped, >> >>> > > > Materialized, Produced} >> >>> > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, >> >>> > > StreamsBuilder} >> >>> > > > >> >>> > > > class SampleStream { >> >>> > > > def createTopology(conf: Config, properties: Properties): >> >>> Topology = >> >>> > { >> >>> > > > >> >>> > > > implicit val produced: Produced[Windowed[String], Long] = >> >>> > > > Produced.`with`[Windowed[String], Long] >> >>> > > > >> >>> > > > implicit val grouped: Grouped[String, String] = >> >>> > > > Grouped.`with`[String, String] >> >>> > > > >> >>> > > > implicit val consumed: Consumed[String, String] = >> >>> > > > Consumed.`with`[String, String] >> >>> > > > >> >>> > > > implicit val materialized: Materialized[String, Long, >> >>> > > > ByteArraySessionStore] = Materialized.`with`[String, Long, >> >>> > > > ByteArraySessionStore] >> >>> > > > >> >>> > > > val builder: StreamsBuilder = new StreamsBuilder() >> >>> > > > >> >>> > > > builder >> >>> > > > .stream[String, String]("streams-plaintext-input") >> >>> > > > .groupBy((_, word) => word) >> >>> > > > .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * >> >>> 1000))) >> >>> > > > .count() >> >>> > > > .toStream >> >>> > > > .to("streams-pipe-output") >> >>> > > > >> >>> > > > builder.build() >> >>> > > > } >> >>> > > > } >> >>> > > > >> >>> > > > >> >>> > > > >> >>> > > > >> >>> > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler < >> [email protected]> >> >>> > > wrote: >> >>> > > > >> >>> > > > > Hi Eric, >> >>> > > > > >> >>> > > > > Sure thing. Assuming the definition of ‘produced’ you had >> tried >> >>> in >> >>> > your >> >>> > > > > code, it’s just: >> >>> > > > > >> >>> > > > > ... >> >>> > > > > .toStream.to("streams-pipe-output")(produced) >> >>> > > > > >> >>> > > > > As far as the json serde goes, I think that I wrote an >> example of >> >>> > using >> >>> > > > > Jackson to implement a serde in Confluent’s >> >>> kafka-streams-examples >> >>> > > repo. >> >>> > > > > I’m not sure what other/better examples >> >>> > > > > might be out there. >> >>> > > > > >> >>> > > > > Hope this helps, >> >>> > > > > John >> >>> > > > > >> >>> > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote: >> >>> > > > > > Not sure what you mean by "pass it explicitly". The >> definition >> >>> of >> >>> > > 'to' is >> >>> > > > > > given below. Can we pass it explicitly in this case. If yes, >> >>> can >> >>> > you >> >>> > > > > please >> >>> > > > > > show me how? >> >>> > > > > > >> >>> > > > > > def to(topic: String)(implicit produced: Produced[K, V]): >> Unit >> >>> = >> >>> > > > > > inner.to(topic, produced) >> >>> > > > > > >> >>> > > > > > >> >>> > > > > > Also not sure how to use a self documenting format like >> JSON. >> >>> Any >> >>> > > > > > examples to share? >> >>> > > > > > >> >>> > > > > > >> >>> > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler < >> >>> [email protected]> >> >>> > > > > wrote: >> >>> > > > > > >> >>> > > > > > > Hi Eric, >> >>> > > > > > > >> >>> > > > > > > Ah, that’s a bummer. The correct serde is the session >> >>> windowed >> >>> > > serde, >> >>> > > > > as I >> >>> > > > > > > can see you know. I’m afraid I’m a bit rusty on implicit >> >>> > resolution >> >>> > > > > rules, >> >>> > > > > > > so I can’t be much help there. >> >>> > > > > > > >> >>> > > > > > > But my general recommendation for implicits is that when >> >>> things >> >>> > get >> >>> > > > > weird, >> >>> > > > > > > just don’t use them at all. For example, you can just >> >>> explicitly >> >>> > > pass >> >>> > > > > the >> >>> > > > > > > Produced in the second arg list of ‘to’. >> >>> > > > > > > >> >>> > > > > > > One other tip is that the serialized form produced by >> those >> >>> > serdes >> >>> > > is >> >>> > > > > kind >> >>> > > > > > > of specialized and might not be the most convenient for >> your >> >>> use. >> >>> > > If >> >>> > > > > this >> >>> > > > > > > is just a POC, if suggest mapping the keys to strings, so >> >>> they >> >>> > are >> >>> > > > > > > human-readable. If this is a production use case, then you >> >>> might >> >>> > > want >> >>> > > > > to >> >>> > > > > > > use a more self-documenting format like JSON or AVRO. Just >> >>> my two >> >>> > > > > cents. >> >>> > > > > > > >> >>> > > > > > > I hope this helps! >> >>> > > > > > > -John >> >>> > > > > > > >> >>> > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote: >> >>> > > > > > > > I keep getting '*ambiguous implicit values*' message in >> the >> >>> > > following >> >>> > > > > > > code. >> >>> > > > > > > > I tried several things (as can be seen from a couple of >> >>> lines >> >>> > > I've >> >>> > > > > > > > commented out). Any ideas on how to fix this? This is in >> >>> > *Scala*. >> >>> > > > > > > > >> >>> > > > > > > > def createTopology(conf: Config, properties: >> Properties): >> >>> > > Topology = >> >>> > > > > > > > {// implicit val sessionSerde = >> >>> > > > > > > > Serde[WindowedSerdes.SessionWindowedSerde[String]]// >> >>> > implicit >> >>> > > val >> >>> > > > > > > > produced: Produced[Windowed[String], Long] = >> >>> > > > > > > > >> >>> Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], >> >>> > > Long] >> >>> > > > > > > > implicit val produced: Produced[Windowed[String], >> >>> Long] = >> >>> > > > > > > > Produced.`with`[Windowed[String], Long] >> >>> > > > > > > > implicit val consumed: Consumed[String, String] = >> >>> > > > > > > > Consumed.`with`[String, String] >> >>> > > > > > > > >> >>> > > > > > > > val builder: StreamsBuilder = new StreamsBuilder() >> >>> > > > > > > > builder.stream("streams-plaintext-input") >> >>> > > > > > > > .groupBy((_, word) => word) >> >>> > > > > > > > >> >>> .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 >> >>> > * >> >>> > > > > 1000))) >> >>> > > > > > > > .count() >> >>> > > > > > > > .toStream.to("streams-pipe-output") >> >>> > > > > > > > >> >>> > > > > > > > builder.build() >> >>> > > > > > > > >> >>> > > > > > > > } >> >>> > > > > > > > >> >>> > > > > > > > *Compiler Errors:* >> >>> > > > > > > > >> >>> > > > > > > > Error:(52, 78) ambiguous implicit values: >> >>> > > > > > > > both method timeWindowedSerde in object Serdes of type >> >>> > > [T](implicit >> >>> > > > > > > > tSerde: >> >>> > > > > > > > >> >>> > > > > > > >> >>> > > > > >> >>> > > >> >>> > >> >>> >> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T] >> >>> > > > > > > > and method sessionWindowedSerde in object Serdes of >> type >> >>> > > > > [T](implicit >> >>> > > > > > > > tSerde: >> >>> > > > > > > > >> >>> > > > > > > >> >>> > > > > >> >>> > > >> >>> > >> >>> >> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T] >> >>> > > > > > > > match expected type >> >>> > > > > > > > >> >>> > > > > > > >> >>> > > > > >> >>> > > >> >>> > >> >>> >> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]] >> >>> > > > > > > > implicit val produced: Produced[Windowed[String], >> >>> Long] = >> >>> > > > > > > > Produced.`with`[Windowed[String], Long] >> >>> > > > > > > > Error:(52, 78) could not find implicit value for >> parameter >> >>> > > keySerde: >> >>> > > > > > > > >> >>> > > > > > > >> >>> > > > > >> >>> > > >> >>> > >> >>> >> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]] >> >>> > > > > > > > implicit val produced: Produced[Windowed[String], >> >>> Long] = >> >>> > > > > > > > Produced.`with`[Windowed[String], Long] >> >>> > > > > > > > Error:(52, 78) not enough arguments for method with: >> >>> (implicit >> >>> > > > > > > > keySerde: >> >>> > > > > > > > >> >>> > > > > > > >> >>> > > > > >> >>> > > >> >>> > >> >>> >> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], >> >>> > > > > > > > implicit valueSerde: >> >>> > > > > > > > >> >>> > > > > > > >> >>> > > > > >> >>> > > >> >>> > >> >>> >> org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].Unspecified >> >>> > > > > > > > value parameters keySerde, valueSerde. >> >>> > > > > > > > implicit val produced: Produced[Windowed[String], >> >>> Long] = >> >>> > > > > > > > Produced.`with`[Windowed[String], Long] >> >>> > > > > > > > >> >>> > > > > > > >> >>> > > > > > >> >>> > > > > >> >>> > > > >> >>> > > >> >>> > >> >>> >> >> >> >
