Fábio Dias created FLINK-5755:
---------------------------------

             Summary: Flink with Kafka connection
                 Key: FLINK-5755
                 URL: https://issues.apache.org/jira/browse/FLINK-5755
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, Kafka Connector
    Affects Versions: 1.1.3
         Environment: Ubuntu 16.04.1 LTS
Flink 1.1.3
Kafka 0.10.1.1
            Reporter: Fábio Dias


I'm trying to connect Flink with Kafka (Flink 1.1.3, Kafka 0.10.1.1).

I have already tried all the fixes that I could find, but none of them worked.

pom.xml :

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build for a Flink streaming job that reads from Kafka. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>ux</groupId>
    <artifactId>logs</artifactId>
    <version>1.3-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place where the Flink version is chosen. It used to be
             ${project.version}, which tied the Flink release to this artifact's
             own version; the literal keeps the resolved value (1.3-SNAPSHOT)
             unchanged.
             NOTE(review): the report lists Flink 1.1.3 as the installed
             version; confirm this matches the cluster the job is submitted to. -->
        <flink.version>1.3-SNAPSHOT</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <repositories>
        <!-- Needed to resolve the SNAPSHOT builds of Flink referenced below. -->
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- The Kafka connector is not re-declared as "provided" in the
             build-jar profile, so it stays at compile scope there and the
             shade plugin should bundle it into the job jar when that profile
             is active (mvn clean package -Pbuild-jar). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <profiles>
        <!-- Opt-in profile that builds the shaded job jar. The dependencies
             listed here are switched to "provided" scope for that build. -->
        <profile>
            <id>build-jar</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.10</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_2.10</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                    <version>${slf4j.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                    <version>${log4j.version}</version>
                    <scope>provided</scope>
                </dependency>
            </dependencies>

            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.4.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <artifactSet>
                                        <excludes combine.self="override"></excludes>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

My Java code:

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class App 
{
    public static void main(String[] args) throws Exception {
                
                System.out.println("Hello World!");
    
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

                Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("zookeeper.connect", "localhost:2181");
            properties.setProperty("group.id", "flink_consumer");

                DataStream<String> messageStream = env.addSource(new 
FlinkKafkaConsumer010<>
                        ("ux_logs", new SimpleStringSchema(), properties));

                messageStream.rebalance().map(new MapFunction<String, String>() 
{

                        private static final long serialVersionUID = 
-6867736771747690202L;

                        public String map(String value) throws Exception {
                                return "Kafka and Flink says: " + value;
                        }
                }).print();
                
            env.execute();
    }
}

When I compile and run it, I get the following error:

java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010
        at ux.App.main(App.java:28)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
        at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Do I need to remove my Kafka installation and run an older version?

Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to