Hi Rohini,

We hit a problem while fixing PIG-4232 (https://issues.apache.org/jira/browse/PIG-4232): UDFContext is not initialized in the executors when running on a Spark cluster. The current patch, PIG-4232_4.patch, contains some modifications outside of the org.apache.pig.backend.hadoop.executionengine.spark package.
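To make the symptom concrete, here is a hypothetical UDF (not part of the patch, the class name is made up) that relies on EvalFunc#setInputSchema having been called; without the fix it sees a null input schema on Spark executors:

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    // Hypothetical UDF used only to illustrate the bug.
    public class SchemaAwareUpper extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            // getInputSchema() returns whatever EvalFunc#setInputSchema pushed in.
            Schema s = getInputSchema();
            if (s == null) {
                // This is what happens on a Spark executor today: the schema stored in the
                // UDFContext never reaches the backend copy of the UDF.
                throw new IOException("input schema not available on the backend");
            }
            return ((String) input.get(0)).toUpperCase();
        }
    }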
The modification outside the spark package is in POUserFunc.java:

    + public void setFuncInputSchema() {
    +     setFuncInputSchema(signature);
    + }

Why do we need to add setFuncInputSchema() to POUserFunc.java? The reason is so that the Spark code can invoke setFuncInputSchema(String signature) with the stored signature. In the existing code, the chain POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String) only reaches EvalFunc#setInputSchema when the variable "tmpS" is not null. In Spark mode, tmpS is null, which means the UDFContext has not yet been initialized from the configuration file when POUserFunc#readObject is called.

POUserFunc#setFuncInputSchema:

    public void setFuncInputSchema(String signature) {
        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
        Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
        if (tmpS != null) {
            // In Spark mode, tmpS is still null at this point because
            // POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) ->
            // POUserFunc#setFuncInputSchema(String) runs before the UDFContext is set up.
            this.func.setInputSchema(tmpS);
        }
    }

Where is the UDFContext first initialized? In PigInputFormat#passLoadSignature -> MapRedUtil.setupUDFContext(conf), which populates the UDFContext from the jobConf:

    public static void setupUDFContext(Configuration job) throws IOException {
        UDFContext udfc = UDFContext.getUDFContext();
        udfc.addJobConf(job);
        // don't deserialize in front-end
        if (udfc.isUDFConfEmpty()) {
            udfc.deserialize(); // the UDFContext deserializes its state from the jobConf
        }
    }

Why is POUserFunc#readObject called first and MapRedUtil.setupUDFContext(conf) called later in Spark mode, while in MapReduce mode the sequence is the other way around?

In MapReduce mode (PigGenericMapBase.java):

    public void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration job = context.getConfiguration();
        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
        PigMapReduce.sJobContext = context;
        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
        PigMapReduce.sJobConf = context.getConfiguration();
        inIllustrator = inIllustrator(context);
        PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer.deserialize(job.get("udf.import.list")));
        pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
        // This attempts to fetch all of the generated code from the distributed cache, and resolve it
        SchemaTupleBackend.initialize(job, pigContext);
        if (pigContext.getLog4jProperties() != null)
            PropertyConfigurator.configure(pigContext.getLog4jProperties());
        if (mp == null)
            // POUserFunc#readObject is called while deserializing the map plan here, so
            // MapRedUtil#setupUDFContext has already run by the time readObject is called.
            mp = (PhysicalPlan) ObjectSerializer.deserialize(job.get("pig.mapPlan"));
        stores = PlanHelper.getPhysicalOperators(mp, POStore.class);
        ...
    }

In Spark mode:

    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    -> org.apache.spark.scheduler.Task.run(Task.scala:54)
    -> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
    -> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    -> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

Spark first deserializes all of the task's objects, which causes POUserFunc#readObject to be called before MapRedUtil.setupUDFContext. So my patch makes two changes.

The first change is to add a new class PigInputFormatSpark that extends PigInputFormat and, in PigInputFormatSpark#createRecordReader, reset the UDFContext so that it is initialized from the configuration file later on (this change is inside the spark package; Praveen thinks it is ok). A rough sketch of what this class amounts to is below.
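This is only a sketch, assuming UDFContext#reset() is the right call to clear the leftover state; the real class in the patch may differ in detail:

    package org.apache.pig.backend.hadoop.executionengine.spark;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    public class PigInputFormatSpark extends PigInputFormat {
        @Override
        public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException, InterruptedException {
            // Clear whatever half-initialized state Spark's task deserialization left behind,
            // so that passLoadSignature -> MapRedUtil.setupUDFContext(conf) re-populates the
            // UDFContext from the job configuration.
            UDFContext.getUDFContext().reset();
            return super.createRecordReader(split, context);
        }
    }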
The second change is to add a function setFuncInputSchema to POUserFunc (this change is not in the spark package; Praveen thinks it is not ok) plus some code in ForEachConverter.java (in the spark package; Praveen thinks it is ok). Together these make EvalFunc#setInputSchema get called, so the EvalFunc is given its input schema.

Praveen is worried about whether POUserFunc#setFuncInputSchema (the change outside the spark package) is suitable, because it modifies the original code. I think this newly added function is only used by the Spark engine and will not influence other execution engines such as MapReduce and Tez, just as POUserFunc#setFuncSpec is only used by the Tez code. If you think it is not suitable, I have another way whose code may be a bit confusing but which does not change the original code: in ForEachConverter.java, call POUserFunc#setFuncSpec(funcSpec), and EvalFunc#setInputSchema ends up being called inside this function:

    public void setFuncSpec(FuncSpec funcSpec) {
        this.funcSpec = funcSpec;
        instantiateFunc(funcSpec);
    }

ForEachConverter.java:

    public Iterable<Tuple> call(final Iterator<Tuple> input) {
        initializeJobConf();
        PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
        if (planLeafOps != null) {
            for (PhysicalOperator op : planLeafOps) {
                if (op.getClass() == POUserFunc.class) {
                    POUserFunc udf = (POUserFunc) op;
                    FuncSpec funcSpec = udf.getFuncSpec();
                    // These two lines may look confusing, but they avoid changing the original code:
                    // setFuncSpec() re-runs instantiateFunc(), which now finds the input schema in
                    // the freshly initialized UDFContext and hands it to EvalFunc#setInputSchema.
                    udf.setFuncSpec(funcSpec);
                }
            }
        }
        ...
    }

So I see two ways to fix the problem. The first adds a function to the original code, but the resulting code is not confusing (the PS below sketches what the ForEachConverter side looks like in that case). The second does not change the original code, but the resulting code is confusing. Which one is better? I look forward to your answer. If anyone has a better idea, please contact me. Many thanks.

Best Regards
Zhang,Liyun
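PS: For comparison, the ForEachConverter side of the first way would be roughly the same loop as above, only calling the new no-arg method instead of re-setting the FuncSpec. This is just a sketch, not the exact code in the patch:

    for (PhysicalOperator op : planLeafOps) {
        if (op.getClass() == POUserFunc.class) {
            // The UDFContext has already been re-initialized from the job configuration by
            // PigInputFormatSpark, so this simply pushes the now-available input schema into
            // the EvalFunc via the new no-arg setFuncInputSchema().
            ((POUserFunc) op).setFuncInputSchema();
        }
    }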