Hi,
I am new to Flink and Kafka.

I am trying to read a Kafka topic from Flink and send the messages back to
another Kafka topic.

Here is my setup:
Flink 0.10.1
Kafka 0.9

All of that runs on a single node.

I successfully wrote a Java program that sends messages to Kafka (topic =
demo), and I have a consumer (in a shell) that reads them, so that part works.

When I execute the Flink program, I get an error.
The code and the error are below; it fails somewhere between steps C and D.

What am I doing wrong?

Thanks

Sylvain

<Code start>
package com.sylvain;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Skeleton for a Flink Job.
 *
 * For a full example of a Flink Job, see the WordCountJob.java file in the
 * same package/directory or have a look at the website.
 *
 * You can also generate a .jar file that you can submit on your Flink
 * cluster.
 * Just type
 *              mvn clean package
 * in the projects root directory.
 * You will find the jar in
 *              target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
 *
 */
public class Job {

    public static void main(String[] args) throws Exception {
        // set up the execution environment

        System.out.println("Step A");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        System.out.println("Step B");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        System.out.println("Step C");
        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer082<>("demo", new SimpleStringSchema(), properties));

        System.out.println("Step D");
        messageStream.map(new MapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                // TODO Auto-generated method stub
                return "Blablabla " + value;
            }

        }).addSink(new FlinkKafkaProducer("localhost:9092", "demo2", new SimpleStringSchema()));

        System.out.println("Step E");
        env.execute();
        System.out.println("Step F");
    }
}
<Code end> 

<Error Start>
[shotte@localhost flink-kafka]$ flink run ./target/flink-kafka-0.1.jar 
Step A
Step B
Step C
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
        at kafka.utils.Pool.<init>(Pool.scala:28)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
        at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
        at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:691)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:281)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
        at com.sylvain.Job.main(Job.java:64)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
        at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 20 more

<Error End>
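
In case it helps to narrow this down, here is a small standalone check that could be run with the same classpath as the job. It is only a sketch and not part of my program: the class name ClasspathCheck and its two helper methods are made up for illustration. It uses only the JDK to test whether the class named in the error can be loaded, and to print which jar provides scala.Predef and kafka.utils.Pool.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of the Flink job above.
public class ClasspathCheck {

    /** Returns true if the named class can be found by this class's loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns the jar or directory a class was loaded from, or a short note. */
    static String locationOf(String className) {
        try {
            Class<?> c = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null ? "(bootstrap or unknown)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not found)";
        }
    }

    public static void main(String[] args) {
        // The class reported as missing in the stack trace
        String missing = "scala.collection.GenTraversableOnce$class";
        System.out.println(missing + " loadable: " + isLoadable(missing));
        // Which jars the Scala library and the Kafka classes come from
        System.out.println("scala.Predef from: " + locationOf("scala.Predef"));
        System.out.println("kafka.utils.Pool from: " + locationOf("kafka.utils.Pool"));
    }
}
```

The printed jar locations should show whether the Scala library and the Kafka classes on the classpath come from the jars I expect.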






--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-tp4828.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
