[
https://issues.apache.org/jira/browse/FLINK-5755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950744#comment-15950744
]
Tzu-Li (Gordon) Tai commented on FLINK-5755:
--------------------------------------------
Hi [~fddias],
are you still having problems with this? It seems like you just don't have the
right dependencies, or perhaps you've built Flink with the wrong version.
Anyway, I recommend to post your question on the mailing lists
([email protected]), that's where people ask questions. The JIRA is used to
file issues.
If you still are having problems with the packaging, feel free to CC me
([email protected]) in your question on the mailing list so that I can take a
look. Closing this JIRA ticket now.
> Flink with Kafka connection
> ---------------------------
>
> Key: FLINK-5755
> URL: https://issues.apache.org/jira/browse/FLINK-5755
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Kafka Connector
> Affects Versions: 1.1.3
> Environment: Ubuntu 16.04.1 LTS
> Flink 1.1.3
> Kakfa 0.10.1.1
> Reporter: Fábio Dias
>
> I'm trying to connect flink with kafka (Flink 1.1.3 Kakfa 0.10.1.1)
> I already try all the fixes that i could find, but none of them work.
> pom.xml :
> <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
> <modelVersion>4.0.0</modelVersion>
>
> <groupId>ux</groupId>
> <artifactId>logs</artifactId>
> <version>1.3-SNAPSHOT</version>
> <packaging>jar</packaging>
> <name>Flink Quickstart Job</name>
> <url>http://www.myorganization.org</url>
> <properties>
>
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> <flink.version>${project.version}</flink.version>
> <slf4j.version>1.7.7</slf4j.version>
> <log4j.version>1.2.17</log4j.version>
> </properties>
> <repositories>
> <repository>
> <id>apache.snapshots</id>
> <name>Apache Development Snapshot Repository</name>
>
> <url>https://repository.apache.org/content/repositories/snapshots/</url>
> <releases>
> <enabled>false</enabled>
> </releases>
> <snapshots>
> <enabled>true</enabled>
> </snapshots>
> </repository>
> </repositories>
> <dependencies>
> <dependency>
> <groupId>junit</groupId>
> <artifactId>junit</artifactId>
> <version>3.8.1</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_2.10</artifactId>
> <version>${project.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>${project.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.10</artifactId>
> <version>${project.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
> <version>1.3-SNAPSHOT</version>
> </dependency>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-log4j12</artifactId>
> <version>${slf4j.version}</version>
> </dependency>
> <dependency>
> <groupId>log4j</groupId>
> <artifactId>log4j</artifactId>
> <version>${log4j.version}</version>
> </dependency>
>
> </dependencies>
> <profiles>
> <profile>
> <id>build-jar</id>
> <activation>
> <activeByDefault>false</activeByDefault>
> </activation>
> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>${project.version}</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-streaming-java_2.10</artifactId>
> <version>${project.version}</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-clients_2.10</artifactId>
> <version>1.3-SNAPSHOT</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-log4j12</artifactId>
> <version>${slf4j.version}</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>log4j</groupId>
> <artifactId>log4j</artifactId>
> <version>${log4j.version}</version>
> <scope>provided</scope>
> </dependency>
> </dependencies>
> <build>
> <plugins>
> <plugin>
>
> <groupId>org.apache.maven.plugins</groupId>
>
> <artifactId>maven-shade-plugin</artifactId>
> <version>2.4.1</version>
> <executions>
> <execution>
>
> <phase>package</phase>
> <goals>
>
> <goal>shade</goal>
> </goals>
> <configuration>
>
> <artifactSet>
>
> <excludes combine.self="override"></excludes>
>
> </artifactSet>
> </configuration>
> </execution>
> </executions>
> </plugin>
> </plugins>
> </build>
> </profile>
> </profiles>
> <build>
> <plugins>
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-compiler-plugin</artifactId>
> <version>3.1</version>
> <configuration>
> <source>1.8</source>
> <target>1.8</target>
> </configuration>
> </plugin>
> </plugins>
> </build>
> </project>
> my java code :
> import java.util.Properties;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> public class App
> {
> public static void main(String[] args) throws Exception {
>
> System.out.println("Hello World!");
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("zookeeper.connect", "localhost:2181");
> properties.setProperty("group.id", "flink_consumer");
> DataStream<String> messageStream = env.addSource(new
> FlinkKafkaConsumer010<>
> ("ux_logs", new SimpleStringSchema(), properties));
> messageStream.rebalance().map(new MapFunction<String, String>()
> {
> private static final long serialVersionUID =
> -6867736771747690202L;
> public String map(String value) throws Exception {
> return "Kafka and Flink says: " + value;
> }
> }).print();
>
> env.execute();
> }
> }
> And when i compile it, i get the following error:
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010
> at ux.App.main(App.java:28)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
> at
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> Do i need to remove my kafka, and run a older version?
> Thanks.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)