I concur that printSchema() works; the trouble only seems to appear with operations that actually use the data, such as collect().
Thanks for posting the bug.
-Brad

On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai <yh...@databricks.com> wrote:

> I tried jsonRDD(...).printSchema() and it worked. Seems the problem is
> when we take the data back to the Python side, SchemaRDD#javaToPython
> failed on your cases. I have created
> https://issues.apache.org/jira/browse/SPARK-2875 to track it.
>
> Thanks,
>
> Yin
>
>
> On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>
>> Hi All,
>>
>> I checked out and built master. Note that Maven had a problem building
>> Kafka (in my case, at least); I was unable to fix this easily so I moved on
>> since it seemed unlikely to have any influence on the problem at hand.
>>
>> Master improves functionality (including the example Nicholas just
>> demonstrated) but unfortunately there still seems to be a bug related to
>> using dictionaries as values. I've put some code below to illustrate the
>> bug.
>>
>> # dictionary as value works fine
>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
>> [Row(key0=Row(key1=u'value'))]
>>
>> # dictionary as value works fine, even when inner keys are varied
>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
>> [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]
>>
>> # dictionary as value works fine when inner keys are missing and outer key is present
>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
>> [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]
>>
>> # dictionary as value FAILS when outer key is missing
>> > print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
>> Py4JJavaError: An error occurred while calling o84.collect.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
>> stage 7.0 (TID 242, engelland.research.intel-research.net):
>> java.lang.NullPointerException...
>>
>> # dictionary as value FAILS when outer key is present with null value
>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
>> Py4JJavaError: An error occurred while calling o98.collect.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
>> stage 9.0 (TID 305, kunitz.research.intel-research.net):
>> java.lang.NullPointerException...
>>
>> # nested lists work even when outer key is missing
>> > print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
>> [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]
>>
>> Is anyone able to replicate this behavior?
>>
>> -Brad
>>
>>
>> On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> We try to keep master very stable, but this is where active development
>>> happens. YMMV, but a lot of people do run very close to master without
>>> incident (myself included).
>>>
>>> branch-1.0 has been cut for a while and we only merge bug fixes into it
>>> (this is stricter for non-alpha components like Spark core). For Spark
>>> SQL, this branch is pretty far behind as the project is very young and we
>>> are fixing bugs / adding features very rapidly compared with Spark core.
>>>
>>> branch-1.1 was just cut and is being QAed for a release; at this point
>>> it's likely the same as master, but that will change as features start
>>> getting added to master in the coming weeks.
>>>
>>>
>>> On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>
>>>> collect() works, too.
>>>>
>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).collect()
>>>> [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]
>>>>
>>>> Can't answer your question about branch stability, though. Spark is a
>>>> very active project, so stuff is happening all the time.
>>>>
>>>> Nick
>>>>
>>>>
>>>> On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>
>>>>> Hi Nick,
>>>>>
>>>>> Can you check that the call to "collect()" works as well as
>>>>> "printSchema()"? I actually experience that "printSchema()" works fine,
>>>>> but then it crashes on "collect()".
>>>>>
>>>>> In general, should I expect the master (which seems to be on
>>>>> branch-1.1) to be any more/less stable than branch-1.0? While it would be
>>>>> great to have this fixed, it would be good to know if I should expect lots
>>>>> of other instability.
>>>>>
>>>>> best,
>>>>> -Brad
>>>>>
>>>>>
>>>>> On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>>>
>>>>>> This looks to be fixed in master:
>>>>>>
>>>>>> >>> from pyspark.sql import SQLContext
>>>>>> >>> sqlContext = SQLContext(sc)
>>>>>> >>> sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])
>>>>>> ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315
>>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>>> MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408
>>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).printSchema()
>>>>>> root
>>>>>>  |-- foo: array (nullable = true)
>>>>>>  |    |-- element: array (containsNull = false)
>>>>>>  |    |    |-- element: integer (containsNull = false)
>>>>>>
>>>>>> >>>
>>>>>>
>>>>>> Nick
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I've built and deployed the current head of branch-1.0, but it seems
>>>>>>> to have only partly fixed the bug.
>>>>>>>
>>>>>>> This code now runs as expected with the indicated output:
>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[1,2,3]}', '{"foo":[4,5,6]}']))
>>>>>>> > srdd.printSchema()
>>>>>>> root
>>>>>>>  |-- foo: ArrayType[IntegerType]
>>>>>>> > srdd.collect()
>>>>>>> [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}]
>>>>>>>
>>>>>>> This code still crashes:
>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>>>> > srdd.printSchema()
>>>>>>> root
>>>>>>>  |-- foo: ArrayType[ArrayType(IntegerType)]
>>>>>>> > srdd.collect()
>>>>>>> Py4JJavaError: An error occurred while calling o63.collect.
>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 3.0:29 failed 4 times, most recent failure: Exception failure in TID
>>>>>>> 67 on host kunitz.research.intel-research.net:
>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>
>>>>>>> I may be able to see if this is fixed in master, but since it's not
>>>>>>> fixed in 1.0.3 it seems unlikely to be fixed in master either. I previously
>>>>>>> tried master as well, but ran into a build problem that did not occur with
>>>>>>> the 1.0 branch.
>>>>>>>
>>>>>>> Can anybody else verify that the second example still crashes (and
>>>>>>> is meant to work)? If so, would it be best to modify JIRA-2376 or start a
>>>>>>> new bug?
>>>>>>> https://issues.apache.org/jira/browse/SPARK-2376
>>>>>>>
>>>>>>> best,
>>>>>>> -Brad
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>>>>
>>>>>>>> Nick: Thanks for both the original JIRA bug report and the link.
>>>>>>>>
>>>>>>>> Michael: This is on the 1.0.1 release. I'll update to master and
>>>>>>>> follow up if I have any problems.
>>>>>>>>
>>>>>>>> best,
>>>>>>>> -Brad
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> Is this on 1.0.1? I'd suggest running this on master or the
>>>>>>>>> 1.1-RC which should be coming out this week. PySpark did not have good
>>>>>>>>> support for nested data previously. If you still encounter issues using a
>>>>>>>>> more recent version, please file a JIRA. Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I am interested in using jsonRDD and jsonFile to create a SchemaRDD
>>>>>>>>>> out of some JSON data I have, but I've run into some instability involving
>>>>>>>>>> the following Java exception:
>>>>>>>>>>
>>>>>>>>>> An error occurred while calling o1326.collect.
>>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>> failure: Task 181.0:29 failed 4 times, most recent failure: Exception
>>>>>>>>>> failure in TID 1664 on host neal.research.intel-research.net:
>>>>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>>
>>>>>>>>>> I've pasted code which produces the error as well as the full
>>>>>>>>>> traceback below. Note that I don't have any problem when I parse the JSON
>>>>>>>>>> myself and use inferSchema.
>>>>>>>>>>
>>>>>>>>>> Is anybody able to reproduce this bug?
>>>>>>>>>>
>>>>>>>>>> -Brad
>>>>>>>>>>
>>>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}', '{"foo":"boom", "baz":[1,2,3]}']))
>>>>>>>>>> > srdd.printSchema()
>>>>>>>>>>
>>>>>>>>>> root
>>>>>>>>>>  |-- baz: ArrayType[IntegerType]
>>>>>>>>>>  |-- foo: StringType
>>>>>>>>>>
>>>>>>>>>> > srdd.collect()
>>>>>>>>>>
>>>>>>>>>> ---------------------------------------------------------------------------
>>>>>>>>>> Py4JJavaError                             Traceback (most recent call last)
>>>>>>>>>> <ipython-input-89-ec7e8e8c68c4> in <module>()
>>>>>>>>>> ----> 1 srdd.collect()
>>>>>>>>>>
>>>>>>>>>> /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
>>>>>>>>>>     581         """
>>>>>>>>>>     582         with _JavaStackTrace(self.context) as st:
>>>>>>>>>> --> 583             bytesInJava = self._jrdd.collect().iterator()
>>>>>>>>>>     584         return list(self._collect_iterator_through_file(bytesInJava))
>>>>>>>>>>     585
>>>>>>>>>>
>>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
>>>>>>>>>>     535         answer = self.gateway_client.send_command(command)
>>>>>>>>>>     536         return_value = get_return_value(answer, self.gateway_client,
>>>>>>>>>> --> 537                 self.target_id, self.name)
>>>>>>>>>>     538
>>>>>>>>>>     539         for temp_arg in temp_args:
>>>>>>>>>>
>>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>>>>>>>>>>     298                 raise Py4JJavaError(
>>>>>>>>>>     299                     'An error occurred while calling {0}{1}{2}.\n'.
>>>>>>>>>> --> 300                     format(target_id, '.', name), value)
>>>>>>>>>>     301             else:
>>>>>>>>>>     302                 raise Py4JError(
>>>>>>>>>>
>>>>>>>>>> Py4JJavaError: An error occurred while calling o1326.collect.
>>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>> failure: Task 181.0:29 failed 4 times, most recent failure: Exception
>>>>>>>>>> failure in TID 1664 on host neal.research.intel-research.net:
>>>>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>>         net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>         net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>         net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>         net.razorvine.pickle.Pickler.dump(Pickler.java:95)
>>>>>>>>>>         net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
>>>>>>>>>>         org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>>         org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>>         scala.collection.Iterator$anon$11.next(Iterator.scala:328)
>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)
>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>>         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>>>>>>>>>> Driver stacktrace:
>>>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
>>>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
>>>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
>>>>>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
>>>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>>>     at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
>>>>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
>>>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
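A possible stopgap for the two NullPointerException cases reported above (outer key missing, as in '{}', or present with a null value, as in '{"key0": null}'): records where "key0" held an object, even an empty one, came back without error, so normalizing the input before calling jsonRDD may sidestep the failure until SPARK-2875 is resolved. This is a minimal sketch under that assumption, written in plain Python and not tested against a Spark build; fill_missing_struct is a hypothetical helper, not part of Spark. It also changes the data: a missing or null key0 would presumably come back as Row(key1=None) rather than None.

```python
import json


def fill_missing_struct(lines, key):
    """Hypothetical workaround helper, not part of Spark.

    Parses each JSON string in `lines`; if a top-level object lacks `key`
    or maps it to null, substitutes an empty object ({}) so that every
    record carries a nested dictionary. Returns re-serialized JSON strings.
    """
    fixed = []
    for line in lines:
        record = json.loads(line)
        # Only top-level JSON objects are modified; arrays and scalars are
        # left unchanged apart from being re-serialized.
        if isinstance(record, dict) and record.get(key) is None:
            record[key] = {}
        # sort_keys=True only makes the output deterministic for comparison.
        fixed.append(json.dumps(record, sort_keys=True))
    return fixed


raw = ['{}', '{"key0": null}', '{"key0": {"key1": "value1"}}']
print(fill_missing_struct(raw, "key0"))
# ['{"key0": {}}', '{"key0": {}}', '{"key0": {"key1": "value1"}}']

# Untested assumption: in a PySpark shell the normalized strings would then be
# passed to jsonRDD exactly as in the examples above, e.g.
#   sqlCtx.jsonRDD(sc.parallelize(fill_missing_struct(raw, "key0"))).collect()
```

Key order in the re-serialized strings should not affect jsonRDD, since JSON objects are unordered; sorting is there only so the helper's output is stable.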