Hi,

why can't I register the stream as a table and get a MalformedClassName
exception?

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
    new FlinkKafkaConsumer(
      "tweets-raw-json",
      serializer,
      properties
    ).setStartFromEarliest() // TODO experiment with different start values
  )

case class Foo(lang: String, count: Int)
val r = stream
    .map(e => {
      Foo(e.get("value").get("lang").asText(), 1)
    })
    .keyBy(_.lang)
    .timeWindow(Time.seconds(10))
    .sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)

Best,
Georg

Reply via email to