Hi Tarandeep,

The exception suggests that Flink tries to serialize RecordsFilterer as part of a user function (this happens via Java serialization). I say "suggests" because the code that uses RecordsFilterer is not included.
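The usual reason a helper class ends up in the serialized function is that an anonymous inner class defined inside it keeps a hidden reference to the enclosing instance, so serializing the function drags the whole helper along. A minimal sketch of that effect without Flink (the names CaptureDemo, SerFn, and Outer are made up for illustration; SerFn stands in for Flink's KeySelector):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // A Serializable function interface, standing in for Flink's KeySelector.
    interface SerFn extends Serializable {
        String apply(String s);
    }

    // Not Serializable, like RecordsFilterer.
    static class Outer {
        SerFn anonymousSelector() {
            // Anonymous inner class: it captures a hidden this$0 reference
            // to the enclosing Outer instance.
            return new SerFn() {
                @Override
                public String apply(String s) {
                    return s;
                }
            };
        }
    }

    // Defined in a static context: captures nothing, so it serializes fine.
    static SerFn staticSelector() {
        return s -> s;
    }

    // Returns true iff the object survives Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // For the anonymous case this is a NotSerializableException
            // naming the non-serializable enclosing class.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Outer().anonymousSelector())); // false
        System.out.println(serializes(staticSelector()));                // true
    }
}
```

So besides not shipping the helper at all (as described below), the capture itself can be avoided by making the helper implement Serializable, or by defining the functions as static nested classes or lambdas in a static context.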
To me it looks like RecordsFilterer should not be used as a user function. It is a helper class to construct a DataSet program, so it should not be shipped for execution. You would use such a class as follows:

DataSet<T> records = ...
DataSet<String> filterIDs = ...

RecordsFilterer rf = new RecordsFilterer();
DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs, "myField");

Regarding the join code, I would suggest an optimization. Instead of using CoGroup, I would use distinct and an OuterJoin like this:

DataSet<String> distIds = filteredIds.distinct();
DataSet<Tuple2<Boolean, T>> result = records
    .leftOuterJoin(distIds)
    .where(KEYSELECTOR)
    .equalTo("*")   // use full string as key
    .with(JOINFUNC) // set Bool to false if right == null, true otherwise

Best, Fabian

2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
> Hi,
>
> I am getting a NotSerializableException in this class-
>
> public class RecordsFilterer<T extends GenericRecord> {
>
>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>             DataSet<String> filteredIds, String fieldName) {
>         return dataset.coGroup(filteredIds)
>                 .where(new KeySelector<T, String>() {
>                     @Override
>                     public String getKey(T t) throws Exception {
>                         String s = (String) t.get(fieldName);
>                         return s != null ? s : UUID.randomUUID().toString();
>                     }
>                 })
>                 .equalTo((KeySelector<String, String>) s -> s)
>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>                     @Override
>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>                             Collector<Tuple2<Boolean, T>> collector) throws Exception {
>                         boolean filterFlag = false;
>                         for (String id : ids) {
>                             filterFlag = true;
>                         }
>
>                         for (T record : records) {
>                             collector.collect(new Tuple2<>(filterFlag, record));
>                         }
>                     }
>                 });
>     }
> }
>
> What I am trying to do is write generic code that will join Avro records
> (of different types) with String records and, if there is a match, add a
> filter flag.
> This way I can use the same code for different Avro record types. But I
> am getting this exception-
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException:
> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: RecordsFilterer
>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>     ... 17 more
> Caused by: java.io.NotSerializableException: RecordsFilterer
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>     ... 19 more
>
> Please help me understand why I get this exception and how to fix it
> (rewrite the code, maybe?).
>
> Thanks,
> Tarandeep
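The distinct + left outer join suggested above boils down to: a record's flag is true iff its key appears in the set of filter IDs. These semantics can be sanity-checked without Flink; a plain-Java sketch (FilterFlagDemo, addFilterFlag, and keyFn are illustrative names, not Flink API):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FilterFlagDemo {

    // Collection analogue of addFilterFlag; keyFn plays the role of the KeySelector.
    static <T> List<Map.Entry<Boolean, T>> addFilterFlag(
            List<T> records, List<String> filterIds, Function<T, String> keyFn) {
        // = filteredIds.distinct(): duplicates in the ID set don't matter.
        Set<String> distinctIds = new HashSet<>(filterIds);
        // Left outer join semantics: every record is emitted exactly once;
        // the flag is true iff the record's key matched an ID.
        return records.stream()
                .map(r -> Map.entry(distinctIds.contains(keyFn.apply(r)), r))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> result =
                addFilterFlag(List.of("a", "b", "c"), List.of("b", "b", "c"), s -> s);
        System.out.println(result); // "a" is unmatched, "b" and "c" are matched
    }
}
```

Unlike the CoGroup version, the join never materializes groups; it only needs the distinct IDs on one side, which is what makes the rewrite a worthwhile optimization.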