[ 
https://issues.apache.org/jira/browse/PIG-4232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176692#comment-14176692
 ] 

liyunzhang_intel commented on PIG-4232:
---------------------------------------

Scripting.pig
register '/home/zly/prj/oss/pig/bin/libexec/python/scriptingudf.py' using jython as myfuncs;
a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
b = foreach a generate age;
explain b;
store b into '/user/pig/out/root-1412926432-nightly.conf/Scripting_1.out';
*Scripting.pig succeeds in spark mode*


Scripting.udf.pig
register '/home/zly/prj/oss/pig/bin/libexec/python/scriptingudf.py' using jython as myfuncs;
a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
b = foreach a generate myfuncs.square(age);
explain b;
store b into '/user/pig/out/root-1412926432-nightly.conf/Scripting_1.out';
*This script fails in spark mode*

After debugging, I found that UDFContext is not initialized in spark executors.
In 
https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247
{code}
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p.getProperty(signature));
{code}
When executing Scripting.pig, UDFContext.getUDFContext().getUDFProperties(this.getClass()) returns a Properties object that contains the PigStorage info, so after deserialization the variable "mRequiredColumns" is correctly initialized.
When executing Scripting.udf.pig, the same call returns a Properties object that does not contain the PigStorage info, so after deserialization "mRequiredColumns" is null.
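The failure mode can be reproduced with a simplified standalone sketch (plain java.util.Properties, not Pig's UDFContext or ObjectSerializer): when the Properties object handed to the loader carries no entry under its signature, the lookup yields null, which is what PigStorage then "deserializes" into a null mRequiredColumns.

```java
import java.util.Properties;

public class UdfPropsSketch {
    // Mirrors the p.getProperty(signature) lookup in PigStorage;
    // names here are illustrative, not Pig's real ones.
    static String lookup(Properties p, String signature) {
        return p.getProperty(signature);
    }

    public static void main(String[] args) {
        Properties withInfo = new Properties();
        withInfo.setProperty("pig.storage.sig", "[true,false]");
        // Executor-side Properties that was never deserialized from jobConf:
        Properties withoutInfo = new Properties();

        System.out.println(lookup(withInfo, "pig.storage.sig"));
        // Missing entry -> null, i.e. mRequiredColumns stays null:
        System.out.println(lookup(withoutInfo, "pig.storage.sig"));
    }
}
```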

*Where is the UDFContext set up?*
{code}
LoadConverter#convert   
-> SparkUtil#newJobConf
 public static JobConf newJobConf(PigContext pigContext) throws IOException {
        JobConf jobConf = new JobConf(
                ConfigurationUtil.toConfiguration(pigContext.getProperties()));
        jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
        // serialize all udf info (including PigStorage info) to jobConf
        UDFContext.getUDFContext().serialize(jobConf);
        jobConf.set("udf.import.list",
                ObjectSerializer.serialize(PigContext.getPackageImportList()));
        return jobConf;
    }

PigInputFormat#passLoadSignature
->MapRedUtil.setupUDFContext(conf);
->UDFContext.setUDFContext
public static void setupUDFContext(Configuration job) throws IOException {
        UDFContext udfc = UDFContext.getUDFContext();
        udfc.addJobConf(job);
        // don't deserialize in front-end
        if (udfc.isUDFConfEmpty()) {
            udfc.deserialize(); //UDFContext deserializes from jobConf.
        }
    }
{code}
In LoadConverter#convert, all udf info (including PigStorage) is first serialized to jobConf. Then, in PigInputFormat#passLoadSignature, UDFContext deserializes it from jobConf.

*Why are serialization and deserialization needed?*
UDFContext#tss is a ThreadLocal variable, so its value differs across threads. LoadConverter#convert runs in Thread-A while PigInputFormat#passLoadSignature runs in Thread-B, because spark sends its tasks to executors for execution; PigInputFormat#passLoadSignature actually runs inside a spark executor (Thread-B). Serialization and deserialization are used to initialize the UDFContext's value in the different threads.
{code}
UDFContext.java
    private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
        @Override
        public UDFContext initialValue() {
            return new UDFContext();
        }
};

public static UDFContext getUDFContext() {
        UDFContext res= tss.get();
        return res;
    }
{code}
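A minimal sketch of why a ThreadLocal-backed singleton must be rebuilt per thread (illustrative names, StringBuilder standing in for the UDFContext state): each thread gets its own initialValue(), so state populated in the "front-end" thread is invisible in the "executor" thread unless it is serialized into the job conf and deserialized there.

```java
public class ThreadLocalSketch {
    // Same pattern as UDFContext#tss: one fresh instance per thread.
    static final ThreadLocal<StringBuilder> TSS =
            ThreadLocal.withInitial(StringBuilder::new);

    public static void main(String[] args) throws InterruptedException {
        TSS.get().append("PigStorage info"); // populated in Thread-A (front-end)

        StringBuilder[] seenInB = new StringBuilder[1];
        Thread executor = new Thread(() -> seenInB[0] = TSS.get()); // Thread-B
        executor.start();
        executor.join();

        System.out.println(TSS.get().length());   // 15 in Thread-A
        System.out.println(seenInB[0].length());  // 0 in Thread-B: a fresh, empty instance
    }
}
```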

*Why is there a difference between the cases with and without a udf?*
With udf:
Before PigInputFormat#passLoadSignature runs, POUserFunc#setFuncInputSchema runs. In POUserFunc#setFuncInputSchema, an entry is put into UDFContext#udfConfs (key: POUserFunc, value: an empty Properties object). When PigInputFormat#passLoadSignature then runs, the condition for deserializing the UDFContext (udfc.isUDFConfEmpty) is not met, so deserialization is skipped.
{code}
POUserFunc#readObject
->POUserFunc#instantiateFunc(FuncSpec)
->POUserFunc#setFuncInputSchema(String)
     public void setFuncInputSchema(String signature) {
        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
        Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
        if(tmpS!=null) {
            this.func.setInputSchema(tmpS);
        }
    }
->UDFContext.java
  public Properties getUDFProperties(Class c) {
        UDFContextKey k = generateKey(c, null);
        Properties p = udfConfs.get(k);
        if (p == null) {
            p = new Properties(); // an empty Properties object
            udfConfs.put(k, p);   // an entry with POUserFunc info is added to UDFContext#udfConfs
        }
        return p;
    }
{code}
{code}
PigInputFormat#passLoadSignature
->MapRedUtil.setupUDFContext(conf);
->UDFContext.setUDFContext
public static void setupUDFContext(Configuration job) throws IOException {
        UDFContext udfc = UDFContext.getUDFContext();
        udfc.addJobConf(job);
        // don't deserialize in front-end
        if (udfc.isUDFConfEmpty()) { // the condition to deserialize UDFContext
            udfc.deserialize(); //UDFContext deserializes from jobConf.
        }
    }
{code}

*My fix*
My current patch is as follows: isUDFConfEmpty also returns true when every Properties object in UDFContext#udfConfs is empty:
{code}
    public boolean isUDFConfEmpty() {
-        return udfConfs.isEmpty();
+        if (udfConfs.isEmpty()) {
+            return true;
+        }
+        // also treat a map that holds only empty Properties as empty
+        for (UDFContextKey udfContextKey : udfConfs.keySet()) {
+            if (!udfConfs.get(udfContextKey).isEmpty()) {
+                return false;
+            }
+        }
+        return true;
    }
{code}
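The intended semantics of the patched check can be sketched with a standalone mock (the real method lives in Pig's UDFContext; the key type is simplified to String here): the map counts as "empty" when it has no entries or when every Properties value is empty, which is exactly the state left behind by POUserFunc#setFuncInputSchema.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class UdfConfEmptyMock {
    // Simplified stand-in for UDFContext#udfConfs.
    final Map<String, Properties> udfConfs = new HashMap<>();

    public boolean isUDFConfEmpty() {
        for (Properties p : udfConfs.values()) {
            if (!p.isEmpty()) {
                return false; // real udf info is present
            }
        }
        return true; // no entries, or only empty Properties entries
    }
}
```

With this check, the executor-side deserialize() in MapRedUtil#setupUDFContext still runs even after POUserFunc has registered its empty Properties entry.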

*My question: is there a better way to fix this jira? My patch modifies UDFContext.java, a java file that is not in the package org.apache.pig.backend.hadoop.executionengine.spark.*


> UDFContext is not initialized in executors when running on Spark cluster
> ------------------------------------------------------------------------
>
>                 Key: PIG-4232
>                 URL: https://issues.apache.org/jira/browse/PIG-4232
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Praveen Rachabattuni
>            Assignee: liyunzhang_intel
>
> UDFContext is used in lot of features across pig code base. For example its 
> used in PigStorage to pass columns information between the frontend and the 
> backend code. 
> https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)