Looking at the other threads, I assume you solved this issue. The problem should have been that FlinkKafka09Consumer is not included in the flink-connector-kafka-0.11 module, because it is the connector for Kafka 0.9 and not Kafka 0.11.
Best, Fabian 2018-07-02 11:20 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>: > Hi, > > I created a jar file with sbt with this sbt file > > cat md_streaming.sbt > name := "md_streaming" > version := "1.0" > scalaVersion := "2.11.8" > > libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.3" > libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" > libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" > % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" > % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % > "1.5.0" > libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" > > and the Scala code is very basic > > import java.util.Properties > import java.util.Arrays > import org.apache.flink.api.common.functions.MapFunction > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.datastream.DataStream > import org.apache.flink.streaming.api.environment. > StreamExecutionEnvironment > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 > import org.apache.flink.streaming.util.serialization.SimpleStringSchema > import org.apache.flink.streaming.util.serialization.DeserializationSchema > import org.apache.flink.streaming.util.serialization.SimpleStringSchema > object md_streaming > { > def main(args: Array[String]) > { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val properties = new Properties() > properties.setProperty("bootstrap.servers", "rhes75:9092") > properties.setProperty("zookeeper.connect", "rhes75:2181") > properties.setProperty("group.id", "md_streaming") > val stream = env > .addSource(new FlinkKafkaConsumer09[String]("md", new > SimpleStringSchema(), properties)) > .writeAsText("/tmp/md_streaming.txt") > env.execute("Flink Kafka Example") > } > } > > It compiles OK as follows > > Compiling md_streaming > [info] Set current project to md_streaming (in build > file:/home/hduser/dba/bin/flink/md_streaming/) > [success] Total time: 0 s, completed Jul 2, 2018 10:16:05 AM > [info] Set current project to md_streaming (in build > file:/home/hduser/dba/bin/flink/md_streaming/) > [info] Updating {file:/home/hduser/dba/bin/flink/md_streaming/}md_ > streaming... > [info] Resolving jline#jline;2.12.1 ... > [info] Done updating. > [warn] Scala version was updated by one of library dependencies: > [warn] * org.scala-lang:scala-library:(2.11.8, 2.11.11, 2.11.6, 2.11.7) > -> 2.11.12 > [warn] To force scalaVersion, add the following: > [warn] ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = > true) } > [warn] There may be incompatibilities among your library dependencies. > [warn] Here are some of the libraries that were evicted: > [warn] * org.apache.kafka:kafka_2.11:0.8.2.2 -> 1.1.0 > [warn] Run 'evicted' to see detailed eviction warnings > [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_ > streaming/target/scala-2.11/classes... > [warn] there was one deprecation warning; re-run with -deprecation for > details > [warn] one warning found > [info] Packaging /home/hduser/dba/bin/flink/md_ > streaming/target/scala-2.11/md_streaming_2.11-1.0.jar ... > [info] Done packaging. > [success] Total time: 3 s, completed Jul 2, 2018 10:16:10 AM > Completed compiling > > The content of jar file is as follows > jar tvf /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/ > md_streaming_2.11-1.0.jar > 277 Mon Jul 02 10:16:10 BST 2018 META-INF/MANIFEST.MF > 2003 Mon Jul 02 10:16:10 BST 2018 md_streaming$.class > 599 Mon Jul 02 10:16:10 BST 2018 md_streaming.class > > When I run it with flink run I get this error > > flink run /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/ > md_streaming_2.11-1.0.jar > Starting execution of program > java.lang.NoClassDefFoundError: org/apache/flink/streaming/ > connectors/kafka/FlinkKafkaConsumer09 > at md_streaming$.main(md_streaming.scala:22) > at md_streaming.main(md_streaming.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:528) > at org.apache.flink.client.program.PackagedProgram. > invokeInteractiveModeForExecution(PackagedProgram.java:420) > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:404) > at org.apache.flink.client.cli.CliFrontend.executeProgram( > CliFrontend.java:781) > at org.apache.flink.client.cli.CliFrontend.runProgram( > CliFrontend.java:275) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend. > java:210) > at org.apache.flink.client.cli.CliFrontend.parseParameters( > CliFrontend.java:1020) > at org.apache.flink.client.cli.CliFrontend.lambda$main$9( > CliFrontend.java:1096) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at org.apache.hadoop.security.UserGroupInformation.doAs( > UserGroupInformation.java:1836) > at org.apache.flink.runtime.security.HadoopSecurityContext. > runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend. > java:1096) > Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming. > connectors.kafka.FlinkKafkaConsumer09 > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > > Do I need to pass additional classes? I suspect the jar file is not > complete! > > Thanks > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > >