Hi Tarandeep,

the exception suggests that Flink tries to serialize RecordsFilterer as a
user function (this happens via Java serialization).
I say "suggests" because the code that uses RecordsFilterer is not included.

To me it looks like RecordsFilterer should not be used as a user function.
It is a helper class to construct a DataSet program, so it should not be
shipped for execution.
You would use such a class as follows:

DataSet<T> records = ...;
DataSet<String> filterIDs = ...;

RecordsFilterer<T> rf = new RecordsFilterer<>();
DataSet<Tuple2<Boolean, T>> result =
    rf.addFilterFlag(records, filterIDs, "myField");

Regarding the join code, I would suggest an optimization.
Instead of using CoGroup, I would use distinct and an OuterJoin like this:

DataSet<String> distIds = filteredIds.distinct();
DataSet<Tuple2<Boolean, T>> result = records
  .leftOuterJoin(distIds)
  .where(KEYSELECTOR)
  .equalTo("*")   // use the full string as key
  .with(JOINFUNC); // set the flag to false if right == null, true otherwise
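To make the semantics of the distinct + left outer join explicit, here is what it computes, modeled with plain Java collections rather than the Flink API (the addFilterFlag helper below is a hypothetical stand-in for illustration only):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class OuterJoinFlagDemo {

    // Models: records.leftOuterJoin(filteredIds.distinct())
    //             .where(keyOf).equalTo(id -> id)
    //             .with((record, id) -> Tuple2.of(id != null, record))
    static <T> List<Map.Entry<Boolean, T>> addFilterFlag(
            List<T> records, List<String> filteredIds, Function<T, String> keyOf) {
        Set<String> distinctIds = new HashSet<>(filteredIds); // distinct()
        List<Map.Entry<Boolean, T>> result = new ArrayList<>();
        for (T record : records) {
            // Left outer join: a record with no matching id sees right == null,
            // which the join function turns into a 'false' flag.
            boolean matched = distinctIds.contains(keyOf.apply(record));
            result.add(new AbstractMap.SimpleEntry<>(matched, record));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("alice", "bob", "carol");
        List<String> filteredIds = Arrays.asList("bob", "bob", "carol");
        System.out.println(addFilterFlag(records, filteredIds, r -> r));
        // [false=alice, true=bob, true=carol]
    }
}
```

The distinct() step matters because with duplicate ids a plain join would emit each matching record several times; after distinct, every left-side record appears exactly once in the result.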

Best, Fabian

2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:

> Hi,
>
> I am getting NoSerializableException in this class-
>
> 
>
> public class RecordsFilterer<T extends GenericRecord> {
>
>     public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T> dataset,
>                                                     DataSet<String> filteredIds,
>                                                     String fieldName) {
>         return dataset.coGroup(filteredIds)
>                 .where(new KeySelector<T, String>() {
>                     @Override
>                     public String getKey(T t) throws Exception {
>                         String s = (String) t.get(fieldName);
>                         return s != null ? s : UUID.randomUUID().toString();
>                     }
>                 })
>                 .equalTo((KeySelector<String, String>) s -> s)
>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean,T>>() {
>                     @Override
>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>                                         Collector<Tuple2<Boolean,T>> collector)
>                             throws Exception {
>                         boolean filterFlag = false;
>                         for (String id : ids) {
>                             filterFlag = true;
>                         }
>
>                         for (T record : records) {
>                             collector.collect(new Tuple2<>(filterFlag, record));
>                         }
>                     }
>                 });
>
>     }
> }
>
>
> What I am trying to do is write generic code that will join Avro records
> (of different types) with String records and, if there is a match, add a
> filter flag. This way I can use the same code for different Avro record
> types. But I am getting this exception-
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException:
> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>     at
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>     at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>     at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>     at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>     at
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>     at
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: RecordsFilterer
>     at
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>     at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>     ... 17 more
> Caused by: java.io.NotSerializableException: RecordsFilterer
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>     at
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>     at
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>     ... 19 more
>
>
> Please help me understand why I get this exception and how to fix it
> [rewrite the code, maybe?]
>
> Thanks,
> Tarandeep
>