Hi Rohini:
We ran into a problem while fixing PIG-4232
(https://issues.apache.org/jira/browse/PIG-4232): UDFContext is not
initialized in the executors when running on a Spark cluster.
The current patch, PIG-4232_4.patch, contains some modifications outside the
org.apache.pig.backend.hadoop.executionengine.spark package.
POUserFunc.java
+    public void setFuncInputSchema() {
+        setFuncInputSchema(signature);
+    }
Why do we need to add setFuncInputSchema to POUserFunc.java?
We added setFuncInputSchema so that the Spark code can call
setFuncInputSchema(String signature) once the UDFContext is available. In the
existing code, the chain
POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String) ->
EvalFunc#setInputSchema is only completed when the variable "tmpS" is not null.
In Spark mode, however, tmpS is null, which means the UDFContext has not been
initialized from the configuration file by the time POUserFunc#readObject is called.
POUserFunc#setFuncInputSchema
public void setFuncInputSchema(String signature) {
    Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
    Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
    // In Spark mode tmpS is null when this is reached via
    // POUserFunc#readObject -> POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String)
    if (tmpS != null) {
        this.func.setInputSchema(tmpS);
    }
}
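For illustration, here is a minimal, self-contained sketch of what that lookup
depends on (not part of the patch; the wrapper class and method name below are
made up):

import java.util.Properties;

import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;

public class InputSchemaLookupSketch {
    // Returns the input schema Pig stored for this UDF under the given
    // signature, or null if the UDFContext has not yet been filled from the
    // job configuration (which is exactly the Spark executor situation).
    public static Schema lookupInputSchema(Class<?> udfClass, String signature) {
        Properties props = UDFContext.getUDFContext().getUDFProperties(udfClass);
        return (Schema) props.get("pig.evalfunc.inputschema." + signature);
    }
}

So the lookup can only succeed after the UDFContext has been populated from the
jobConf, which leads to the next question.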
Where is the UDFContext first initialized?
PigInputFormat#passLoadSignature->MapRedUtil.setupUDFContext(conf)->UDFContext.setUDFContext
public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) {
        udfc.deserialize(); // UDFContext deserializes from the jobConf
    }
}
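For context, the frontend counterpart is UDFContext#serialize(Configuration),
which writes the accumulated UDF properties into the job configuration before
the job is submitted; addJobConf + deserialize above restore them on the
backend. A rough sketch of the round trip (sketch only; the wrapper class and
method names are made up):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.impl.util.UDFContext;

public class UDFContextRoundTripSketch {
    // Frontend: Pig serializes the UDF properties into the job configuration.
    public static void frontendSide(Configuration conf) throws IOException {
        UDFContext.getUDFContext().serialize(conf);
    }

    // Backend: the properties become visible again only after addJobConf +
    // deserialize have run (this is what MapRedUtil.setupUDFContext does).
    // Any getUDFProperties lookup before this point sees an empty UDFContext.
    public static void backendSide(Configuration conf) throws IOException {
        UDFContext udfc = UDFContext.getUDFContext();
        udfc.addJobConf(conf);
        if (udfc.isUDFConfEmpty()) {
            udfc.deserialize();
        }
    }
}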
Why is POUserFunc#readObject called first and MapRedUtil.setupUDFContext(conf)
called later in Spark mode, while in MapReduce mode the order is the opposite?
In MapReduce mode:
PigGenericMapBase.java
public void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration job = context.getConfiguration();
    SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
    PigMapReduce.sJobContext = context;
    PigMapReduce.sJobConfInternal.set(context.getConfiguration());
    PigMapReduce.sJobConf = context.getConfiguration();
    inIllustrator = inIllustrator(context);
    PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer.deserialize(job.get("udf.import.list")));
    pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
    // This attempts to fetch all of the generated code from the distributed cache, and resolve it
    SchemaTupleBackend.initialize(job, pigContext);
    if (pigContext.getLog4jProperties() != null)
        PropertyConfigurator.configure(pigContext.getLog4jProperties());
    // POUserFunc#readObject is called while the map plan is deserialized below,
    // so MapRedUtil#setupUDFContext is called first and POUserFunc#readObject
    // is called afterwards.
    if (mp == null)
        mp = (PhysicalPlan) ObjectSerializer.deserialize(
                job.get("pig.mapPlan"));
    stores = PlanHelper.getPhysicalOperators(mp, POStore.class);
    ....
}
In Spark mode:
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)->org.apache.spark.scheduler.Task.run(Task.scala:54)->org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)->
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)->
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62).
Spark first deserializes all the objects in the task, which causes
POUserFunc#readObject to be called before MapRedUtil.setupUDFContext(conf).
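To make the ordering concrete, here is a small standalone sketch in plain Java
(nothing Pig-specific): a custom readObject runs while ObjectInputStream is
deserializing the object, so anything it looks up has to be initialized before
deserialization, not after.

import java.io.*;

public class ReadObjectOrderSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient String schema;   // stands in for the UDF input schema

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Whatever we look up here must already be in place; on a Spark
        // executor the UDFContext is not, so the equivalent lookup in
        // POUserFunc#setFuncInputSchema(String) comes back null.
        schema = System.getProperty("sketch.schema");
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bos);
        out.writeObject(new ReadObjectOrderSketch());
        out.close();

        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        ReadObjectOrderSketch copy = (ReadObjectOrderSketch) in.readObject(); // readObject runs here

        // "Setup" that happens only after deserialization is too late:
        System.setProperty("sketch.schema", "a:int,b:chararray");
        System.out.println(copy.schema);   // prints null
    }
}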
So my modifications are:
1. Add a new class PigInputFormatSpark that extends PigInputFormat, and reset
the UDFContext in PigInputFormatSpark#createRecordReader so that the UDFContext
is re-initialized from the configuration file later (a change inside the spark
package, which Praveen thinks is OK; a rough sketch follows after this list).
2. Add a function setFuncInputSchema to POUserFunc (a change outside the spark
package, which Praveen thinks is not OK) plus some code in ForEachConverter.java
(a change inside the spark package, which Praveen thinks is OK). Together these
make EvalFunc#setInputSchema get called, so the EvalFunc is given its input
schema. Praveen is worried about whether POUserFunc#setFuncInputSchema (the
change outside the spark package) is suitable, because it modifies the original
code.
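To make point 1 concrete, here is a rough sketch of what PigInputFormatSpark
looks like; the actual class in the patch may differ in details, and the sketch
assumes UDFContext#reset() discards the copy that the executor deserialized too
early:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;

public class PigInputFormatSpark extends PigInputFormat {
    @Override
    public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // Throw away whatever UDFContext state the executor deserialized too
        // early, so that PigInputFormat#passLoadSignature ->
        // MapRedUtil.setupUDFContext re-initializes it from the jobConf.
        UDFContext.getUDFContext().reset();
        return super.createRecordReader(split, context);
    }
}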
I think this newly added function is only used by the Spark engine and will not
affect other execution engines such as MapReduce and Tez, just as
POUserFunc#setFuncSpec is only used by the Tez code.
If you think it is not suitable, I have another way; the code may be a bit
confusing, but it does not need any change to the original code:
in ForEachConverter.java call POUserFunc#setFuncSpec(funcSpec), and
EvalFunc#setInputSchema is then called inside this function:
public void setFuncSpec(FuncSpec funcSpec) {
    this.funcSpec = funcSpec;
    instantiateFunc(funcSpec);
}
ForEachConverter.java
public Iterable<Tuple> call(final Iterator<Tuple> input) {
    initializeJobConf();
    PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
    if (planLeafOps != null) {
        for (PhysicalOperator op : planLeafOps) {
            if (op.getClass() == POUserFunc.class) {
                POUserFunc udf = (POUserFunc) op;
                // These two lines may look confusing, but they avoid changing
                // the original code: re-setting the existing FuncSpec
                // re-instantiates the EvalFunc, and the schema lookup now
                // succeeds because the UDFContext has been initialized above.
                FuncSpec funcSpec = udf.getFuncSpec();
                udf.setFuncSpec(funcSpec);
            }
        }
    }
    ...
}
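For comparison, with the new no-argument method from the second modification
the same loop would look like this (sketch only; the actual patch code may
differ):

    for (PhysicalOperator op : planLeafOps) {
        if (op.getClass() == POUserFunc.class) {
            // The UDFContext has been initialized by initializeJobConf() above,
            // so the schema lookup in setFuncInputSchema(signature) succeeds
            // and EvalFunc#setInputSchema is finally called.
            ((POUserFunc) op).setFuncInputSchema();
        }
    }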
So I am providing two ways to fix the problem: the first needs to add a function
to the original code, but the code is not confusing; the second does not change
the original code, but the code is confusing. Which one is better?
Looking forward to your answer. If anyone has a better idea, please contact me.
Thanks very much.
Best Regards
Zhang,Liyun