Hi everyone,

I did not reproduce it, but I think the problem here is the anonymous
class. It looks like it is created inside a class, not an object, so it is
not “static” in Java terms. That means its enclosing class (the job class)
will be serialized along with it, and the job class seems to contain a
DataSet field, which cannot be serialized.

If that really is the problem, you should either define your anonymous class
inside the companion object of your job class, or use a plain function instead
(and make sure that you do not pull any variable from your job class into the
scope of that function).
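The effect can be demonstrated with plain Java serialization, no Flink needed. This is a minimal sketch: `NotSer`, `Job`, `inner`, and `standalone` are hypothetical names, with `NotSer` standing in for a non-serializable field such as a DataSet:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class NotSer // stand-in for a non-serializable field such as a DataSet

class Job {
  val data = new NotSer
  // anonymous class created inside the Job instance: it keeps an $outer
  // reference to the Job, so serializing it drags `data` along and fails
  val inner: Serializable = new Serializable {
    override def toString = data.toString
  }
}

object Job {
  // defined in the (companion) object: "static" in Java terms, no enclosing
  // instance to capture
  val standalone: Serializable = new Serializable {
    override def toString = "ok"
  }
}

def canSerialize(o: AnyRef): Boolean =
  try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(o); true }
  catch { case _: NotSerializableException => false }

println(canSerialize(new Job().inner)) // false: the outer Job is not serializable
println(canSerialize(Job.standalone))  // true: nothing enclosing is serialized
```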

Cheers,
Sebastian

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Montag, 15. Juni 2015 14:16
To: user@flink.apache.org
Subject: Re: Random Selection


Hi Max,

the problem is that you’re trying to serialize the companion object of
scala.util.Random. Try to create an instance of the scala.util.Random class and
use this instance within your RichFilterFunction to generate the random numbers.

Cheers,
Till
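A sketch of this idea outside Flink, assuming a Scala version where scala.util.Random is Serializable (2.11+); `RandomPick` and `chosenId` are hypothetical names for illustration:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hold a Random *instance* in the function object. scala.util.Random extends
// Serializable, so the Java serialization Flink uses to ship user code can
// write it out.
class RandomPick(n: Int) extends (Int => Boolean) with Serializable {
  private val rnd = new scala.util.Random()
  private val chosenId = rnd.nextInt(n)
  def apply(id: Int): Boolean = id == chosenId
}

val pick = new RandomPick(10)
// the function object survives a serialization round trip
new ObjectOutputStream(new ByteArrayOutputStream).writeObject(pick)
println((0 until 10).count(pick)) // 1: exactly one id is selected
```

In an actual RichFilterFunction you would typically create the Random inside open() instead, so it is constructed on the worker and nothing random-related needs to be serialized at all.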

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber
<alber.maximil...@gmail.com> wrote:
Hi Flinksters,
I would like to randomly choose a element of my data set. But somehow I cannot 
use scala.util inside my filter functions:

      val sample_x = X filter(new RichFilterFunction[Vector](){
        var i: Int = -1

        override def open(config: Configuration) = {
          i = scala.util.Random.nextInt(N)
        }
        def filter(a: Vector) = a.id == i
      })
      val sample_y = Y filter(new RichFilterFunction[Vector](){
        def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
      })
That's the error I get:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: An 
error occurred while translating the optimized plan to a nephele JobGraph: 
Error translating node 'Filter "Filter at 
Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties 
[partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, 
grouped=null, unique=null] ]]': Could not write the user code wrapper class 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : 
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
    at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
    at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at 
org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
    at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at 
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at Test$delayedInit$body.apply(test.scala:304)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$anonfun$main$1.apply(App.scala:71)
    at scala.App$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:45)
    at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 
'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ 
GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties 
[ordering=null, grouped=null, unique=null] ]]': Could not write the user code 
wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper 
: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
    at 
org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
    ... 21 more
Caused by: 
org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
not write the user code wrapper class 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : 
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at 
org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
    at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
    ... 26 more
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
    at 
org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
    at 
org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)

Did I miss something, or is it simply not possible?
Thanks!
Cheers,
Max
