I’m having some difficulty getting the desired results from the Spark Python 
example hbase_inputformat.py. I’m running with CDH 5.4, HBase version 1.0.0, 
Spark v1.3.0, using Python version 2.6.6.
 
I followed the example to create a test HBase table. Here’s the data from the 
table I created –

hbase(main):001:0> scan 'dev_wx_test'
ROW                     COLUMN+CELL
row1                    column=f1:a, timestamp=1438716994027, value=value1
row1                    column=f1:b, timestamp=1438717004248, value=value2
row2                    column=f1:, timestamp=1438717014529, value=value3
row3                    column=f1:, timestamp=1438717022756, value=value4
3 row(s) in 0.2620 seconds
 
When either of these statements is included – “hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n"))” or “hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n")).countByValue().items()” – the 
result is that we only get the following printed; (row1, value2) is not printed:
        ((u'row1', u'value1'), 1)
        ((u'row2', u'value3'), 1)
        ((u'row3', u'value4'), 1)
 
This looks like similar results to the following post I found – 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650
– but it appears the pythonconverter HBaseResultToStringConverter has been updated 
since then.
 
When the statement “hbase_rdd = hbase_rdd.flatMapValues(lambda 
v:v.split("\n")).mapValues(json.loads)” is included, the result is – 
ValueError: No JSON object could be decoded 
**************************************************************************************
 Here is more info on this from the log – Traceback (most recent call last):

  File "hbase_inputformat.py", line 87, in <module>

    output = hbase_rdd.collect()

  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",line
 701, in collect

  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py",line
 538, in __call__

  File 
"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py",line
 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 
4, stluhdpddev27.monsanto.com): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):

  File 
"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",line
 101, in main

    process()

  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",line
 96, in process

   serializer.dump_stream(func(split_index, iterator), outfile)

  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py",line
 236, in dump_stream

    vs =list(itertools.islice(iterator, batch))

  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",line
 1807, in <lambda>

  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads

    return _default_decoder.decode(s)

  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode

    obj, end = self.raw_decode(s, idx=_w(s, 0).end())

  File "/usr/lib64/python2.6/json/decoder.py", line 338, in raw_decode

    raise ValueError("No JSON object could be decoded")

ValueError: No JSON object could be decoded


 
       at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)

       at 
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)

       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)

       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

       at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)

       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

       at org.apache.spark.scheduler.Task.run(Task.scala:64)

       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

       
atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

       at java.lang.Thread.run(Thread.java:745)


 
Driver stacktrace:

       at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)

       
atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

       
atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)

       
atscala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

       at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)

       
atorg.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

       at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

       at scala.Option.foreach(Option.scala:236)

       
atorg.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)

       at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)

       
atorg.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)

       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


 
Any suggestions would be most welcome.
 
****************************************************************
Below is the code we’re running. We did add a few things to the original 
example in our attempts to get it working. 
 
from __future__ import print_function
 
import sysimport json
 
from pyspark import SparkContextfrom pyspark.conf import SparkConf
 
import os.pathos.environ["SPARK_HOME"] 
="/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"conf = 
(SparkConf().setMaster('local').setAppName('a'))
 
if __name__ == "__main__":   if len(sys.argv) != 3:      print("""       
Usage:hbase_inputformat <host> <table>       Run with example jar:       
./bin/spark-submit--driver-class-path /path/to/example/jar \      
/path/to/examples/hbase_inputformat.py <host> <table> [<znode>]       Assumes 
you have somedata in HBase already, running on <host>, in <table>         
optionally,you can specify parent znode for your hbase cluster - <znode>       
""",file=sys.stderr)       exit(-1)
 
host = sys.argv[1]table = sys.argv[2]sc = 
SparkContext(appName="HBaseInputFormat")
 
# Other options for configuring scan behavior are available.More information 
available at# 
https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.javaconf
 = {"hbase.zookeeper.quorum": host,"hbase.mapreduce.inputtable": table}if 
len(sys.argv) > 3:    conf ={"hbase.zookeeper.quorum": host, 
"zookeeper.znode.parent":sys.argv[3],           "hbase.mapreduce.inputtable": 
table}keyConv 
="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"valueConv
 = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
 
hbase_rdd = sc.newAPIHadoopRDD(   
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",   
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",   
"org.apache.hadoop.hbase.client.Result",    keyConverter=keyConv,    
valueConverter=valueConv,    conf=conf)hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n")).mapValues(json.loads)# 
hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n"))# hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n")).countByValue().items()
 
output = hbase_rdd.collect()# output = hbase_rddfor (k, v) in output:    
print((k, v))
 
sc.stop()

Reply via email to