Fábio Dias created FLINK-5755: --------------------------------- Summary: Flink with Kafka connection Key: FLINK-5755 URL: https://issues.apache.org/jira/browse/FLINK-5755 Project: Flink Issue Type: Bug Components: DataStream API, Kafka Connector Affects Versions: 1.1.3 Environment: Ubuntu 16.04.1 LTS Flink 1.1.3 Kakfa 0.10.1.1 Reporter: Fábio Dias
I'm trying to connect flink with kafka (Flink 1.1.3 Kakfa 0.10.1.1) I already try all the fixes that i could find, but none of them work. pom.xml : <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ux</groupId> <artifactId>logs</artifactId> <version>1.3-SNAPSHOT</version> <packaging>jar</packaging> <name>Flink Quickstart Job</name> <url>http://www.myorganization.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>${project.version}</flink.version> <slf4j.version>1.7.7</slf4j.version> <log4j.version>1.2.17</log4j.version> </properties> <repositories> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.10</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.10</artifactId> <version>1.3-SNAPSHOT</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> </dependencies> <profiles> <profile> <id>build-jar</id> <activation> <activeByDefault>false</activeByDefault> </activation> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.10</artifactId> <version>1.3-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes combine.self="override"></excludes> </artifactSet> </configuration> </execution> </executions> </plugin> </plugins> </build> </profile> </profiles> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project> my java code : import java.util.Properties; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; public class App { public static void main(String[] args) throws Exception { System.out.println("Hello World!"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "flink_consumer"); DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<> ("ux_logs", new SimpleStringSchema(), properties)); messageStream.rebalance().map(new MapFunction<String, String>() { private static final long serialVersionUID = -6867736771747690202L; public String map(String value) throws Exception { return "Kafka and Flink says: " + value; } }).print(); env.execute(); } } And when i compile it, i get the following error: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010 at ux.App.main(App.java:28) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) Do i need to remove my kafka, and run a older version? Thanks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)