FYI, my goal was to get it to compile, since that is where implicit resolution takes place. Obviously, you would need to plug in the properties, start the stream, add a shutdown hook, etc.
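
For the remaining wiring mentioned above, here is a minimal sketch rather than anything taken from the sample project. The object name StreamRunner, its two methods, and the ten-second close timeout are assumptions made for illustration; the application id and bootstrap servers are placeholders you would supply yourself.

```scala
import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

// Hypothetical helper, not part of the sample project.
object StreamRunner {

  // Builds the two settings Kafka Streams requires; both values are caller-supplied.
  def streamsProperties(applicationId: String, bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props
  }

  // Registers a shutdown hook, starts the topology, and returns the running instance.
  def run(topology: Topology, props: Properties): KafkaStreams = {
    val streams = new KafkaStreams(topology, props)

    // Close the streams instance when the JVM exits (the timeout is an assumed value).
    sys.addShutdownHook {
      streams.close(Duration.ofSeconds(10))
    }

    streams.start()
    streams
  }
}
```

You would pass the Topology returned by createTopology to StreamRunner.run, together with the properties built for your own cluster.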

On Fri, Nov 20, 2020 at 12:56 AM Daniel Hinojosa <dhinoj...@evolutionnext.com> wrote:

> Done. I am using SBT for the build tool. I created an application and
> put it here: https://github.com/dhinojosa/kafka-streams-scala
>
> The implicits for SessionWindowedSerde come from the import line below.
> I am hiding timeWindowedSerde, which leaves only sessionWindowedSerde in
> scope; the trailing underscore imports everything else from Serdes.
>
>     import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>
> Everything should be in the project; it is now on Scala 2.11.12 and
> Kafka 2.4.0. The only thing you may want to change is the SBT version in
> project/build.properties. :)
>
> On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <mailinglist...@gmail.com> wrote:
>
> > Daniel,
> >
> > I copied your code from here:
> > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> >
> > Still getting the same error message. Actually, I am not sure why it
> > would work since you don't have implicits for SessionWindowedSerde. By
> > the way, which versions of Scala & Kafka did you use? Maybe that's the
> > issue. I am using the following versions:
> >
> > *Scala: 2.11.12*
> > *Kafka: 2.4.0*
> >
> > Can you please try with these versions & let me know if it still works
> > for you? Thanks for your help.
> >
> > On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
> >
> > > That said John, nothing wrong with being explicit in code. :)
> > >
> > > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org> wrote:
> > >
> > > > Oh, nice. Thanks, Daniel!
> > > >
> > > > That’s much nicer than my ham-handed approach.
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> > > >
> > > > > Hope this helps. I tried copying your code into a sample
> > > > > application, and I got it to compile with the implicits all
> > > > > resolving. I think the trick was that there were two
> > > > > implementations for windowing Serdes. You just need to block one
> > > > > from the imports. See if that fits with what you are doing. Oh,
> > > > > also, I noticed that the types were not resolving when calling
> > > > > builder.stream, so I put [String, String] in the builder. Here is
> > > > > a gist, which formats better.
> > > > >
> > > > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> > > > >
> > > > > import java.time.Duration
> > > > > import java.util.Properties
> > > > >
> > > > > import org.apache.kafka.common.config.Config
> > > > > import org.apache.kafka.streams.Topology
> > > > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> > > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> > > > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
> > > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
> > > > >
> > > > > class SampleStream {
> > > > >   def createTopology(conf: Config, properties: Properties): Topology = {
> > > > >
> > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > >       Produced.`with`[Windowed[String], Long]
> > > > >
> > > > >     implicit val grouped: Grouped[String, String] =
> > > > >       Grouped.`with`[String, String]
> > > > >
> > > > >     implicit val consumed: Consumed[String, String] =
> > > > >       Consumed.`with`[String, String]
> > > > >
> > > > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
> > > > >       Materialized.`with`[String, Long, ByteArraySessionStore]
> > > > >
> > > > >     val builder: StreamsBuilder = new StreamsBuilder()
> > > > >
> > > > >     builder
> > > > >       .stream[String, String]("streams-plaintext-input")
> > > > >       .groupBy((_, word) => word)
> > > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > > > >       .count()
> > > > >       .toStream
> > > > >       .to("streams-pipe-output")
> > > > >
> > > > >     builder.build()
> > > > >   }
> > > > > }
> > > > >
> > > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org> wrote:
> > > > >
> > > > > > Hi Eric,
> > > > > >
> > > > > > Sure thing. Assuming the definition of ‘produced’ you had tried
> > > > > > in your code, it’s just:
> > > > > >
> > > > > >     ...
> > > > > >     .toStream.to("streams-pipe-output")(produced)
> > > > > >
> > > > > > As far as the JSON serde goes, I think that I wrote an example
> > > > > > of using Jackson to implement a serde in Confluent’s
> > > > > > kafka-streams-examples repo. I’m not sure what other/better
> > > > > > examples might be out there.
> > > > > >
> > > > > > Hope this helps,
> > > > > > John
> > > > > >
> > > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > > > > >
> > > > > > > Not sure what you mean by "pass it explicitly". The
> > > > > > > definition of 'to' is given below. Can we pass it explicitly
> > > > > > > in this case? If yes, can you please show me how?
> > > > > > >
> > > > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> > > > > > >   inner.to(topic, produced)
> > > > > > >
> > > > > > > Also not sure how to use a self-documenting format like JSON.
> > > > > > > Any examples to share?
> > > > > > >
> > > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi Eric,
> > > > > > > >
> > > > > > > > Ah, that’s a bummer. The correct serde is the session
> > > > > > > > windowed serde, as I can see you know. I’m afraid I’m a bit
> > > > > > > > rusty on implicit resolution rules, so I can’t be much help
> > > > > > > > there.
> > > > > > > >
> > > > > > > > But my general recommendation for implicits is that when
> > > > > > > > things get weird, just don’t use them at all. For example,
> > > > > > > > you can just explicitly pass the Produced in the second arg
> > > > > > > > list of ‘to’.
> > > > > > > >
> > > > > > > > One other tip is that the serialized form produced by those
> > > > > > > > serdes is kind of specialized and might not be the most
> > > > > > > > convenient for your use. If this is just a POC, I’d suggest
> > > > > > > > mapping the keys to strings, so they are human-readable. If
> > > > > > > > this is a production use case, then you might want to use a
> > > > > > > > more self-documenting format like JSON or AVRO. Just my two
> > > > > > > > cents.
> > > > > > > >
> > > > > > > > I hope this helps!
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > > > > >
> > > > > > > > > I keep getting '*ambiguous implicit values*' message in
> > > > > > > > > the following code. I tried several things (as can be
> > > > > > > > > seen from a couple of lines I've commented out). Any
> > > > > > > > > ideas on how to fix this? This is in *Scala*.
> > > > > > > > >
> > > > > > > > > def createTopology(conf: Config, properties: Properties): Topology = {
> > > > > > > > >   // implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
> > > > > > > > >   // implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> > > > > > > > >   implicit val produced: Produced[Windowed[String], Long] =
> > > > > > > > >     Produced.`with`[Windowed[String], Long]
> > > > > > > > >   implicit val consumed: Consumed[String, String] =
> > > > > > > > >     Consumed.`with`[String, String]
> > > > > > > > >
> > > > > > > > >   val builder: StreamsBuilder = new StreamsBuilder()
> > > > > > > > >   builder.stream("streams-plaintext-input")
> > > > > > > > >     .groupBy((_, word) => word)
> > > > > > > > >     .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > > > > > > > >     .count()
> > > > > > > > >     .toStream.to("streams-pipe-output")
> > > > > > > > >
> > > > > > > > >   builder.build()
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > *Compiler Errors:*
> > > > > > > > >
> > > > > > > > > Error:(52, 78) ambiguous implicit values:
> > > > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > > > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > > > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > > > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > > > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
> > > > > > > > > Unspecified value parameters keySerde, valueSerde.
> > > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
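
For anyone who wants to see the mechanics in isolation, the ambiguity and both fixes discussed in this thread (hiding one implicit in the import, or passing the argument explicitly) can be reproduced without Kafka. This is only a sketch: Codec, Win, Codecs, and every method name below are hypothetical stand-ins for Serde, Windowed, and Serdes, not Kafka's actual API.

```scala
// Stand-in for Serde[A]; carries only a name so results are easy to inspect.
trait Codec[A] { def name: String }

// Stand-in for Windowed[A].
final case class Win[A](key: A)

// Stand-in for the Serdes object: two implicit methods yield the same type Codec[Win[A]].
object Codecs {
  private def codec[A](n: String): Codec[A] = new Codec[A] { val name: String = n }

  implicit val stringCodec: Codec[String] = codec("string")

  implicit def timeWin[A](implicit inner: Codec[A]): Codec[Win[A]] =
    codec(s"time(${inner.name})")

  implicit def sessionWin[A](implicit inner: Codec[A]): Codec[Win[A]] =
    codec(s"session(${inner.name})")
}

object ImplicitDemo {
  // Requires an implicit Codec for the value's type, like Produced.`with` requires Serdes.
  def describe[A](value: A)(implicit codec: Codec[A]): String = codec.name

  // With a plain `import Codecs._`, describe(Win("a")) would not compile:
  // timeWin and sessionWin both match Codec[Win[String]] ("ambiguous implicit values").

  // Fix 1: hide timeWin in the import; the trailing underscore keeps everything else.
  def viaImportHiding: String = {
    import Codecs.{timeWin => _, _}
    describe(Win("a"))
  }

  // Fix 2: skip implicit resolution and pass the second argument list explicitly.
  def viaExplicitArgument: String =
    describe(Win("a"))(Codecs.sessionWin(Codecs.stringCodec))

  def main(args: Array[String]): Unit = {
    println(viaImportHiding)     // prints "session(string)"
    println(viaExplicitArgument) // prints "session(string)"
  }
}
```

Both routes end up with the same codec; the import-hiding form keeps call sites short, while the explicit form makes the choice visible where it is used.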