Hi PySpark users,
We need to be able to run large Hive queries in PySpark 1.2.1. Users run
PySpark on an edge node and submit jobs to a cluster that allocates YARN
resources to the clients.
We are using MapR as the Hadoop distribution, with Hive 0.13 and Spark
1.2.1 running on top of it.
Currently, our process for writing queries works only for small result
sets, for example:
    from pyspark.sql import HiveContext
    sqlContext = HiveContext(sc)
    results = sqlContext.sql("select column from database.table limit 10").collect()
    results
    <outputs result set here>
How do I save the HiveQL query results to an RDD first, and then output them?
This is the error I get when running a query that requires output of
400,000 rows:
    from pyspark.sql import HiveContext
    sqlContext = HiveContext(sc)
    results = sqlContext.sql("select column from database.table").collect()
    results
    ...
    /path/to/mapr/spark/spark-1.2.1/python/pyspark/sql.py in collect(self)
       1976         """
       1977         with SCCallSiteSync(self.context) as css:
    -> 1978             bytesInJava = self._jschema_rdd.baseSchemaRDD().collectToPython().iterator()
       1979         cls = _create_cls(self.schema())
       1980         return map(cls, self._collect_iterator_through_file(bytesInJava))

    /path/to/mapr/spark/spark-1.2.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
        536         answer = self.gateway_client.send_command(command)
        537         return_value = get_return_value(answer, self.gateway_client,
    --> 538                 self.target_id, self.name)
        539
        540         for temp_arg in temp_args:

    /path/to/mapr/spark/spark-1.2.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        298                 raise Py4JJavaError(
        299                     'An error occurred while calling {0}{1}{2}.\n'.
    --> 300                     format(target_id, '.', name), value)
        301             else:
        302                 raise Py4JError(
    Py4JJavaError: An error occurred while calling o76.collectToPython.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.io.IOException: Failed to connect to cluster_node/IP_address:port
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Ideally, this example query should return the full 400,000-row result set.
Thanks for your help,
Nikolay Voronchikhin
https://www.linkedin.com/in/nvoronchikhin
E-mail: [email protected]