Hi Flinksters,

I would like to randomly choose a element of my data set. But somehow I
cannot use scala.util inside my filter functions:

      val sample_x = X filter(new RichFilterFunction[Vector](){
        var i: Int = -1

        override def open(config: Configuration) = {
          i = scala.util.Random.nextInt(N)
        }
        def filter(a: Vector) = a.id == i
      })
      val sample_y = Y filter(new RichFilterFunction[Vector](){
        def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
      })

That's the error I get:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: An
error occurred while translating the optimized plan to a nephele JobGraph:
Error translating node 'Filter "Filter at
Test$$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties
[partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
grouped=null, unique=null] ]]': Could not write the user code wrapper class
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
    at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
    at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at
org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
    at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at Test$delayedInit$body.apply(test.scala:304)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:45)
    at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating
node 'Filter "Filter at Test$$anonfun$10.apply(test.scala:276)" : FLAT_MAP
[[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
[ordering=null, grouped=null, unique=null] ]]': Could not write the user
code wrapper class
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
    at
org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
    ... 21 more
Caused by:
org.apache.flink.runtime.operators.util.CorruptConfigurationException:
Could not write the user code wrapper class
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at
org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
    at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
    ... 26 more
Caused by: java.io.NotSerializableException:
org.apache.flink.api.scala.DataSet
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    aio.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
    at
org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
    at
org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)


Did I miss something or it is simply not possible?
Thanks!
Cheers,
Max

Reply via email to