This is unrelated to joda time or Kryo, that's just an info message in the
log.

What version of Flink and Kafka are you using?



On Wed, 1 Jun 2016 at 07:02 arpit srivastava <arpit8...@gmail.com> wrote:

> Flink uses kryo serialization which doesn't support joda time object
> serialization.
>
> Use java.util.date or you have to change kryo.
>
> Thanks,
> Arpit
>
> On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> wrote:
>
>> Hi
>> I have a problem at running a sample code from the hands-in examples of
>> Apache Flink,
>> I used the  following code to send output of a stream to already running
>> Apache Kafka, and get the below error. Could anyone tell me what is going
>> wrong?
>>
>> Best regards
>> Ahmad
>>
>> public class RideCleansing {
>>
>>     private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>>     public static final String CLEANSED_RIDES_TOPIC = "mytopic";
>>
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         DataStream<TaxiRide> rides = env.addSource(new 
>> TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f));
>>
>>         DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter());
>>
>>         filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER,
>>                 CLEANSED_RIDES_TOPIC,
>>                 new TaxiRideSchema()));
>>
>>         env.execute("Taxi Ride Cleansing");
>>     }
>>
>> Error:
>> 18:43:15,734 INFO  org.apache.flink.api.java.typeutils.TypeExtractor        
>> - class org.joda.time.DateTime is not a valid POJO type
>> Exception in thread "main" java.lang.NoClassDefFoundError: 
>> kafka/producer/Partitioner
>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>>     at 
>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at 
>> com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51)
>> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     ... 13 more
>>
>>
>

Reply via email to