Hi Rohini:
   We ran into a problem while fixing JIRA PIG-4232 (https://issues.apache.org/jira/browse/PIG-4232): UDFContext is not initialized in the executors when running on a Spark cluster.
The current patch, PIG-4232_4.patch, contains some modifications outside the org.apache.pig.backend.hadoop.executionengine.spark package.

  POUserFunc.java
        +    public void setFuncInputSchema() {
        +        setFuncInputSchema(signature);
        +    }

Why do we need to add setFuncInputSchema to POUserFunc.java?
We add the no-argument setFuncInputSchema so that the Spark side can call setFuncInputSchema(String signature) again later. In the existing code, the chain POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String) only reaches EvalFunc#setInputSchema when the variable "tmpS" is not null. But in Spark mode, tmpS is null, which means UDFContext has not yet been initialized from the configuration file when POUserFunc#readObject is called.


POUserFunc#setFuncInputSchema

    public void setFuncInputSchema(String signature) {
        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
        Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
        // In Spark mode tmpS is null when POUserFunc#readObject ->
        // POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String) is called.
        if (tmpS != null) {
            this.func.setInputSchema(tmpS);
        }
    }

Where is UDFContext first initialized?

PigInputFormat#passLoadSignature -> MapRedUtil.setupUDFContext(conf) -> UDFContext.setUDFContext

    public static void setupUDFContext(Configuration job) throws IOException {
        UDFContext udfc = UDFContext.getUDFContext();
        udfc.addJobConf(job);
        // don't deserialize in front-end
        if (udfc.isUDFConfEmpty()) {
            udfc.deserialize();   // UDFContext deserializes from the jobConf
        }
    }
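
For context, here is a small stand-alone sketch of the serialize/deserialize round trip that setupUDFContext relies on. It assumes UDFContext's serialize/deserialize/reset/addJobConf methods; the property key value and the class used as the key are placeholders for illustration, so treat this as a sketch of the mechanism rather than the exact Pig code path.

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.util.UDFContext;

    public class UDFContextRoundTrip {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Front-end: store a per-UDF property and serialize the whole
            // UDFContext into the Configuration that ships with the job.
            UDFContext udfc = UDFContext.getUDFContext();
            Properties props = udfc.getUDFProperties(UDFContextRoundTrip.class);
            props.setProperty("pig.evalfunc.inputschema.mySignature", "a:int,b:chararray");
            udfc.serialize(conf);

            // Simulate a fresh executor-side context.
            udfc.reset();

            // Back-end: what MapRedUtil.setupUDFContext does -- attach the job
            // conf and deserialize the UDF properties back out of it.
            udfc.addJobConf(conf);
            if (udfc.isUDFConfEmpty()) {
                udfc.deserialize();
            }
            System.out.println(udfc.getUDFProperties(UDFContextRoundTrip.class)
                    .getProperty("pig.evalfunc.inputschema.mySignature"));
        }
    }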

Why is POUserFunc#readObject called first and MapRedUtil.setupUDFContext(conf) called later in Spark mode, while in MapReduce mode the order is the other way around?


In MapReduce mode:

PigGenericMapBase.java

    public void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);

        Configuration job = context.getConfiguration();
        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
        PigMapReduce.sJobContext = context;
        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
        PigMapReduce.sJobConf = context.getConfiguration();
        inIllustrator = inIllustrator(context);

        PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer.deserialize(job.get("udf.import.list")));
        pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));

        // This attempts to fetch all of the generated code from the distributed cache, and resolve it
        SchemaTupleBackend.initialize(job, pigContext);

        if (pigContext.getLog4jProperties() != null)
            PropertyConfigurator.configure(pigContext.getLog4jProperties());

        if (mp == null)
            mp = (PhysicalPlan) ObjectSerializer.deserialize(
                job.get("pig.mapPlan"));
        // Deserializing the map plan here is what triggers POUserFunc#readObject,
        // so MapRedUtil#setupUDFContext is called first and POUserFunc#readObject afterwards.
        stores = PlanHelper.getPhysicalOperators(mp, POStore.class);
        ...
    }

In Spark mode:

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ->
org.apache.spark.scheduler.Task.run(Task.scala:54) ->
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) ->
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) ->
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

Spark first deserializes all of the task's objects, which causes POUserFunc#readObject to be called first.
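
To make the ordering problem concrete, here is a minimal stand-alone Java sketch (no Spark or Pig involved; all names are made up for illustration): a custom readObject runs at deserialization time, before any later setup step has had a chance to populate the shared context.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class ReadObjectOrderDemo {
        static String context = null;   // stands in for the UDFContext state

        static class Op implements Serializable {
            private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
                in.defaultReadObject();
                // On the "executor" side, context is still null at this point,
                // just like tmpS in POUserFunc#setFuncInputSchema(String).
                System.out.println("readObject sees context = " + context);
            }
        }

        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(new Op());
            oos.flush();

            // "Executor" side: deserialization happens first ...
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();

            // ... and only afterwards does the setup step run.
            context = "initialized from the job conf";
            System.out.println("setup sets context = " + context);
        }
    }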

So my modifications are:

1.    Add a new class PigInputFormatSpark that extends PigInputFormat, and in PigInputFormatSpark#createRecordReader reset UDFContext so that it gets initialized from the configuration file later (change inside the spark package; Praveen thinks this is ok). A rough sketch of this class is shown right after this list.

2.    Add a function "setFuncInputSchema" to POUserFunc (change outside the spark package; Praveen thinks this is not ok) and some code in ForEachConverter.java (change inside the spark package; Praveen thinks this is ok). Together these make EvalFunc#setInputSchema get called, so the EvalFunc is given its input schema. Praveen is worried about whether POUserFunc#setFuncInputSchema (the change outside the spark package) is suitable, because it modifies the original code.
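
Rough sketch of modification 1 (the imports, generic types and override shown here are my reconstruction; the real patch may differ):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    public class PigInputFormatSpark extends PigInputFormat {
        @Override
        public RecordReader<Text, Tuple> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // Drop the (empty) state left over from Java deserialization so that
            // MapRedUtil.setupUDFContext repopulates UDFContext from the job conf.
            UDFContext.getUDFContext().reset();
            return super.createRecordReader(split, context);
        }
    }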



I think this newly added function is only used by the Spark engine and will not influence other execution engines like MapReduce and Tez, in the same way that POUserFunc#setFuncSpec is only used in the Tez code.
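
Roughly, the Spark side would only call the new method from ForEachConverter#call, after initializeJobConf() has set up UDFContext. The fragment below is a sketch of that call site (the exact patch code may differ):

    // Inside ForEachConverter's call(), after initializeJobConf():
    PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
    if (planLeafOps != null) {
        for (PhysicalOperator op : planLeafOps) {
            if (op instanceof POUserFunc) {
                // UDFContext is initialized by now, so the schema lookup inside
                // setFuncInputSchema(signature) finds the schema and passes it
                // on to EvalFunc#setInputSchema.
                ((POUserFunc) op).setFuncInputSchema();
            }
        }
    }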

If you think it is not suitable, I have another approach: its code may be a bit confusing, but it does not need to change the original code.

In ForEachConverter.java, call POUserFunc#setFuncSpec(funcSpec); EvalFunc#setInputSchema is then called from inside this function:

    public void setFuncSpec(FuncSpec funcSpec) {
        this.funcSpec = funcSpec;
        instantiateFunc(funcSpec);
    }
ForEachConverter.java

    public Iterable<Tuple> call(final Iterator<Tuple> input) {
        initializeJobConf();
        PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
        if (planLeafOps != null) {
            for (PhysicalOperator op : planLeafOps) {
                if (op.getClass() == POUserFunc.class) {
                    POUserFunc udf = (POUserFunc) op;
                    // These two lines may look confusing (setFuncSpec re-instantiates
                    // the UDF just to trigger EvalFunc#setInputSchema), but they do not
                    // require changing the original code.
                    FuncSpec funcSpec = udf.getFuncSpec();
                    udf.setFuncSpec(funcSpec);
                }
            }
        }
        ...
    }

I have provided two ways to fix the problem: the first adds a function to the original code, but its code is not confusing; the second does not change the original code, but its code is confusing. Which one is better?
I look forward to your answer. If anyone has a better idea, please contact me. Many thanks.



Best Regards
Zhang,Liyun
