[ https://issues.apache.org/jira/browse/PIG-4232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176692#comment-14176692 ]
liyunzhang_intel commented on PIG-4232:
---------------------------------------

Scripting.pig
{code}
register '/home/zly/prj/oss/pig/bin/libexec/python/scriptingudf.py' using jython as myfuncs;
a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
b = foreach a generate age;
explain b;
store b into '/user/pig/out/root-1412926432-nightly.conf/Scripting_1.out';
{code}
*Scripting.pig succeeds in spark mode.*

Scripting.udf.pig
{code}
register '/home/zly/prj/oss/pig/bin/libexec/python/scriptingudf.py' using jython as myfuncs;
a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
b = foreach a generate myfuncs.square(age);
explain b;
store b into '/user/pig/out/root-1412926432-nightly.conf/Scripting_1.out';
{code}
*This script fails in spark mode.*

After debugging, I found that UDFContext is not initialized in the Spark executors. See https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247
{code}
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p.getProperty(signature));
{code}
When executing Scripting.pig, UDFContext.getUDFContext().getUDFProperties(this.getClass()) returns a Properties object that contains the info about PigStorage, so after deserialization the variable "mRequiredColumns" is correctly initialized. When executing Scripting.udf.pig, the same call returns a Properties object that does not contain the info about PigStorage, so after deserialization "mRequiredColumns" is null.
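The null "mRequiredColumns" can be reproduced in miniature: when the executor-side UDFContext was never populated, the per-class Properties lookup has no entry for the loader's signature, getProperty returns null, and the deserialized array stays null. A minimal sketch, where the deserialize helper is a simplified hypothetical stand-in (not Pig's real ObjectSerializer) that mirrors the behavior of returning null for a missing value:

```java
import java.util.Properties;

public class EmptyUdfPropsDemo {
    // Hypothetical stand-in for ObjectSerializer.deserialize: a missing
    // (null or empty) serialized string yields a null object.
    static Object deserialize(String serialized) {
        return (serialized == null || serialized.isEmpty()) ? null : serialized;
    }

    public static void main(String[] args) {
        // An executor-side context that was never deserialized hands back a
        // fresh, empty Properties for PigStorage's class key.
        Properties p = new Properties();

        // Same shape as PigStorage's lookup: no entry for the signature, so
        // getProperty returns null and mRequiredColumns stays null.
        boolean[] mRequiredColumns =
                (boolean[]) deserialize(p.getProperty("pig.storage.signature"));
        System.out.println("mRequiredColumns == null ? " + (mRequiredColumns == null));
    }
}
```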
*Where is UDFContext set up?*
{code}
LoadConverter#convert -> SparkUtil#newJobConf

public static JobConf newJobConf(PigContext pigContext) throws IOException {
    JobConf jobConf = new JobConf(
            ConfigurationUtil.toConfiguration(pigContext.getProperties()));
    jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
    // serialize all udf info (including PigStorage info) into jobConf
    UDFContext.getUDFContext().serialize(jobConf);
    jobConf.set("udf.import.list",
            ObjectSerializer.serialize(PigContext.getPackageImportList()));
    return jobConf;
}

PigInputFormat#passLoadSignature
  -> MapRedUtil.setupUDFContext(conf)
  -> UDFContext.setUDFContext

public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) {
        udfc.deserialize(); // UDFContext deserializes from jobConf
    }
}
{code}
In LoadConverter#convert, all udf info (including PigStorage) is first serialized into the jobConf. Then, in PigInputFormat#passLoadSignature, UDFContext is deserialized from the jobConf.

*Why are serialization and deserialization needed?*
UDFContext#tss is a ThreadLocal variable, so its value differs between threads. LoadConverter#convert is executed in Thread-A while PigInputFormat#passLoadSignature is executed in Thread-B, because Spark sends its tasks to executors for execution; in other words, PigInputFormat#passLoadSignature runs in a Spark executor (Thread-B). Serialization and deserialization are used to initialize the UDFContext's value across different threads.
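The cross-thread handoff above can be sketched with a toy version of the pattern (all names here are illustrative, not Pig's real API): Thread-A fills its thread-local context and "serializes" it into a shared jobConf map; Thread-B starts with an empty thread-local copy and must "deserialize" from the jobConf before any lookup succeeds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadLocalHandoffDemo {
    // Toy context: each thread starts with its own empty map, like UDFContext#tss.
    private static final ThreadLocal<Map<String, String>> tss =
            ThreadLocal.withInitial(HashMap::new);

    /** Runs the Thread-A -> jobConf -> Thread-B handoff; returns Thread-B's lookup result. */
    static String handoff() {
        // Shared "jobConf" standing in for the serialized UDFContext payload.
        Map<String, String> jobConf = new ConcurrentHashMap<>();

        // Thread-A (frontend, like LoadConverter#convert): fill the
        // thread-local context, then "serialize" it into the jobConf.
        tss.get().put("pig.storage.signature", "[true,false]");
        jobConf.putAll(tss.get());

        // Thread-B (executor, like PigInputFormat#passLoadSignature): its
        // ThreadLocal copy starts empty, so it restores from the jobConf.
        final String[] result = new String[1];
        Thread executor = new Thread(() -> {
            if (tss.get().isEmpty()) {       // analogue of isUDFConfEmpty()
                tss.get().putAll(jobConf);   // analogue of udfc.deserialize()
            }
            result[0] = tss.get().get("pig.storage.signature");
        });
        executor.start();
        try {
            executor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("executor sees: " + handoff());
    }
}
```

Without the putAll step, Thread-B's lookup would return null, which is exactly the failure PigStorage hits in the executors.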
{code}
UDFContext.java

private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
    @Override
    public UDFContext initialValue() {
        return new UDFContext();
    }
};

public static UDFContext getUDFContext() {
    UDFContext res = tss.get();
    return res;
}
{code}
*Why is there a difference between the cases with and without a udf?*

With a udf: before PigInputFormat#passLoadSignature is executed, POUserFunc#setFuncInputSchema runs. In POUserFunc#setFuncInputSchema, an entry is put into UDFContext#udfConfs (the key is POUserFunc, the value is an empty Properties object). When PigInputFormat#passLoadSignature then runs, the condition for deserializing UDFContext (udfc.isUDFConfEmpty()) is no longer met, so deserialization is skipped.
{code}
POUserFunc#readObject
  -> POUserFunc#instantiateFunc(FuncSpec)
  -> POUserFunc#setFuncInputSchema(String)

public void setFuncInputSchema(String signature) {
    Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
    Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
    if (tmpS != null) {
        this.func.setInputSchema(tmpS);
    }
}

  -> UDFContext.java
public Properties getUDFProperties(Class c) {
    UDFContextKey k = generateKey(c, null);
    Properties p = udfConfs.get(k);
    if (p == null) {
        p = new Properties(); // an empty Properties object
        udfConfs.put(k, p);   // udfConfs now has an entry for POUserFunc
    }
    return p;
}
{code}
{code}
PigInputFormat#passLoadSignature
  -> MapRedUtil.setupUDFContext(conf)
  -> UDFContext.setUDFContext

public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) { // the condition for deserializing UDFContext
        udfc.deserialize(); // UDFContext deserializes from jobConf
    }
}
{code}

*My fix*
My current patch makes isUDFConfEmpty() also return true when every Properties object in udfConfs is empty:
{code}
 public boolean isUDFConfEmpty() {
-    return udfConfs.isEmpty();
+    if (udfConfs.isEmpty()) {
+        return true;
+    } else {
+        boolean res = true;
+        for (UDFContextKey udfContextKey : udfConfs.keySet()) {
+            if (!udfConfs.get(udfContextKey).isEmpty()) {
+                res = false;
+                break;
+            }
+        }
+        return res;
+    }
 }
{code}
*My question: is there a better way to fix this jira? My patch modifies UDFContext.java, a java file outside the package org.apache.pig.backend.hadoop.executionengine.spark.*

> UDFContext is not initialized in executors when running on Spark cluster
> ------------------------------------------------------------------------
>
>           Key: PIG-4232
>           URL: https://issues.apache.org/jira/browse/PIG-4232
>       Project: Pig
>    Issue Type: Sub-task
>    Components: spark
>      Reporter: Praveen Rachabattuni
>      Assignee: liyunzhang_intel
>
> UDFContext is used in a lot of features across the pig code base. For example, it is
> used in PigStorage to pass columns information between the frontend and the
> backend code.
> https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)