[ https://issues.apache.org/jira/browse/PIG-4611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611792#comment-14611792 ]

liyunzhang_intel commented on PIG-4611:
---------------------------------------

[~mohitsabharwal]: with my workaround, HBaseStorage will not always use the 
default caster (i.e. Utf8StorageConverter).
If you debug the code with the following steps, you will see the stack traces 
below.
1. Append to conf/pig.properties:
pig.hbase.caster=HBaseBinaryConverter

2. Debug the code:
In the 'Spark executor' thread, all objects are deserialized first. During this 
first pass, 
[{{defaultCaster}}|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java#L304]
 falls back to the default value Utf8StorageConverter because 
UDFContext.getUDFContext().getClientSystemProps() is still null:
{code}
at org.apache.pig.backend.hadoop.hbase.HBaseStorage.<init>(HBaseStorage.java:307)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:746)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast.instantiateFunc(POCast.java:86)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast.readObject(POCast.java:1993)
at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
{code}
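
The HBaseStorage constructor runs here because POCast re-instantiates its load 
function during Java deserialization (matching the readObject -> instantiateFunc 
frames above). A paraphrased sketch of that method, simplified from trunk:
{code}
// POCast.readObject (paraphrased): deserializing the operator re-creates the
// load func/caster from the stored FuncSpec, so HBaseStorage.<init> runs on
// the executor *before* UDFContext has been deserialized there.
private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
    is.defaultReadObject();
    instantiateFunc(funcSpec); // -> PigContext.instantiateFuncFromSpec(funcSpec)
}
{code}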

After that, UDFContext#deserialize is called via the following stack trace 
(PigInputFormatSpark.createRecordReader -> PigInputFormat.createRecordReader -> 
MapRedUtil.setupUDFContext -> UDFContext.deserialize), and UDFContext is 
initialized at that point:
{code}
at org.apache.pig.impl.util.UDFContext.deserialize(UDFContext.java:213)
at org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil.setupUDFContext(MapRedUtil.java:176)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat.createRecordReader(PigInputFormat.java:96)
at org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark.createRecordReader(PigInputFormatSpark.java:41)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
{code}
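
For reference, MapRedUtil.setupUDFContext does roughly the following 
(paraphrased from trunk), which is why getClientSystemProps() only becomes 
non-null from this point on:
{code}
// MapRedUtil.setupUDFContext (paraphrased): attach the job conf and restore
// the serialized UDFContext (including client system props such as
// pig.hbase.caster) on the backend.
public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    if (udfc.isUDFConfEmpty()) {
        udfc.deserialize(); // populates clientSystemProps from the job conf
    }
}
{code}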

Later, PigInputFormat.createRecordReader -> PigInputFormat.getLoadFunc -> 
PigContext.instantiateFuncFromSpec -> HBaseStorage.<init> is called. At that 
point, 
[{{defaultCaster}}|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java#L304]
 is HBaseBinaryConverter (the value you set in pig.properties):
{code}
  at 
org.apache.pig.backend.hadoop.hbase.HBaseStorage.<init>(HBaseStorage.java:305)
          at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
          at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
          at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
          at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
          at 
org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:746)
          at 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat.getLoadFunc(PigInputFormat.java:149)
          at 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat.createRecordReader(PigInputFormat.java:97)
          at 
org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark.createRecordReader(PigInputFormatSpark.java:41)
          at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
          at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
          at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
          at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
          at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
          at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
          at org.apache.spark.scheduler.Task.run(Task.scala:64)
          at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:744)
{code}
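
To see why the two constructor calls end up with different casters, the caster 
selection in the HBaseStorage constructor looks roughly like this (paraphrased 
from trunk around the linked line; the null check is my workaround):
{code}
// clientSystemProps is null during the first (deserialization-time) call, so
// defaultCaster silently falls back to Utf8StorageConverter no matter what
// pig.properties says; only the second call sees HBaseBinaryConverter.
String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null
        ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER)
        : STRING_CASTER;
String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
    caster_ = new Utf8StorageConverter();
} else if ("HBaseBinaryConverter".equalsIgnoreCase(casterOption)) {
    caster_ = new HBaseBinaryConverter();
}
{code}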


To debug a pig-hbase script, you need to deploy an HBase environment. It seems 
hard to debug the TestHBaseStorage code in the unit test environment (ant 
-Dtestcase=TestHBaseStorage -Dexectype=spark -DdebugPort=9999 
-Dhadoopversion=23 test) and I don't know why.
pig-hbase.pig
{code}
a = load 'hbase://pigtable_1' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig:col_a pig:col_b pig:col_c', '-loadKey') as (rowKey:chararray, col_a:int, col_b:double, col_c:chararray);
b = FOREACH a GENERATE col_a, col_c;
store b into './testLoadWithProjection_2.out';
{code}

If you don't deploy the HBase environment, you can add a log line like the 
following to verify it, in 
org.apache.pig.backend.hadoop.hbase.HBaseStorage#HBaseStorage(java.lang.String, 
java.lang.String):
{code}
public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
    ....
    String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null
            ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER)
            : STRING_CASTER;
    // Added log line. Note the null check: getClientSystemProps() is null during
    // the first (deserialization-time) call, so calling getProperty() on it
    // directly would throw a NullPointerException before anything is logged.
    LOG.debug("thread:" + Thread.currentThread().getName()
            + " " + CASTER_PROPERTY + "=" + defaultCaster);
    ...
}
{code}


> Fix remaining unit test failures about "TestHBaseStorage"
> ---------------------------------------------------------
>
>                 Key: PIG-4611
>                 URL: https://issues.apache.org/jira/browse/PIG-4611
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4611.patch
>
>
> In https://builds.apache.org/job/Pig-spark/lastCompletedBuild/testReport/, it 
> shows the following unit test failures in TestHBaseStorage:
>  org.apache.pig.test.TestHBaseStorage.testStoreToHBase_1_with_delete  
>  org.apache.pig.test.TestHBaseStorage.testLoadWithProjection_1
>  org.apache.pig.test.TestHBaseStorage.testLoadWithProjection_2        
>  org.apache.pig.test.TestHBaseStorage.testStoreToHBase_2_with_projection
>  org.apache.pig.test.TestHBaseStorage.testCollectedGroup      
>  org.apache.pig.test.TestHBaseStorage.testHeterogeneousScans


