Hi,
I am trying to write a minimal Flink Kafka consumer in Scala and got this far:
```
➜ scala git:(kafka_exp_001) ✗ cat KafkaConsumer.scala
package io.allthingsdata.kafkaConsumer

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.api.common.typeinfo._
//import org.apache.flink.streaming.api.windowing.time.Time

object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val valueDeserializer = new SimpleStringSchema()
    val props = new java.util.Properties()

    // create a Kafka Consumer
    val kafkaConsumer: FlinkKafkaConsumer082[String] =
      new FlinkKafkaConsumer082(
        "Topic1",
        valueDeserializer,
        props
      )

    // get input data
    val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?

    // do something with it (the s"" interpolator substitutes $s)
    val messages = messageStream.map(s => s"Kafka and Flink say: $s")

    // print the result, then start the job (nothing runs until execute() is called)
    messages.print()
    env.execute("Kafka and Flink")
  }
}

/* based on this Java example code
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env
  .addSource(new FlinkKafkaConsumer082<>(
    parameterTool.getRequired("topic"),
    new SimpleStringSchema(),
    parameterTool.getProperties()));
Once a DataStream is created, you can transform it as you like. For
example, let us pad every word with a fixed prefix, and print to stdout:
messageStream
  .rebalance()
  .map ( s -> "Kafka and Flink says: " + s)
  .print();
*/
```
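Separately from the compile errors, the `props` passed to the consumer above are still empty. My reading of the Flink 0.10 Kafka connector docs is that `FlinkKafkaConsumer082` expects at least `zookeeper.connect`, `bootstrap.servers` and `group.id`. A small sketch of filling them in; the helper `KafkaProps.consumerProps` and its localhost defaults are my own placeholders, not Flink API:

```scala
import java.util.Properties

// Hypothetical helper (not part of Flink): collects the three consumer
// properties listed in the Flink 0.10 Kafka connector docs.
// The default hosts, ports and group id are placeholders for a local setup.
object KafkaProps {
  def consumerProps(
      zookeeperConnect: String = "localhost:2181",
      bootstrapServers: String = "localhost:9092",
      groupId: String = "flink-sbt-test"): Properties = {
    val props = new Properties()
    props.setProperty("zookeeper.connect", zookeeperConnect) // ZooKeeper quorum
    props.setProperty("bootstrap.servers", bootstrapServers) // Kafka brokers
    props.setProperty("group.id", groupId)                   // consumer group
    props
  }
}
```

The consumer would then be created as `new FlinkKafkaConsumer082("Topic1", valueDeserializer, KafkaProps.consumerProps())`.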
When I try to compile in sbt, I get these errors:
```
> compile
[info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?
[error] ^
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] map ( s => "Kafka and Flink say: $s" )
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 19, 2015 5:11:56 PM
```
When inspecting `DataStreamSource addSource(...)`, I read this Javadoc:
```
/**
 * Adds a data source with a custom type information thus opening a
 * {@link DataStream}. Only in very special cases does the user need to
 * support type information. Otherwise use
 * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
 *
```
I also tried supplying `BasicTypeInfo.STRING_TYPE_INFO` as the typeInfo argument, but that does not solve it. When trying:
```
val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?
```
I get:
```
> compile
[info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: overloaded method value addSource with alternatives:
[error] [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$17: scala.reflect.ClassTag[T], implicit evidence$18: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
[error] [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$15: scala.reflect.ClassTag[T], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
[error] cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String], org.apache.flink.api.common.typeinfo.BasicTypeInfo[String])
[error] val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?
[error] ^
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] map ( s => "Kafka and Flink say: $s" )
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 20, 2015 3:13:04 PM
>
```
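To check my understanding of the signatures in that error: the Scala `addSource` takes the source in its first parameter list and the `ClassTag`/`TypeInformation` evidence in a second, implicit one, so a type info passed next to the source ends up in the wrong list. Below is a toy model in plain Scala (no Flink; `TypeInfo`, `ToySource`, `ToyStream`, `ToyEnv` and `Demo` are made-up stand-ins) of passing that evidence explicitly versus letting the compiler find it:

```scala
import scala.reflect.ClassTag

// Made-up stand-ins that mimic the shape of the Scala addSource signature
// from the compiler output; none of these are Flink classes.
final case class TypeInfo[T](name: String)
trait ToySource[T] { def run(): Seq[T] }
final case class ToyStream[T](elems: Seq[T], info: TypeInfo[T])

object ToyEnv {
  // The context bound [T: ClassTag: TypeInfo] desugars to a second, implicit
  // parameter list (evidence$1: ClassTag[T], evidence$2: TypeInfo[T]).
  def addSource[T: ClassTag: TypeInfo](src: ToySource[T]): ToyStream[T] =
    ToyStream(src.run(), implicitly[TypeInfo[T]])
}

object Demo {
  val strings: ToySource[String] = new ToySource[String] { def run() = Seq("a", "b") }
  val stringInfo: TypeInfo[String] = TypeInfo("String")

  // addSource(strings, stringInfo) would not compile ("cannot be applied to");
  // the evidence has to be supplied in the second parameter list instead.
  val explicit: ToyStream[String] =
    ToyEnv.addSource(strings)(implicitly[ClassTag[String]], stringInfo)

  // Alternatively, one implicit TypeInfo[String] in scope lets the compiler fill it in.
  implicit val implicitInfo: TypeInfo[String] = TypeInfo("String (implicit)")
  val viaImplicit: ToyStream[String] = ToyEnv.addSource(strings)
}
```

If that reading is right, the Flink equivalent would presumably be `env.addSource(kafkaConsumer)(implicitly[ClassTag[String]], BasicTypeInfo.STRING_TYPE_INFO)`, though I have not tried it, and it would not help the `map` call, which needs the same implicit `TypeInformation[String]`.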
The build.sbt I am using is:
```
➜ flink-sbt git:(kafka_exp_001) ✗ cat build.sbt
val flink_scala_0_10_0 = "org.apache.flink" % "flink-scala" % "0.10.0"
val flink_clients_0_10_0 = "org.apache.flink" % "flink-clients" % "0.10.0"
val flink_streaming_scala_0_10_0 = "org.apache.flink" % "flink-streaming-scala" % "0.10.0"
val flink_connector_kafka_0_10_0 = "org.apache.flink" % "flink-connector-kafka" % "0.10.0"
lazy val commonSettings = Seq(
organization := "io.allthingsdata",
version := "0.1.0",
scalaVersion := "2.10.6"
)
lazy val root = (project in file(".")).
settings(commonSettings: _*).
settings(
name := "flink-sbt",
fork in run := true,
libraryDependencies += flink_scala_0_10_0,
libraryDependencies += flink_clients_0_10_0,
libraryDependencies += flink_streaming_scala_0_10_0,
libraryDependencies += flink_connector_kafka_0_10_0
)
```
Any hints on how to make this minimal example work would be very much appreciated.
Many thanks :-)
Peter