Hi everyone,

I did not reproduce it, but I think the problem here is rather the anonymous class. It looks like it is created within a class, not an object. It is therefore not “static” in Java terms, which means that its surrounding class (the job class) will be serialized as well. And this job class seems to have a DataSet field, which cannot be serialized.
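A minimal sketch of the effect described above, outside of Flink. All names here are hypothetical stand-ins: `NotSerializableData` plays the role of the DataSet field, `Filter` the role of the filter function, and `Job` the role of the job class. It assumes plain Java serialization, which is what the stack trace shows.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins, not Flink types.
class NotSerializableData                        // plays the role of the DataSet field
trait Filter[T] extends Serializable { def filter(a: T): Boolean }

// Job written as a class: the anonymous Filter reads the field `n`,
// so it keeps a reference to the enclosing Job instance.
class Job extends Serializable {
  val data = new NotSerializableData
  val n = 10
  def makeFilter: Filter[Int] = new Filter[Int] {
    def filter(a: Int): Boolean = a < n          // uses Job.this.n
  }
}

// Same filter built inside the companion object: only the Int `n` is captured.
object Job {
  def makeFilter(n: Int): Filter[Int] = new Filter[Int] {
    def filter(a: Int): Boolean = a < n
  }
}

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With these definitions, `SerializationCheck.serializes(new Job().makeFilter)` should fail because the enclosing `Job` and its `data` field are written along with the filter, while `SerializationCheck.serializes(Job.makeFilter(10))` should succeed.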
If that really is the problem, you should either define your anonymous class within the companion object of your job class or resort directly to a function (and make sure that you do not pass a variable from your job class into the scope of the function).

Cheers,
Sebastian

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Monday, 15 June 2015 14:16
To: user@flink.apache.org
Subject: Re: Random Selection

Hi Max,

the problem is that you’re trying to serialize the companion object of scala.util.Random. Try to create an instance of the scala.util.Random class and use this instance within your RichFilterFunction to generate the random numbers.

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber alber.maximil...@gmail.com wrote:

Hi Flinksters,

I would like to randomly choose an element of my data set. But somehow I cannot use scala.util inside my filter functions:

    val sample_x = X filter(new RichFilterFunction[Vector](){
      var i: Int = -1
      override def open(config: Configuration) = {
        i = scala.util.Random.nextInt(N)
      }
      def filter(a: Vector) = a.id == i
    })

    val sample_y = Y filter(new RichFilterFunction[Vector](){
      def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
    })

That's the error I get:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Error translating node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at Test$delayedInit$body.apply(test.scala:304)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$anonfun$main$1.apply(App.scala:71)
    at scala.App$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:45)
    at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
    at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
    ... 21 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
    ... 26 more
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)

Did I miss something, or is it simply not possible?

Thanks!

Cheers,
Max
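
For reference, a rough sketch of what Till's suggestion (a scala.util.Random instance used inside the function) could look like. It does not depend on Flink: `RichFilter`, `PickOne` and `PickOneDemo` are hypothetical names, with `RichFilter` standing in for a rich filter function that has an `open()` hook. Creating the generator inside `open()` and marking it `@transient` is an assumption about how one might keep it out of the serialized function, not something the thread confirms.

```scala
import scala.util.Random

// Hypothetical stand-in for a rich filter function with an open() hook (not the Flink class).
abstract class RichFilter[T] extends Serializable {
  def open(): Unit = ()
  def filter(a: T): Boolean
}

// Keeps exactly one id out of 0 until n, chosen when open() is called.
class PickOne(n: Int) extends RichFilter[Int] {
  // Created in open() and marked @transient, so it is not part of the serialized function.
  @transient private var rnd: Random = null
  private var chosen: Int = -1

  override def open(): Unit = {
    rnd = new Random()            // an instance, not the scala.util.Random companion object
    chosen = rnd.nextInt(n)
  }

  def filter(id: Int): Boolean = id == chosen
}

object PickOneDemo {
  // Applies the filter to the ids 0 until n after opening it once.
  def sample(n: Int): Seq[Int] = {
    val f = new PickOne(n)
    f.open()
    (0 until n).filter(f.filter)
  }
}
```

`PickOneDemo.sample(5)` returns a single id between 0 and 4. In a parallel job, each parallel instance of the function would call `open()` and draw its own value, so this only selects one element overall if the filter runs with parallelism 1.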