[
https://issues.apache.org/jira/browse/PIG-4232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191227#comment-14191227
]
liyunzhang_intel commented on PIG-4232:
---------------------------------------
Hi [~praveenr019]
I think it is better not to change the original code of POUserFunc and
POForEach. As Rohini said:
{quote}
Please do udfc.addJobConf(job); with the right configuration and call
udfc.deserialize(); in your executor when it is initialized.
{quote}
I think she meant that we could call udfc.addJobConf(job) and then call
udfc.deserialize() in the Spark executor, so that UDFContext is initialized
correctly.
*Why does the problem occur?*
1. The problem is that
{code}
POUserFunc#readObject
  -> POUserFunc#instantiateFunc(FuncSpec)
  -> POUserFunc#setFuncInputSchema(String)
  -> UDFContext#getUDFProperties(Class c)
{code}
is executed before
{code}
PigInputFormat#createRecordReader
  -> PigInputFormat#passLoadSignature
  -> MapRedUtil#setupUDFContext(conf)
{code}
In MapRedUtil#setupUDFContext, udfc.addJobConf is always called, but
udfc.deserialize is called only when udfc.isUDFConfEmpty() returns true. If the
POUserFunc#readObject chain above (ending in UDFContext#getUDFProperties(Class c))
runs first, getUDFProperties has already created a Properties entry for the UDF
class, so udfc.isUDFConfEmpty() returns false and udfc.deserialize() is never
called.
{code}
// MapRedUtil.java
public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) {
        udfc.deserialize();
    }
}
{code}
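The ordering problem can be reproduced outside Pig with a minimal Java model. MiniUDFContext, MyUdf and the schema value "a:int" are hypothetical stand-ins, not Pig's real UDFContext; the model assumes that getUDFProperties registers an empty Properties entry on first access, which is what makes isUDFConfEmpty() return false in the Spark order.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class UdfContextOrderingDemo {

    /** Hypothetical stand-in for a UDF class, used only as a properties key. */
    static class MyUdf {}

    static final String KEY = "pig.evalfunc.inputschema.sig1";

    /** Simplified, hypothetical model of UDFContext (not Pig's real class). */
    static class MiniUDFContext {
        private final Map<Class<?>, Properties> udfConfs = new HashMap<>();
        private Map<Class<?>, Properties> jobConf = new HashMap<>();

        void addJobConf(Map<Class<?>, Properties> conf) {
            jobConf = conf;
        }

        // Assumed behaviour: registers an empty Properties entry on first access.
        Properties getUDFProperties(Class<?> c) {
            return udfConfs.computeIfAbsent(c, k -> new Properties());
        }

        boolean isUDFConfEmpty() {
            return udfConfs.isEmpty();
        }

        // Copies the properties shipped with the job into this context.
        void deserialize() {
            for (Map.Entry<Class<?>, Properties> e : jobConf.entrySet()) {
                getUDFProperties(e.getKey()).putAll(e.getValue());
            }
        }
    }

    // Same guard as MapRedUtil#setupUDFContext.
    static void setupUDFContext(MiniUDFContext udfc, Map<Class<?>, Properties> conf) {
        udfc.addJobConf(conf);
        if (udfc.isUDFConfEmpty()) {
            udfc.deserialize();
        }
    }

    // Properties written on the front end and shipped with the job.
    static Map<Class<?>, Properties> frontendConf() {
        Properties p = new Properties();
        p.setProperty(KEY, "a:int");
        Map<Class<?>, Properties> conf = new HashMap<>();
        conf.put(MyUdf.class, p);
        return conf;
    }

    // MapReduce order: setupUDFContext runs before the UDF reads its properties.
    static boolean mapReduceOrderFindsSchema() {
        MiniUDFContext udfc = new MiniUDFContext();
        setupUDFContext(udfc, frontendConf());
        return udfc.getUDFProperties(MyUdf.class).getProperty(KEY) != null;
    }

    // Spark order: the UDF reads its properties first (as in POUserFunc#readObject).
    static boolean sparkOrderFindsSchema() {
        MiniUDFContext udfc = new MiniUDFContext();
        udfc.getUDFProperties(MyUdf.class);      // empty entry registered
        setupUDFContext(udfc, frontendConf());   // isUDFConfEmpty() is false: deserialize skipped
        return udfc.getUDFProperties(MyUdf.class).getProperty(KEY) != null;
    }

    public static void main(String[] args) {
        System.out.println("MapReduce order finds schema: " + mapReduceOrderFindsSchema()); // true
        System.out.println("Spark order finds schema: " + sparkOrderFindsSchema());         // false
    }
}
{code}

In the Spark order the properties shipped with the job never reach the UDF, because the empty entry created by the early lookup disables the deserialize() call.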
*What I did in PIG-4232_1.patch*
1. Added a new class, PigInputFormatSpark, that extends PigInputFormat.
2. Reset UDFContext in PigInputFormatSpark#resetUDFContext when
PigInputFormatSpark#createRecordReader is called. Then, when
PigInputFormatSpark#createRecordReader -> PigInputFormat#passLoadSignature ->
MapRedUtil#setupUDFContext(conf) is executed, udfc.isUDFConfEmpty() returns
true, so both udfc.addJobConf and udfc.deserialize are called. I think this
patch works as Rohini suggested and does not modify the original code.
3. Added the following code to ForEachConverter.java and POUserFunc.java:
{code}
// ForEachConverter.java
public class ForEachConverter implements POConverter<Tuple, Tuple, POForEach> {

    @Override
    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
            POForEach physicalOperator) {
        ...
    }
    ...
        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+           PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
+           if (planLeafOps != null) {
+               for (PhysicalOperator op : planLeafOps) {
+                   if (op.getClass() == POUserFunc.class) {
+                       POUserFunc udf = (POUserFunc) op;
+                       udf.setFuncInputSchema();
+                   }
+               }
+           }
            ...
{code}
{code}
// POUserFunc.java
+    public void setFuncInputSchema() {
+        setFuncInputSchema(signature);
+    }
{code}
The code above re-initializes the input schema of POUserFunc. Why was it not
initialized correctly before? When POUserFunc#readObject ->
POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String)
runs, tmpS is null, so this.func.setInputSchema(tmpS) is never called.
{code}
// POUserFunc#setFuncInputSchema
public void setFuncInputSchema(String signature) {
    Properties props =
            UDFContext.getUDFContext().getUDFProperties(func.getClass());
    Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
    // tmpS is null when this is reached via POUserFunc#readObject ->
    // POUserFunc#instantiateFunc(FuncSpec) -> POUserFunc#setFuncInputSchema(String)
    if (tmpS != null) {
        this.func.setInputSchema(tmpS);
    }
}
{code}
4. You may wonder why POUserFunc#readObject is executed first in Spark mode.
It is because Spark deserializes the task, including the physical operators it
contains, before the record reader is created (see the attached
pouserfunc.readObject.stacktrace for the full stack trace).
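The three steps of the patch can be traced end to end in a small self-contained Java model. FakeUserFunc, MiniUDFContext, runSparkTask and the signature "sig1" are hypothetical stand-ins, not Pig or Spark classes; Java serialization's readObject hook plays the role of Spark deserializing the task, and MiniUDFContext.reset() is an assumed model of what PigInputFormatSpark#resetUDFContext achieves.

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SparkUdfContextPatchDemo {
    static final String PREFIX = "pig.evalfunc.inputschema.";

    static class MiniUDFContext {   // simplified, hypothetical model of UDFContext
        private final Map<Class<?>, Properties> udfConfs = new HashMap<>();
        private Map<Class<?>, Properties> jobConf = new HashMap<>();
        void addJobConf(Map<Class<?>, Properties> conf) { jobConf = conf; }
        boolean isUDFConfEmpty() { return udfConfs.isEmpty(); }
        // Assumed model of what PigInputFormatSpark#resetUDFContext achieves.
        void reset() { udfConfs.clear(); }
        // Registers an empty entry on first access (assumed behaviour).
        Properties getUDFProperties(Class<?> c) {
            return udfConfs.computeIfAbsent(c, k -> new Properties());
        }
        // Copies the properties shipped with the job into this context.
        void deserialize() {
            for (Map.Entry<Class<?>, Properties> e : jobConf.entrySet()) {
                getUDFProperties(e.getKey()).putAll(e.getValue());
            }
        }
    }

    static MiniUDFContext context = new MiniUDFContext();   // stands in for UDFContext.getUDFContext()

    static class FakeUserFunc implements Serializable {   // hypothetical stand-in for POUserFunc
        final String signature;
        transient String inputSchema;   // plays the role of func.setInputSchema(tmpS)
        FakeUserFunc(String signature) { this.signature = signature; }

        void setFuncInputSchema(String sig) {
            String tmpS = context.getUDFProperties(FakeUserFunc.class).getProperty(PREFIX + sig);
            if (tmpS != null) {   // null when reached from readObject
                inputSchema = tmpS;
            }
        }
        // The no-arg overload added by the patch.
        void setFuncInputSchema() { setFuncInputSchema(signature); }
        // Java runs this hook during deserialization, i.e. before any record reader exists.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            setFuncInputSchema(signature);
        }
    }

    // Same guard as MapRedUtil#setupUDFContext.
    static void setupUDFContext(Map<Class<?>, Properties> conf) {
        context.addJobConf(conf);
        if (context.isUDFConfEmpty()) {
            context.deserialize();
        }
    }

    static FakeUserFunc serializeRoundTrip(FakeUserFunc f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(f);
            out.flush();
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return (FakeUserFunc) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Simulates one executor task and returns the UDF's input schema at the end. */
    static String runSparkTask(boolean withPatch) {
        context = new MiniUDFContext();
        Properties shipped = new Properties();
        shipped.setProperty(PREFIX + "sig1", "a:int");   // written on the front end
        Map<Class<?>, Properties> jobConf = new HashMap<>();
        jobConf.put(FakeUserFunc.class, shipped);
        FakeUserFunc func = serializeRoundTrip(new FakeUserFunc("sig1"));   // readObject runs first
        if (withPatch) {
            context.reset();             // step 2: reset in createRecordReader
        }
        setupUDFContext(jobConf);        // passLoadSignature -> setupUDFContext
        if (withPatch) {
            func.setFuncInputSchema();   // step 3: ForEachConverter re-reads the schema
        }
        return func.inputSchema;
    }

    public static void main(String[] args) {
        System.out.println("without patch: " + runSparkTask(false)); // null
        System.out.println("with patch:    " + runSparkTask(true));  // a:int
    }
}
{code}

Without the reset, the lookup made inside readObject leaves an empty entry behind and deserialize() is skipped, so the schema stays null; with the reset and the extra setFuncInputSchema() call it becomes a:int, while the guard in setupUDFContext itself is left unchanged.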
> UDFContext is not initialized in executors when running on Spark cluster
> ------------------------------------------------------------------------
>
> Key: PIG-4232
> URL: https://issues.apache.org/jira/browse/PIG-4232
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Praveen Rachabattuni
> Assignee: liyunzhang_intel
> Attachments: PIG-4232.patch, PIG-4232_1.patch
>
>
> UDFContext is used in many features across the Pig code base. For example, it
> is used in PigStorage to pass column information between the frontend and the
> backend code.
> https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247