[ https://issues.apache.org/jira/browse/PIG-4232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191227#comment-14191227 ]
liyunzhang_intel commented on PIG-4232:
---------------------------------------

Hi [~praveenr019],

I think it is better not to change the original code of POUserFunc and POForEach. As Rohini said:
{quote}
Please do udfc.addJobConf(job); with the right configuration and call udfc.deserialize(); in your executor when it is initialized.
{quote}
I think she meant that we could call udfc.addJobConf(job) and then udfc.deserialize() in the Spark executor, to avoid the problem of UDFContext not being initialized correctly.

*Why does the problem occur?*
1. The problem is that
{code}
POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String) -> UDFContext#getUDFProperties(Class c)
{code}
is executed before
{code}
PigInputFormat#createRecordReader -> PigInputFormat#passLoadSignature -> MapRedUtil#setupUDFContext(conf)
{code}
In MapRedUtil#setupUDFContext, udfc.addJobConf is always called, but udfc.deserialize is called only when udfc.isUDFConfEmpty() returns true. If the POUserFunc#readObject chain above runs first, udfc.isUDFConfEmpty() returns false, so udfc.deserialize() is never called:
{code}
public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) {
        udfc.deserialize();
    }
}
{code}

*What did I do in PIG-4232_1.patch?*
1. Added a new class, PigInputFormatSpark, which extends PigInputFormat.
2. Reset the UDFContext in PigInputFormatSpark#resetUDFContext when PigInputFormatSpark#createRecordReader is called. As a result, when PigInputFormatSpark#createRecordReader -> PigInputFormat#passLoadSignature -> MapRedUtil#setupUDFContext(conf) is executed, udfc.isUDFConfEmpty() returns true, and both udfc.addJobConf and udfc.deserialize are called. I think this patch does what Rohini suggested and does not modify the original code.
3. Added the following code to ForEachConverter.java and POUserFunc.java:
{code}
// ForEachConverter.java (excerpt; "+" marks added lines)
 public class ForEachConverter implements POConverter<Tuple, Tuple, POForEach> {
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POForEach physicalOperator) {
     ...
         public Iterable<Tuple> call(final Iterator<Tuple> input) {
+            PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
+            if (planLeafOps != null) {
+                for (PhysicalOperator op : planLeafOps) {
+                    if (op.getClass() == POUserFunc.class) {
+                        POUserFunc udf = (POUserFunc) op;
+                        udf.setFuncInputSchema();
+                    }
+                }
+            }
{code}
{code}
// POUserFunc.java
+    public void setFuncInputSchema() {
+        setFuncInputSchema(signature);
+    }
{code}
The code above initializes the input schema of POUserFunc correctly. Why was it not initialized correctly before? In the call chain POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String), tmpS is null, so this.func.setInputSchema(tmpS) is never called:
{code}
// POUserFunc#setFuncInputSchema
public void setFuncInputSchema(String signature) {
    Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
    Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
    // tmpS is null when POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec)
    // -> POUserFunc#setFuncInputSchema(String) is called
    if (tmpS != null) {
        this.func.setInputSchema(tmpS);
    }
}
{code}
4. You may wonder why POUserFunc#readObject is executed first in Spark mode.
This is because Spark deserializes the operators first, before the record reader is created (see the attached pouserfunc.readObject.stacktrace for the detailed stack trace).

> UDFContext is not initialized in executors when running on Spark cluster
> ------------------------------------------------------------------------
>
>                 Key: PIG-4232
>                 URL: https://issues.apache.org/jira/browse/PIG-4232
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Praveen Rachabattuni
>            Assignee: liyunzhang_intel
>         Attachments: PIG-4232.patch, PIG-4232_1.patch
>
>
> UDFContext is used in a lot of features across the Pig code base. For example, it is
> used in PigStorage to pass column information between the frontend and the
> backend code.
> https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
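A minimal, self-contained Java sketch of the ordering problem described in the comment above, assuming only the behaviour the comment relies on: getUDFProperties creates a properties entry lazily, and setupUDFContext deserializes only while the context is still empty. It does not use Pig at all. MiniUdfContext, schemaVisible, the UDF class name com.example.MyUdf, the signature sig1 and the "udfClass|key" serialization format are hypothetical stand-ins, not Pig's real UDFContext API.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical model of the PIG-4232 ordering problem; NOT Pig's UDFContext.
public class UdfContextOrderDemo {

    static class MiniUdfContext {
        // Per-UDF-class properties, keyed by class name (assumption for this sketch).
        private final Map<String, Properties> udfConfs = new HashMap<>();
        private Map<String, String> jobConf = new HashMap<>();

        Properties getUdfProperties(String udfClass) {
            // Lazily creates an empty entry, so the context is no longer "empty" afterwards.
            return udfConfs.computeIfAbsent(udfClass, k -> new Properties());
        }

        boolean isUdfConfEmpty() {
            return udfConfs.isEmpty();
        }

        void addJobConf(Map<String, String> conf) {
            this.jobConf = conf;
        }

        void deserialize() {
            // Hypothetical format: job conf key "udfClass|propertyKey" -> value.
            for (Map.Entry<String, String> e : jobConf.entrySet()) {
                String[] parts = e.getKey().split("\\|", 2);
                getUdfProperties(parts[0]).setProperty(parts[1], e.getValue());
            }
        }

        void reset() {
            udfConfs.clear();
        }
    }

    // Same shape as MapRedUtil#setupUDFContext quoted in the comment.
    static void setupUdfContext(MiniUdfContext udfc, Map<String, String> job) {
        udfc.addJobConf(job);
        if (udfc.isUdfConfEmpty()) {
            udfc.deserialize();
        }
    }

    // Returns true if the UDF's input schema is visible after the record reader is set up.
    static boolean schemaVisible(boolean resetBeforeSetup) {
        String udf = "com.example.MyUdf";                 // hypothetical UDF class
        String key = "pig.evalfunc.inputschema.sig1";     // "sig1" is a made-up signature
        Map<String, String> job = new HashMap<>();
        job.put(udf + "|" + key, "a:int,b:chararray");

        MiniUdfContext udfc = new MiniUdfContext();
        // 1. Task deserialization: readObject -> ... -> getUDFProperties (schema still null).
        udfc.getUdfProperties(udf).getProperty(key);
        // 2. Optional fix, in the spirit of PigInputFormatSpark#resetUDFContext.
        if (resetBeforeSetup) {
            udfc.reset();
        }
        // 3. createRecordReader -> passLoadSignature -> setupUDFContext.
        setupUdfContext(udfc, job);
        // 4. Read the schema again, like calling POUserFunc#setFuncInputSchema() later.
        return udfc.getUdfProperties(udf).getProperty(key) != null;
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + schemaVisible(false)); // false
        System.out.println("with reset:    " + schemaVisible(true));  // true
    }
}
{code}

Without the reset, the entry created in step 1 makes isUdfConfEmpty() return false, so deserialize() is skipped and the schema stays missing. Clearing the context first lets the deserialize step run, and re-reading the properties afterwards (step 4) is why the schema also has to be set again after setup.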