Flink uses kryo serialization which doesn't support joda time object
serialization.

Use java.util.date or you have to change kryo.

Thanks,
Arpit

On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> wrote:

> Hi
> I have a problem at running a sample code from the hands-in examples of
> Apache Flink,
> I used the  following code to send output of a stream to already running
> Apache Kafka, and get the below error. Could anyone tell me what is going
> wrong?
>
> Best regards
> Ahmad
>
> public class RideCleansing {
>
>     private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>     public static final String CLEANSED_RIDES_TOPIC = "mytopic";
>
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<TaxiRide> rides = env.addSource(new 
> TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f));
>
>         DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter());
>
>         filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER,
>                 CLEANSED_RIDES_TOPIC,
>                 new TaxiRideSchema()));
>
>         env.execute("Taxi Ride Cleansing");
>     }
>
> Error:
> 18:43:15,734 INFO  org.apache.flink.api.java.typeutils.TypeExtractor        - 
> class org.joda.time.DateTime is not a valid POJO type
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> kafka/producer/Partitioner
>     at java.lang.ClassLoader.defineClass1(Native Method)
>     at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
>     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at 
> com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51)
> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 13 more
>
>

Reply via email to