[ 
https://issues.apache.org/jira/browse/PIG-4232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191227#comment-14191227
 ] 

liyunzhang_intel commented on PIG-4232:
---------------------------------------

Hi [~praveenr019]
I think it is better not to change the original code of POUserFunc and 
POForEach. As Rohini said:
{quote}
Please do udfc.addJobConf(job); with the right configuration and call 
udfc.deserialize(); in your executor when it is initialized.
{quote}
I think she meant that we should call udfc.addJobConf(job) and then 
udfc.deserialize() in the Spark executor, to avoid the problem that UDFContext 
is not initialized correctly.

*Why does the problem occur?*
1. The call chain
{code}
POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String) -> UDFContext#getUDFProperties(Class c)
{code}
is executed before
{code}
PigInputFormat#createRecordReader -> PigInputFormat#passLoadSignature -> MapRedUtil#setupUDFContext(conf)
{code}

In MapRedUtil#setupUDFContext, udfc.addJobConf is always called, but 
udfc.deserialize is only called when udfc.isUDFConfEmpty returns true. If 
POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> 
POUserFunc#setFuncInputSchema(String) -> UDFContext#getUDFProperties(Class c) 
is executed first, getUDFProperties has already populated the UDF conf, so 
udfc.isUDFConfEmpty returns false and udfc.deserialize is never called.
{code}
public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) {
        udfc.deserialize();
    }
}
{code}
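To make the ordering failure concrete, here is a small self-contained sketch. MiniUDFContext is a hypothetical stand-in, not Pig's real class; it only models the guard behavior described above (getUDFProperties lazily creates an entry, so a mere read makes the conf non-empty and deserialize is skipped):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical model of UDFContext's guard behavior (not Pig's real class).
class MiniUDFContext {
    private final Map<String, Properties> udfConf = new HashMap<>();
    private boolean deserialized = false;

    // Stands in for UDFContext#getUDFProperties(Class): lazily creates
    // an entry, so merely reading properties makes the conf non-empty.
    Properties getUDFProperties(String key) {
        return udfConf.computeIfAbsent(key, k -> new Properties());
    }

    boolean isUDFConfEmpty() {
        return udfConf.isEmpty();
    }

    void deserialize() {
        deserialized = true;
    }

    // Stands in for the guard in MapRedUtil#setupUDFContext.
    void setupUDFContext() {
        if (isUDFConfEmpty()) {
            deserialize();
        }
    }

    boolean wasDeserialized() {
        return deserialized;
    }
}

class OrderingDemo {
    public static void main(String[] args) {
        // Spark order: readObject touches the context before setup runs,
        // so the guard sees a non-empty conf and skips deserialize().
        MiniUDFContext bad = new MiniUDFContext();
        bad.getUDFProperties("SomeUDF");  // POUserFunc#readObject path
        bad.setupUDFContext();
        System.out.println(bad.wasDeserialized()); // prints false

        // MapReduce order: setup runs before any UDF reads the context.
        MiniUDFContext good = new MiniUDFContext();
        good.setupUDFContext();
        System.out.println(good.wasDeserialized()); // prints true
    }
}
```

The same two call orders against the same guard give opposite results, which is exactly the symptom on the Spark side.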


*What I did in PIG-4232_1.patch*
1. Add a new class PigInputFormatSpark that extends PigInputFormat.
2. Reset the UDFContext in PigInputFormatSpark#resetUDFContext when 
PigInputFormatSpark#createRecordReader is called. When 
PigInputFormatSpark#createRecordReader -> PigInputFormat#passLoadSignature -> 
MapRedUtil#setupUDFContext(conf) then executes, udfc.isUDFConfEmpty returns 
true, so both udfc.addJobConf and udfc.deserialize are called. I think this 
patch works as Rohini suggested and does not modify the original code.
3. Add the following code in ForEachConverter.java and POUserFunc.java:
{code}
ForEachConverter.java

public class ForEachConverter implements POConverter<Tuple, Tuple, POForEach> {

    @Override
    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
            POForEach physicalOperator) {
        ...
    }

    ...

    public Iterable<Tuple> call(final Iterator<Tuple> input) {

+       PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
+       if (planLeafOps != null) {
+           for (PhysicalOperator op : planLeafOps) {
+               if (op.getClass() == POUserFunc.class) {
+                   POUserFunc udf = (POUserFunc) op;
+                   udf.setFuncInputSchema();
+               }
+           }
+       }
        ...
    }
{code}

{code}
POUserFunc.java

+   public void setFuncInputSchema() {
+       setFuncInputSchema(signature);
+   }
{code}
The code above initializes the schema of POUserFunc correctly. Why was it not 
initialized correctly before? When POUserFunc#readObject -> 
POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String) 
runs, tmpS is null, so this.func.setInputSchema(tmpS) is not called.
{code}
POUserFunc#setFuncInputSchema
public void setFuncInputSchema(String signature) {
    Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
    Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
    // tmpS is null when POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec)
    // -> POUserFunc#setFuncInputSchema(String) is called
    if (tmpS != null) {
        this.func.setInputSchema(tmpS);
    }
}
{code}
4. You may wonder why POUserFunc#readObject is executed first in Spark mode. 
It is because Spark deserializes the shipped operators first (for the detailed 
stack trace, see the attached pouserfunc.readObject.stacktrace).
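The timing is standard Java serialization behavior, which a tiny self-contained demo can show (Op is a hypothetical stand-in, not POUserFunc): the JVM invokes a class's private readObject() during ObjectInputStream.readObject() itself, so whatever it does (such as touching UDFContext) happens before any later task-side setup call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical serializable operator with a custom readObject hook.
class Op implements Serializable {
    private static final long serialVersionUID = 1L;
    static boolean readObjectRan = false;

    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        readObjectRan = true; // stand-in for instantiateFunc(funcSpec)
    }
}

class ReadObjectDemo {
    public static void main(String[] args) throws Exception {
        // Serialize on the "driver" side.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(new Op());
        }
        // Deserialize on the "executor" side: Op.readObject() runs here,
        // before any code that follows could set up a context.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            in.readObject();
        }
        System.out.println(Op.readObjectRan); // prints true
    }
}
```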
   


> UDFContext is not initialized in executors when running on Spark cluster
> ------------------------------------------------------------------------
>
>                 Key: PIG-4232
>                 URL: https://issues.apache.org/jira/browse/PIG-4232
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Praveen Rachabattuni
>            Assignee: liyunzhang_intel
>         Attachments: PIG-4232.patch, PIG-4232_1.patch
>
>
> UDFContext is used in lot of features across pig code base. For example its 
> used in PigStorage to pass columns information between the frontend and the 
> backend code. 
> https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
