I’m having some difficulty getting the desired results fromthe Spark Python example hbase_inputformat.py. I’m running with CDH5.4, hbaseVersion 1.0.0, Spark v 1.3.0 Using Python version 2.6.6 I followed the example to create a test HBase table. Here’sthe data from the table I created – hbase(main):001:0> scan 'dev_wx_test'ROW COLUMN+CELLrow1 column=f1:a, timestamp=1438716994027, value=value1row1 column=f1:b, timestamp=1438717004248, value=value2row2 column=f1:, timestamp=1438717014529, value=value3row3 column=f1:, timestamp=1438717022756, value=value43 row(s) in 0.2620 seconds When either of these statements are included -“hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n"))” or “hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n")).countByValue().items()” the result is - We only get the following printed; (row1, value2) is notprinted: ((u'row1', u'value1'), 1) ((u'row2', u'value3'), 1) ((u'row3', u'value4'), 1) This looks like similar results to the following post Ifound -http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650but it appears the pythonconverterHBaseResultToStringConverter has been updated since then. When the statement “hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n")).mapValues(json.loads)” is included, the result is – ValueError: No JSON object could be decoded ************************************************************************************** Here is more info on this from the log – Traceback (most recent call last):
File"hbase_inputformat.py", line 87, in <module> output =hbase_rdd.collect() File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",line 701, in collect File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py",line 538, in __call__ File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py",line 300, in get_return_value py4j.protocol.Py4JJavaError: An erroroccurred while calling o44.collect. : org.apache.spark.SparkException: Jobaborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recentfailure: Lost task 0.3 in stage 1.0 (TID 4, stluhdpddev27.monsanto.com):org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",line 101, in main process() File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",line 96, in process serializer.dump_stream(func(split_index, iterator), outfile) File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py",line 236, in dump_stream vs =list(itertools.islice(iterator, batch)) File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",line 1807, in <lambda> File"/usr/lib64/python2.6/json/__init__.py", line 307, in loads return_default_decoder.decode(s) File"/usr/lib64/python2.6/json/decoder.py", line 319, in decode obj, end =self.raw_decode(s, idx=_w(s, 0).end()) File "/usr/lib64/python2.6/json/decoder.py",line 338, in raw_decode raiseValueError("No JSON object could be decoded") ValueError: No JSON object could bedecoded at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) atscala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) atorg.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) atorg.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) atorg.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Any suggestions would be most welcome. ****************************************************************Below is the code we’re running. We did add a few things tothe original example in our attempts to get it working. from __future__ import print_function import sysimport json from pyspark import SparkContextfrom pyspark.conf import SparkConf import os.pathos.environ["SPARK_HOME"] ="/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"conf = (SparkConf().setMaster('local').setAppName('a')) if __name__ == "__main__": if len(sys.argv) != 3: print(""" Usage:hbase_inputformat <host> <table> Run with example jar: ./bin/spark-submit--driver-class-path /path/to/example/jar \ /path/to/examples/hbase_inputformat.py <host> <table> [<znode>] Assumes you have somedata in HBase already, running on <host>, in <table> optionally,you can specify parent znode for your hbase cluster - <znode> """,file=sys.stderr) exit(-1) host = sys.argv[1]table = sys.argv[2]sc = SparkContext(appName="HBaseInputFormat") # Other options for configuring scan behavior are available.More information available at# https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.javaconf = {"hbase.zookeeper.quorum": host,"hbase.mapreduce.inputtable": table}if len(sys.argv) > 3: conf ={"hbase.zookeeper.quorum": host, "zookeeper.znode.parent":sys.argv[3], "hbase.mapreduce.inputtable": table}keyConv ="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" hbase_rdd = sc.newAPIHadoopRDD( "org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", keyConverter=keyConv, valueConverter=valueConv, conf=conf)hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n")).mapValues(json.loads)# hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n"))# hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n")).countByValue().items() output = hbase_rdd.collect()# output = hbase_rddfor (k, v) in output: print((k, v)) sc.stop()