Actually, the closure cleaner is supposed to take care of the "anonymous inner class" situation.
Did you deactivate that one, by any chance?

On Mon, Jun 15, 2015 at 5:31 PM, Maximilian Alber <[email protected]> wrote:

> Hi everyone!
> Thanks! It seems it is the variable that causes the problem. Making an inner
> class solved the issue.
> Cheers,
> Max
>
> On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian <[email protected]> wrote:
>
>> Hi everyone,
>>
>> I did not reproduce it, but I think the problem here is rather the
>> anonymous class. It looks like it is created within a class, not an object.
>> Thus it is not "static" in Java terms, which means that its
>> surrounding class (the job class) will be serialized as well. And in this job
>> class, there seems to be a DataSet field that cannot be serialized.
>>
>> If that really is the problem, you should either define your anonymous
>> class within the companion object of your job class or resort directly to a
>> function (and make sure that you do not pass a variable from your job class
>> into the scope of the function).
>>
>> Cheers,
>> Sebastian
>>
>> *From:* Till Rohrmann [mailto:[email protected]]
>> *Sent:* Monday, June 15, 2015 14:16
>> *To:* [email protected]
>> *Subject:* Re: Random Selection
>>
>> Hi Max,
>>
>> the problem is that you're trying to serialize the companion object of
>> scala.util.Random. Try to create an instance of the scala.util.Random
>> class and use this instance within your RichFilterFunction to generate
>> the random numbers.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber
>> [email protected] <http://mailto:[email protected]> wrote:
>>
>> Hi Flinksters,
>>
>> I would like to randomly choose an element of my data set.
>> But somehow I cannot use scala.util inside my filter functions:
>>
>> val sample_x = X filter(new RichFilterFunction[Vector](){
>>   var i: Int = -1
>>
>>   override def open(config: Configuration) = {
>>     i = scala.util.Random.nextInt(N)
>>   }
>>   def filter(a: Vector) = a.id == i
>> })
>> val sample_y = Y filter(new RichFilterFunction[Vector](){
>>   def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
>> })
>>
>> That's the error I get:
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> An error occurred while translating the optimized plan to a nephele
>> JobGraph: Error translating node 'Filter "Filter at
>> Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties
>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
>>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
>>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>   at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>>   at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
>>   at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
>>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>   at Test$delayedInit$body.apply(test.scala:304)
>>   at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>>   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>   at scala.App$anonfun$main$1.apply(App.scala:71)
>>   at scala.App$anonfun$main$1.apply(App.scala:71)
>>   at scala.collection.immutable.List.foreach(List.scala:318)
>>   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>>   at scala.App$class.main(App.scala:71)
>>   at Test$.main(test.scala:45)
>>   at Test.main(test.scala)
>> Caused by: org.apache.flink.optimizer.CompilerException: Error
>> translating node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)"
>> : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[
>> LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not
>> write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>>   at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
>>   ... 21 more
>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>> Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>   at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
>>   at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
>>   ... 26 more
>> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>>   at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>>   at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>
>> Did I miss something or is it simply not possible?
>>
>> Thanks!
>>
>> Cheers,
>> Max
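
For anyone finding this thread later, here is a minimal sketch of the effect Sebastian describes, written without Flink so it can be run on its own. All names in it (Handle, Predicate, IdEquals, Job, SerializationDemo) are made up for illustration: Handle stands in for a non-serializable field such as a DataSet, and Predicate for a serializable user-function interface. It only assumes plain Java serialization, which is what the stack trace above shows being used.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable field such as a DataSet.
class Handle

// Hypothetical stand-in for a serializable user-function interface.
trait Predicate extends Serializable {
  def test(id: Int): Boolean
}

// Named top-level class: it has no enclosing instance, so only `target`
// is written when an instance is serialized.
class IdEquals(target: Int) extends Predicate {
  def test(id: Int): Boolean = id == target
}

// Stand-in for the job class. It is Serializable itself, but one of its
// fields is not.
class Job extends Serializable {
  val data = new Handle
  val n = 10

  // Anonymous class that reads the field `n`: it keeps a reference to the
  // enclosing Job, so serializing it also tries to serialize `data`.
  def capturing: Predicate = new Predicate {
    def test(id: Int): Boolean = id < n
  }

  // The value is passed in explicitly, so no reference to Job is kept.
  def standalone: Predicate = new IdEquals(n)
}

object SerializationDemo {
  // Returns true if plain Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job
    println(s"anonymous class inside Job: ${serializes(job.capturing)}")
    println(s"standalone class:           ${serializes(job.standalone)}")
  }
}
```

The first check should fail on Handle and the second should succeed, which matches what Max observed when he moved the function out of the anonymous class. Whether the closure cleaner would have removed the reference in the real job is a separate question that this sketch does not cover.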
