Re: SQLCtx cacheTable
On 08/04/2014 10:57 PM, Michael Armbrust wrote: If mesos is allocating a container that is exactly the same as the max heap size then that is leaving no buffer space for non-heap JVM memory, which seems wrong to me. This can be a cause. I am now wondering how mesos picks up the size and sets the -Xmx parameter. The problem here is that cacheTable is more aggressive about grabbing large ByteBuffers during caching (which it later releases when it knows the exact size of the data). There is a discussion here about trying to improve this: https://issues.apache.org/jira/browse/SPARK-2650 I am not sure this issue is the one causing the problem for us, as we have approx. 60GB of cached data, whereas each executor has 17GB of memory and there are 15 of them, so 255GB in total, which is way more than the 60GB of cached data. Any suggestions on where to look for changing the mesos settings in this case? - Gurvinder

On Sun, Aug 3, 2014 at 11:35 PM, Gurvinder Singh gurvinder.si...@uninett.no wrote: On 08/03/2014 02:33 AM, Michael Armbrust wrote: I am not a mesos expert... but it sounds like there is some mismatch between the size that mesos is giving you and the maximum heap size of the executors (-Xmx). It seems that mesos is giving the correct size to the java process. It has the exact size set in the -Xms/-Xmx params. Do you know if I can somehow find out which class or thread inside the Spark JVM process is using how much memory, and see what makes it hit the memory limit in the cacheTable case but not in the cached RDD case? - Gurvinder

On Fri, Aug 1, 2014 at 12:07 AM, Gurvinder Singh gurvinder.si...@uninett.no wrote: It is not getting an out of memory exception. I am using Mesos as the cluster manager, and when I use cacheTable it says that the container has used all of its allocated memory and thus kills it. I can see this in the logs on the mesos-slave where the executor runs. But the web UI of the Spark application shows that it still has 4-5GB of space left for caching/storing. So I am wondering how the memory is handled in the cacheTable case. Does it reserve the memory for storage and the other parts run out of their memory? I also tried changing spark.storage.memoryFraction but that did not help. - Gurvinder

On 08/01/2014 08:42 AM, Michael Armbrust wrote: Are you getting OutOfMemoryExceptions with cacheTable? Or what do you mean when you say you have to specify larger executor memory? You might be running into SPARK-2650 https://issues.apache.org/jira/browse/SPARK-2650. Is there something else you are trying to accomplish by setting the persistence level? If you are looking for something like DISK_ONLY you can simulate that now using saveAsParquetFile and parquetFile. It is possible that long term we will automatically map the standard RDD persistence levels to these more efficient implementations.

On Thu, Jul 31, 2014 at 11:26 PM, Gurvinder Singh gurvinder.si...@uninett.no wrote: Thanks Michael for the explanation. Actually I tried caching the RDD and making a table on it, but the performance of cacheTable was 3X better than caching the RDD. Now I know why it is better. But is it possible to add support for a persistence level to cacheTable itself, like RDDs have? Maybe it is not related, but on the same data set, when I use cacheTable I have to specify larger executor memory than I need when caching the RDD, although in the storage tab of the status web UI the memory footprint is almost the same: 58.3 GB with cacheTable and 59.7 GB with the cached RDD. Is it possible that there is a memory leak, or does cacheTable work differently and thus require more memory? The difference is 5GB per executor for a dataset of size 122 GB. Thanks, Gurvinder

On 08/01/2014 04:42 AM, Michael Armbrust wrote: cacheTable uses a special columnar
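To make the two approaches from this thread concrete, here is a minimal sketch against the Spark 1.0-era SQL API (table name, path, and query are made-up placeholders, not taken from the thread), showing cacheTable next to the saveAsParquetFile/parquetFile workaround Michael describes for DISK_ONLY-like behavior:

// Sketch of the two options discussed above: in-memory columnar caching via
// cacheTable, and a DISK_ONLY-style alternative via saveAsParquetFile /
// parquetFile. Table and path names are placeholders.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object CacheTableSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cacheTable-sketch"))
    val sqlContext = new SQLContext(sc)

    val events = sqlContext.parquetFile("hdfs:///data/events.parquet")
    events.registerAsTable("events")

    // Option 1: columnar in-memory cache (what cacheTable does).
    sqlContext.cacheTable("events")
    sqlContext.sql("SELECT count(*) FROM events").collect()

    // Option 2: simulate DISK_ONLY by materializing to Parquet and reading
    // it back, as suggested in the thread.
    val filtered = sqlContext.sql("SELECT * FROM events WHERE value > 0")
    filtered.saveAsParquetFile("hdfs:///data/events_filtered.parquet")
    val reloaded = sqlContext.parquetFile("hdfs:///data/events_filtered.parquet")
    reloaded.registerAsTable("events_filtered")

    sc.stop()
  }
}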
Re: Can't see any thing one the storage panel of application UI
You need to persist or cache those RDDs for them to appear in the Storage tab. Unless you do, those RDDs will be recomputed each time. Thanks Best Regards On Tue, Aug 5, 2014 at 8:03 AM, binbinbin915 binbinbin...@live.cn wrote: Actually, if you don't use a method like persist or cache, the RDD is not even stored to disk. Every time you use this RDD, it is just recomputed from the original one. In logistic regression from MLlib, they don't persist the transformed input, so I can't see the RDD from the web GUI. I have changed the code and gained a 10x speed up. -- binbinbin915 Sent with Airmail -- View this message in context: Re: Can't see any thing one the storage panel of application UI http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-see-any-thing-one-the-storage-panel-of-application-UI-tp10296p11403.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: java.lang.IllegalStateException: unread block data while running the sampe WordCount program from Eclipse
Did you ever find a solution to this problem? I'm having similar issues. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-while-running-the-sampe-WordCount-program-from-Ecle-tp8388p11412.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Deployment Patterns - Automated Deployment Performance Testing
Thanks AL! That's what I thought. I've set up Nexus to maintain the Spark libs and download them when needed. For development purposes: suppose we have a dev cluster. Is it possible to run the driver program locally (on a developer's machine)? I.e. just run the driver from the IDE and have it connect to the master and worker nodes to ship out its tasks? Cheers, N -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Deployment-Patterns-Automated-Deployment-Performance-Testing-tp11000p11414.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark stream data from kafka topics and output as parquet file on HDFS
Hi, I am new to Apache Spark and trying to develop a Spark Streaming program to *stream data from kafka topics and output as parquet file on HDFS*. Please share a *sample reference* program that streams data from Kafka topics and writes it as Parquet files on HDFS. Thanks in advance. Regards, Rafeeq S *(“What you do is what matters, not what you think or say or plan.” )*
Running driver/SparkContent locally
I'm trying to run a local driver (on a development machine) and have this driver communicate with the Spark master and workers however I'm having a few problems getting the driver to connect and run a simple job from within an IDE. It all looks like it works but when I try to do something simple like a count it falls over with a *java.lang.IllegalStateException: unread block data*. *The stack-trace: * 14/08/05 17:09:48 WARN TaskSetManager: Lost TID 3 (task 0.0:0) 14/08/05 17:09:48 INFO TaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 3] 14/08/05 17:09:48 ERROR TaskSetManager: Task 0.0:0 failed 4 times; aborting job 14/08/05 17:09:48 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 14/08/05 17:09:48 INFO TaskSchedulerImpl: Cancelling stage 0 14/08/05 17:09:48 INFO DAGScheduler: Failed to run runJob at basicOperators.scala:136 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: Exception failure in TID 3 on host hadoop-004: java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:148) java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1046) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1030) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:632) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 05/08/2014 5:09:45 PM INFO: parquet.hadoop.ParquetInputFormat: Total input paths to process : 1 05/08/2014 5:09:45 PM INFO: parquet.hadoop.ParquetFileReader: reading another 1 footers Some error on socket 3168 Any ideas where I've gone wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-driver-SparkContent-locally-tp11415.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail:
Re: about spark and using machine learning model
The following presentation has a simple example of a clustering model used to classify new incoming tweets: https://www.youtube.com/watch?v=sPhyePwo7FA Regards, Julien 2014-08-05 7:08 GMT+02:00 Xiangrui Meng men...@gmail.com: Some extra work is needed to close the loop. One related example is streaming linear regression, added by Jeremy very recently: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala You can use a model trained offline to serve a DStream and save the predictions (also a DStream) somewhere, e.g., HDFS or stdout. Best, Xiangrui On Mon, Aug 4, 2014 at 9:28 PM, Hoai-Thu Vuong thuv...@gmail.com wrote: Hello everybody! I'm getting started with Spark and MLlib. I have successfully built a small cluster and followed the tutorial. However, I would like to ask how to use a model trained by MLlib. I understand that with data we can train a model, such as a classifier, and then use it to classify new input. Is there any case study on building a service on top of Spark or HDFS that uses a model (trained as above) and returns output to the user (the class of the input data)? Thank you very much! -- Thu. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
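To make Xiangrui's suggestion concrete, here is a minimal sketch (the data source, feature extraction, and paths are assumptions, not from the thread) of training a model offline and then applying it to a stream, saving the prediction DStream:

// Sketch: score an incoming text stream with a model trained offline.
// The file-based source, comma-separated feature format, and output path
// are placeholders; the point is the pattern of training once and then
// predicting inside a DStream transformation.
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ServeModelSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("serve-mllib-model")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Offline step: train on historical data (could also be loaded from disk).
    val training = ssc.sparkContext
      .textFile("hdfs:///data/training")
      .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
    val model = KMeans.train(training, 10, 20)

    // Online step: apply the trained model to each incoming record.
    val lines = ssc.textFileStream("hdfs:///data/incoming")
    val predictions = lines.map { line =>
      val features = Vectors.dense(line.split(',').map(_.toDouble))
      (line, model.predict(features))
    }
    predictions.saveAsTextFiles("hdfs:///data/predictions")

    ssc.start()
    ssc.awaitTermination()
  }
}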
Re: Running driver/SparkContent locally
The code for this example is very simple:

object SparkMain extends App with Serializable {
  val conf = new SparkConf(false)
    //.setAppName("cc-test")
    //.setMaster("spark://hadoop-001:7077")
    //.setSparkHome("/tmp")
    .set("spark.driver.host", "192.168.23.108")
    .set("spark.cores.max", "10")
    .set("spark.executor.memory", "512M")
  val sc = new SparkContext("spark://hadoop-001:7077", "cc-test", conf)
  val hc = new HiveContext(sc)
  val input = hc.hql("select * from prod_qdw.prod_sales_summary where dt = '2013-01-01' limit 10")
  println("#Result: " + input.collect)
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-driver-SparkContent-locally-tp11415p11418.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
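This error is often reported when the driver's Spark or Hadoop version differs from the cluster's, so checking that the IDE classpath uses the exact same Spark assembly as the cluster is worth doing first. Independent of that, the application classes need to be shipped to the executors; a minimal sketch of doing that from code (the jar path, driver host, and master URL are assumptions, and the jar must be built, e.g. with sbt package, before launching):

// Sketch: ship the application jar to the executors when the driver runs
// from an IDE. Paths and addresses are placeholders.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SparkMainWithJars extends App {
  val conf = new SparkConf()
    .setMaster("spark://hadoop-001:7077")
    .setAppName("cc-test")
    .set("spark.driver.host", "192.168.23.108")
    // Make the compiled application classes available on every executor.
    .setJars(Seq("target/scala-2.10/cc-test_2.10-0.1.jar"))

  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val input = hc.hql("select * from prod_qdw.prod_sales_summary where dt = '2013-01-01' limit 10")
  println("#Result: " + input.collect().mkString("\n"))
  sc.stop()
}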
Re: Spark streaming at-least once guarantee
Hi Sanjeet, I have been using Spark Streaming for processing files present in S3 and HDFS. I am also using SQS messages for the same purpose as yours, i.e. as a pointer to an S3 file. As of now, I have a separate SQS job which receives messages from the SQS queue and gets the corresponding file from S3. Now, I want to integrate the SQS receiver with Spark Streaming, so that my Spark Streaming job would listen for new SQS messages and proceed accordingly. I was wondering if you found any solution to this. Please let me know if so! In your above approach, you can achieve #4 in the following way: when you pass a forEach function to be applied on each RDD of the DStream, you can pass the information of the SQS message (like the receipt handle for deleting the message) associated with that particular file. After success/failure in processing you can then delete your SQS message accordingly. Thanks --Lalit - Lalit Yadav la...@sigmoidanalytics.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-at-least-once-guarantee-tp10902p11419.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
java.lang.StackOverflowError
Hi, I am doing some basic preprocessing in pyspark (local mode) as follows:

files = [ input files]

def read(filename, sc):
    # process file
    return rdd

if __name__ == "__main__":
    conf = SparkConf()
    conf.setMaster('local')
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir(root + "temp/")
    data = sc.parallelize([])
    for i, f in enumerate(files):
        data = data.union(read(f, sc))
        if i == 20:
            data.checkpoint()
            data.count()
        if i == 500: break
    # print data.count()
    # rdd_1 = read(files[0], sc)
    data.saveAsTextFile(root + "output/")

But I see this error: keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) File /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o9564.saveAsTextFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError java.io.Bits.putInt(Bits.java:93) java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)
Re: Bad Digest error while doing aws s3 put
Is it possible that the Content-MD5 changes during multipart upload to S3? But even then, it succeeds if I increase the cluster size. For example: it throws a Bad Digest error after writing 48/100 files when the cluster has 3 m3.2xlarge slaves; it throws a Bad Digest error after writing 64/100 files when the cluster has 4 m3.2xlarge slaves; it throws a Bad Digest error after writing 86/100 files when the cluster has 5 m3.2xlarge slaves; it succeeds in writing all 100 files when the cluster has 6 m3.2xlarge slaves. Please clarify. Regards, lmk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bad-Digest-error-while-doing-aws-s3-put-tp10036p11421.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark stream data from kafka topics and output as parquet file on HDFS
You can try this Kafka Spark Consumer which I recently wrote. This uses the Low Level Kafka Consumer https://github.com/dibbhatt/kafka-spark-consumer Dibyendu On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote: Hi, I am new to Apache Spark and Trying to Develop spark streaming program to *stream data from kafka topics and output as parquet file on HDFS*. Please share the *sample reference* program to stream data from kafka topics and output as parquet file on HDFS. Thanks in Advance. Regards, Rafeeq S *(“What you do is what matters, not what you think or say or plan.” )*
Re: Low Level Kafka Consumer for Spark
Thanks Jonathan, Yes, till non-ZK based offset management is available in Kafka, I need to maintain the offset in ZK. And yes, both cases explicit commit is necessary. I modified the Low Level Kafka Spark Consumer little bit to have Receiver spawns threads for every partition of the topic and perform the 'store' operation in multiple threads. It would be good if the receiver.store methods are made thread safe..which is not now presently . Waiting for TD's comment on this Kafka Spark Low Level consumer. Regards, Dibyendu On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote: Hi Yan, That is a good suggestion. I believe non-Zookeeper offset management will be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for September. https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management That should make this fairly easy to implement, but it will still require explicit offset commits to avoid data loss which is different than the current KafkaUtils implementation. Jonathan On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote: Another suggestion that may help is that, you can consider use Kafka to store the latest offset instead of Zookeeper. There are at least two benefits: 1) lower the workload of ZK 2) support replay from certain offset. This is how Samza http://samza.incubator.apache.org/ deals with the Kafka offset, the doc is here http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html . Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com wrote: I'll let TD chime on on this one, but I'm guessing this would be a welcome addition. It's great to see community effort on adding new streams/receivers, adding a Java API for receivers was something we did specifically to allow this :) - Patrick On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, I have implemented a Low Level Kafka Consumer for Spark Streaming using Kafka Simple Consumer API. This API will give better control over the Kafka offset management and recovery from failures. As the present Spark KafkaUtils uses HighLevel Kafka Consumer API, I wanted to have a better control over the offset management which is not possible in Kafka HighLevel consumer. This Project is available in below Repo : https://github.com/dibbhatt/kafka-spark-consumer I have implemented a Custom Receiver consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses low level Kafka Consumer API (implemented in consumer.kafka packages) to fetch messages from Kafka and 'store' it in Spark. The logic will detect number of partitions for a topic and spawn that many threads (Individual instances of Consumers). Kafka Consumer uses Zookeeper for storing the latest offset for individual partitions, which will help to recover in case of failure. The Kafka Consumer logic is tolerant to ZK Failures, Kafka Leader of Partition changes, Kafka broker failures, recovery from offset errors and other fail-over aspects. The consumer.kafka.client.Consumer is the sample Consumer which uses this Kafka Receivers to generate DStreams from Kafka and apply a Output operation for every messages of the RDD. We are planning to use this Kafka Spark Consumer to perform Near Real Time Indexing of Kafka Messages to target Search Cluster and also Near Real Time Aggregation using target NoSQL storage. Kindly let me know your view. 
Also if this looks good, can I contribute to Spark Streaming project. Regards, Dibyendu
Re: Spark stream data from kafka topics and output as parquet file on HDFS
Thanks Dibyendu. 1. Spark itself has an API jar for Kafka; do we still require manual offset management (using the simple consumer concept) and a manual consumer? 2. Kafka Spark Consumer is implemented against Kafka 0.8.0; can we use it with Kafka 0.8.1? 3. How do we use Kafka Spark Consumer to produce output *as parquet files on HDFS?* *Please give your suggestion.* Regards, Rafeeq S *(“What you do is what matters, not what you think or say or plan.” )* On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You can try this Kafka Spark Consumer which I recently wrote. This uses the Low Level Kafka Consumer https://github.com/dibbhatt/kafka-spark-consumer Dibyendu On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote: Hi, I am new to Apache Spark and Trying to Develop spark streaming program to *stream data from kafka topics and output as parquet file on HDFS*. Please share the *sample reference* program to stream data from kafka topics and output as parquet file on HDFS. Thanks in Advance. Regards, Rafeeq S *(“What you do is what matters, not what you think or say or plan.” )*
Running Hive UDF from spark-shell fails due to datatype issue
Hi, I'm running Hive 0.13.1 and the latest master branch of Spark (built with SPARK_HIVE=true). I'm trying to compute Jaccard similarity using the Hive UDF from Brickhouse (https://github.com/klout/brickhouse/blob/master/src/main/java/brickhouse/udf/sketch/SetSimilarityUDF.java). *Hive table data:* hive select * from test_1; 1 [rock,pop] 2 [metal,rock] *DDL* create table test_1 (id int, val arraystring); From spark-shell, I am executing the following commands: val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext.hql(CREATE TEMPORARY FUNCTION jaccard_similarity AS 'brickhouse.udf.sketch.SetSimilarityUDF') hiveContext.hql(select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b) I get the following error: warning: there were 1 deprecation warning(s); re-run with -deprecation for details 14/08/05 13:54:53 INFO ParseDriver: Parsing command: select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b 14/08/05 13:54:53 INFO ParseDriver: Parse Completed 14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1 14/08/05 13:54:53 INFO audit: ugi=chandrv1 ip=unknown-ip-addr cmd=get_table : db=default tbl=test_1 14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1 14/08/05 13:54:53 INFO audit: ugi=chandrv1 ip=unknown-ip-addr cmd=get_table : db=default tbl=test_1 scala.MatchError: ArrayType(StringType,false) (of class org.apache.spark.sql.catalyst.types.ArrayType) at org.apache.spark.sql.hive.HiveInspectors$typeInfoConversions.toTypeInfo(HiveInspectors.scala:216) at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52) at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:52) at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41) at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:41) at org.apache.spark.sql.hive.HiveContext$$anon$3.lookupFunction(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:131) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:129) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:52) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at
Understanding RDD.GroupBy OutOfMemory Exceptions
I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, a few gigs in total, line based, 500-2000 chars per line). I'm running Spark on 8 low-memory machines in a yarn cluster, i.e. something along the lines of:

spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1

I'm trying to do a simple groupByKey (see below), but it fails with a java.lang.OutOfMemoryError: GC overhead limit exceeded exception:

val keyvals = sc.newAPIHadoopFile("hdfs://...").map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()

I can count the group sizes using reduceByKey without problems, assuring myself that the problem isn't caused by a single excessively large group, nor by an excessive number of groups:

keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
// (key1,139368)
// (key2,35335)
// (key3,392744)
// ...
// (key13,197941)

I've tried reformatting, reshuffling and increasing the groupBy level of parallelism:

keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails

I've tried playing around with spark.default.parallelism, and increasing spark.shuffle.memoryFraction to 0.8 while lowering spark.storage.memoryFraction to 0.1. The failing stage (count) will fail on task 2999 of 3000. I can't seem to find anything that suggests that groupBy shouldn't just spill to disk instead of keeping things in memory, but I just can't get it to work right, even on fairly small datasets. This should obviously not be the case, and I must be doing something wrong, but I have no idea where to start debugging this, or even trying to understand what's going on - for the same reason, I'm not looking for a solution to my specific problem as much as I'm looking for insight into how to reliably group datasets in Spark. Notice that I've also posted this question to SO, before realising this mailing list is more active. I will update the SO thread if I receive an answer here. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
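If what is ultimately computed from each group is an aggregate rather than the full list of values, one standard way to sidestep this (sketched below under the assumption of an existing SparkContext sc; the input parsing and the per-key count/total-length aggregate are placeholders for the real computation) is to replace groupByKey with aggregateByKey, which combines values map-side before the shuffle instead of materializing whole groups:

// Sketch: avoid materializing whole groups in memory by combining values
// per key during the shuffle. The tab-separated input format and the
// aggregate computed are illustrative only.
val keyvals = sc.textFile("hdfs:///data/input").map { line =>
  val parts = line.split('\t')
  (parts(0), parts(1))
}

// Instead of keyvals.groupByKey(), keep one running aggregate per key:
val perKeyStats = keyvals.aggregateByKey((0L, 0L))(
  (acc, value) => (acc._1 + 1, acc._2 + value.length), // fold a value into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)                 // merge two partial accumulators
)
perKeyStats.collect().foreach(println)

This doesn't make groupByKey itself spill, but it avoids holding all values for a key in memory at once.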
Setting spark.executor.memory problem
Hi, I wanted to make a simple Spark app running in local mode with 2g of spark.executor.memory and 1g for caching. But the following code: val conf = new SparkConf() .setMaster("local") .setAppName("app") .set("spark.executor.memory", "2g") .set("spark.storage.memoryFraction", "0.5") val sc = new SparkContext(conf) doesn't work. In the Spark UI these variables are set properly, but the memory store is around 0.5 * 512MB (the default spark.executor.memory), not 0.5 * 2GB: 14/08/05 15:34:00 INFO MemoryStore: MemoryStore started with capacity 245.8 MB. I have neither spark-defaults.conf nor spark-env.sh in my $SPARK_HOME/conf directory. I use Spark 1.0.0. How can I set these values properly? Thanks, Grzegorz
RE: Spark stream data from kafka topics and output as parquet file on HDFS
Hi Rafeeq, I think current Spark Streaming api can offer you the ability to fetch data from Kafka and store to another external store, if you do not care about management of consumer offset manually, there’s no need to use low level api as SimpleConsumer. For Kafka 0.8.1 compatibility, you can try to modify the pom file and rebuild Spark to try it, mostly I think it can work. For parquet file, I think if parquet offers its own OutputFormat that is extended from Hadoop’s OutputFormat, Spark can write data into parquet file, like sequence file or text file, you can do this as: DStream.foreach { rdd = rdd.saveAsHadoopFile(…) } to specify the OutputFormat you want. Thanks Jerry From: rafeeq s [mailto:rafeeq.ec...@gmail.com] Sent: Tuesday, August 05, 2014 5:37 PM To: Dibyendu Bhattacharya Cc: u...@spark.incubator.apache.org Subject: Re: Spark stream data from kafka topics and output as parquet file on HDFS Thanks Dibyendu. 1. Spark itself have api jar for kafka, still we require manual offset management (using simple consumer concept) and manual consumer ? 2.Kafka Spark Consumer which is implemented in kafka 0.8.0 ,Can we use it for kafka 0.8.1 ? 3.How to use Kafka Spark Consumer to produce output as parquet file on HDFS ? Please give your suggestion. Regards, Rafeeq S (“What you do is what matters, not what you think or say or plan.” ) On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.commailto:dibyendu.bhattach...@gmail.com wrote: You can try this Kafka Spark Consumer which I recently wrote. This uses the Low Level Kafka Consumer https://github.com/dibbhatt/kafka-spark-consumer Dibyendu On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.commailto:rafeeq.ec...@gmail.com wrote: Hi, I am new to Apache Spark and Trying to Develop spark streaming program to stream data from kafka topics and output as parquet file on HDFS. Please share the sample reference program to stream data from kafka topics and output as parquet file on HDFS. Thanks in Advance. Regards, Rafeeq S (“What you do is what matters, not what you think or say or plan.” )
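One way to wire the whole pipeline together, as a rough sketch rather than the thread's exact solution (the ZooKeeper address, consumer group, topic, message schema, and output paths are all placeholders): consume with the built-in KafkaUtils receiver and, inside foreachRDD, convert each batch to a SchemaRDD and write it with Spark SQL's Parquet support, an alternative to the saveAsHadoopFile route Jerry mentions.

// Sketch: Kafka -> Spark Streaming -> Parquet on HDFS (Spark 1.0-era API).
// Each batch is written to its own Parquet directory keyed by batch time.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

case class Event(key: String, value: String)

object KafkaToParquetSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-to-parquet"), Seconds(60))
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.createSchemaRDD

    val lines = KafkaUtils.createStream(ssc, "zk-host:2181", "parquet-writer", Map("events" -> 1)).map(_._2)

    lines.foreachRDD { (rdd, time) =>
      // Parse each message into a typed record, then write the batch as Parquet.
      val events = rdd.map { line =>
        val parts = line.split(',')
        Event(parts(0), parts(1))
      }
      events.saveAsParquetFile(s"hdfs:///data/events/batch-${time.milliseconds}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}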
Re: spark sql left join gives KryoException: Buffer overflow
I am also experiencing this Kryo buffer problem. My join is a left outer join with under 40MB on the right side. I would expect the broadcast join to succeed in this case (Hive did). Another problem is that the optimizer chose a nested loop join for some reason; I would expect a broadcast (map-side) hash join. Am I correct in my expectations? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-left-join-gives-KryoException-Buffer-overflow-tp10157p11432.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark sql left join gives KryoException: Buffer overflow
Yes Sent from my iPhone On Aug 5, 2014, at 7:38 AM, Dima Zhiyanov [via Apache Spark User List] ml-node+s1001560n11432...@n3.nabble.com wrote: I am also experiencing this kryo buffer problem. My join is left outer with under 40mb on the right side. I would expect the broadcast join to succeed in this case (hive did) Another problem is that the optimizer chose nested loop join for some reason I would expect broadcast (map side) hash join. Am I correct in my expectations? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-left-join-gives-KryoException-Buffer-overflow-tp10157p11433.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
master=local vs master=local[*]
Hi, I have a Spark application which computes a join of two RDDs. One contains around 150MB of data (7 million entries), the second around 1.5MB (80 thousand entries), and the result of this join contains 50MB of data (2 million entries). When I run it on one core (with master=local) it works correctly (the whole process uses between 600 and 700MB of memory), but when I run it on all cores (with master=local[*]) it throws: java.lang.OutOfMemoryError: GC overhead limit exceeded and sometimes java.lang.OutOfMemoryError: Java heap space. I have set spark.executor.memory=512m (the default value). Does anyone know why this occurs? Thanks, Grzegorz
Spark SQL Thrift Server
I have things working on my cluster with the Spark SQL Thrift server. (Thank you Yin Huai at Databricks!) That said, I was curious how I can cache a table via my instance here. I tried the Shark-like create table table_cached as select * from table and that did not create a cached table. cacheTable(table) didn't parse in beeline. Any thoughts? Any pointers to documentation (*crosses fingers)?
Re: Spark SQL Thrift Server
We are working on an overhaul of the docs before the 1.1 release. In the mean time try: CACHE TABLE tableName. On Tue, Aug 5, 2014 at 9:02 AM, John Omernik j...@omernik.com wrote: I gave things working on my cluster with the sparksql thrift server. (Thank you Yin Huai at Databricks!) That said, I was curious how I can cache a table via my instance here? I tried the shark like create table table_cached as select * from table and that did not create a cached table. cacheTable(table) didn't parse in beeline. Any thoughts? Any pointers to documentation (*crosses fingers)?
Re: spark sql left join gives KryoException: Buffer overflow
For outer joins I'd recommend upgrading to master or waiting for a 1.1 release candidate (which should be out this week). On Tue, Aug 5, 2014 at 7:38 AM, Dima Zhiyanov dimazhiya...@hotmail.com wrote: I am also experiencing this kryo buffer problem. My join is left outer with under 40mb on the right side. I would expect the broadcast join to succeed in this case (hive did) Another problem is that the optimizer chose nested loop join for some reason I would expect broadcast (map side) hash join. Am I correct in my expectations? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-left-join-gives-KryoException-Buffer-overflow-tp10157p11432.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: master=local vs master=local[*]
The more cores you have, the less memory each will get. 512M is already quite small, and if you have 4 cores it will mean roughly 128M per task. Sometimes it is interesting to have fewer cores and more memory. How many cores do you have? André On 2014-08-05 16:43, Grzegorz Białek wrote: Hi, I have Spark application which computes join of two RDDs. One contains around 150MB of data (7 million entries) second around 1,5MB (80 thousand entries) and result of this join contains 50MB of data (2 million entries). When I run it on one core (with master=local) it works correctly (whole process uses between 600 and 700MB of memory) but when I run it on all cores (with master=local[*]) it throws: java.lang.OutOfMemoryError: GC overhead limit exceeded and sometimes java.lang.OutOfMemoryError: Java heap space I have set spark.executor.memory=512m (default value). Does anyone know why above occurs? Thanks, Grzegorz -- André Bois-Crettez Software Architect Big Data Developer http://www.kelkoo.com/ Kelkoo SAS Société par Actions Simplifiée Au capital de € 4.168.964,30 Siège social : 8, rue du Sentier 75002 Paris 425 093 069 RCS Paris This message and its attachments are confidential and intended exclusively for their addressees. If you are not the intended recipient of this message, please destroy it and notify the sender. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
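A minimal sketch of the trade-off described above (all values are illustrative): in local mode every task shares the single driver JVM, so either cap the number of concurrent tasks or give that JVM more heap.

// Sketch: fewer concurrent local tasks means more memory per task.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")   // 2 concurrent tasks instead of one per core
  .setAppName("join-app")
val sc = new SparkContext(conf)

// The heap itself must be set when the JVM is launched, e.g.:
//   spark-submit --driver-memory 2g --class JoinApp join-app.jar
// (setting spark.executor.memory in the conf has no effect in local mode,
// since the executor is the driver).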
Re: Can't see any thing one the storage panel of application UI
Ah yes, Spark doesn't cache all of your RDDs by default. It turns out that caching things too aggressively can lead to suboptimal performance because there might be a lot of churn. If you don't call persist or cache then your RDDs won't actually be cached. Note that even once they're cached they can still be kicked out by LRU, however. -Andrew 2014-08-05 0:13 GMT-07:00 Akhil Das ak...@sigmoidanalytics.com: You need to use persist or cache those rdds to appear in the Storage. Unless you do it, those rdds will be computed again. Thanks Best Regards On Tue, Aug 5, 2014 at 8:03 AM, binbinbin915 binbinbin...@live.cn wrote: Actually, if you don’t use method like persist or cache, it even not store the rdd to the disk. Every time you use this rdd, they just compute it from the original one. In logistic regression from mllib, they don't persist the changed input , so I can't see the rdd from the web gui. I have changed the code and gained a 10x speed up. -- binbinbin915 Sent with Airmail -- View this message in context: Re: Can't see any thing one the storage panel of application UI http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-see-any-thing-one-the-storage-panel-of-application-UI-tp10296p11403.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
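As a minimal illustration of the change binbinbin915 describes (the input path and computation are placeholders), persisting a reused RDD once makes it show up in the Storage tab and stops it from being recomputed on every pass:

// Sketch: persist an RDD that is reused across iterations so it appears in
// the Storage tab and is computed only once.
import org.apache.spark.storage.StorageLevel

val points = sc.textFile("hdfs:///data/points")
  .map(line => line.split(' ').map(_.toDouble))
  .persist(StorageLevel.MEMORY_ONLY)   // or simply .cache()

// Each iteration now reads the cached blocks instead of re-reading HDFS.
for (i <- 1 to 10) {
  val norm = points.map(p => math.sqrt(p.map(x => x * x).sum)).sum()
  println(s"iteration $i, total norm $norm")
}

points.unpersist()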
Re: Setting spark.executor.memory problem
Hi Grzegorz, For local mode you only have one executor, and this executor is your driver, so you need to set the driver's memory instead. *That said, in local mode, by the time you run spark-submit, a JVM has already been launched with the default memory settings, so setting spark.driver.memory in your conf won't actually do anything for you. Instead, you need to run spark-submit as follows bin/spark-submit --driver-memory 2g --class your.class.here app.jar This will start the JVM with 2G instead of the default 512M. -Andrew 2014-08-05 6:43 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com: Hi, I wanted to make simple Spark app running in local mode with 2g spark.executor.memory and 1g for caching. But following code: val conf = new SparkConf() .setMaster(local) .setAppName(app) .set(spark.executor.memory, 2g) .set(spark.storage.memoryFraction, 0.5) val sc = new SparkContext(conf) doesn't work. In spark UI this variables are set properly but memory store is around 0.5 * 512MB (default spark.executor.memory) not 0.5 * 2GB: 14/08/05 15:34:00 INFO MemoryStore: MemoryStore started with capacity 245.8 MB. I have neither spark-defaults.conf nor spark-env.sh in my $SPARK_HOME/conf directory. I use Spark 1.0.0 How can I set this values properly? Thanks, Grzegorz
Re: Setting spark.executor.memory problem
(Clarification: you'll need to pass in --driver-memory not just for local mode, but for any application you're launching with client deploy mode) 2014-08-05 9:24 GMT-07:00 Andrew Or and...@databricks.com: Hi Grzegorz, For local mode you only have one executor, and this executor is your driver, so you need to set the driver's memory instead. *That said, in local mode, by the time you run spark-submit, a JVM has already been launched with the default memory settings, so setting spark.driver.memory in your conf won't actually do anything for you. Instead, you need to run spark-submit as follows bin/spark-submit --driver-memory 2g --class your.class.here app.jar This will start the JVM with 2G instead of the default 512M. -Andrew 2014-08-05 6:43 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com: Hi, I wanted to make simple Spark app running in local mode with 2g spark.executor.memory and 1g for caching. But following code: val conf = new SparkConf() .setMaster(local) .setAppName(app) .set(spark.executor.memory, 2g) .set(spark.storage.memoryFraction, 0.5) val sc = new SparkContext(conf) doesn't work. In spark UI this variables are set properly but memory store is around 0.5 * 512MB (default spark.executor.memory) not 0.5 * 2GB: 14/08/05 15:34:00 INFO MemoryStore: MemoryStore started with capacity 245.8 MB. I have neither spark-defaults.conf nor spark-env.sh in my $SPARK_HOME/conf directory. I use Spark 1.0.0 How can I set this values properly? Thanks, Grzegorz
Re: Gradient Boosted Machines
Hi Daniel, Thanks a lot for your interest. Gradient boosting and AdaBoost algorithms are under active development and should be a part of release 1.2. -Manish On Mon, Jul 14, 2014 at 11:24 AM, Daniel Bendavid daniel.benda...@creditkarma.com wrote: Hi, My company is strongly considering implementing a recommendation engine that is built off of statistical models using Spark. We attended the Spark Summit and were incredibly impressed with the technology and the entire community. Since then, we have been exploring the technology and determining how we could use it for our specific needs. One algorithm that we ideally want to use as part of our project is Gradient Boosted Machines. We are aware that they have not yet been implemented in MLib and would like to submit our request that they be considered for future implementation. Additionally, we would love to see the AdaBoost algorithm implemented in Mlib and Feature Preprocessing implemented in Python (as it already exists for Scala). Otherwise, thank you for taking our feedback and for providing us with this incredible technology. Daniel
Re: Writing to RabbitMQ
You are correct in that I am trying to publish inside of a foreachRDD loop. I am currently refactoring and will try publishing inside the foreachPartition loop. Below is the code showing the way it is currently written, thanks!

object myData {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
    ssc.checkpoint("checkpoint")
    val topicMap = Map("pagehit.data" -> 1)
    val factory = new ConnectionFactory()
    factory.setUsername("officialUsername")
    factory.setPassword("crypticPassword")
    factory.setVirtualHost("/")
    factory.setHost("rabbit-env")
    factory.setPort()
    val connection = factory.newConnection()
    val SQLChannel = connection.createChannel()
    SQLChannel.queueDeclare("SQLQueue", true, false, false, null)
    val Pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1", topicMap).map(_._2)
    // PARSE SOME JSON ETC
    windowStream.foreachRDD(pagehit => {
      val mongoClient = MongoClient("my-mongodb")
      val db = mongoClient("myClient")
      val SQLCollection = db("SQLCalls")
      val callArray = pagehit.map(_._1).collect
      val avg = (callArray.reduceLeft[Long](_+_)) / callArray.length
      val URL = pagehit.take(1).map(_._2)
      SQLCollection += MongoDBObject("URL" -> URL(0).substring(7, URL(0).length - 1), "Avg Page Load Time" -> avg)
      val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
      val byteArray = toBuildJSON.mkString.getBytes()
      SQLChannel.basicPublish("", "SQLQueue", null, byteArray)
    })

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p11445.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
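For the foreachPartition refactor mentioned above, a rough sketch of the usual pattern (host, queue name, and the message built from each record are placeholders): open the RabbitMQ connection inside foreachPartition so it is created on the executor once per partition, instead of being captured from the driver, and close it when the partition is done.

// Sketch: publish from the executors by opening the connection per partition.
import com.rabbitmq.client.ConnectionFactory

windowStream.foreachRDD { pagehit =>
  pagehit.foreachPartition { records =>
    val factory = new ConnectionFactory()
    factory.setHost("rabbit-env")
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("SQLQueue", true, false, false, null)

    records.foreach { case (loadTime, url) =>
      val payload = s"""{"url": "$url", "loadTime": $loadTime}"""
      channel.basicPublish("", "SQLQueue", null, payload.getBytes("UTF-8"))
    }

    channel.close()
    connection.close()
  }
}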
java.lang.IllegalArgumentException: Unable to create serializer com.esotericsoftware.kryo.serializers.FieldSerializer
Hi All, I am trying to move away from spark-shell to spark-submit and have been making some code changes. However, I am now having problem with serialization. It used to work fine before the code update. Not sure what I did wrong. However, here is the code JaccardScore.scala package approxstrmatch class JaccardScore { val mjc = new Jaccard() with Serializable def main(args: Array[String]) { val conf = new SparkConf().setAppName(ApproxStrMatch).set(spark.storage.memoryFraction, 0.0) conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer) conf.set(spark.kryo.registrator, approxstrmatch.MyRegistrator) val sc = new SparkContext(conf) // More code here… score.calculateSortedJaccardScore(srcFile, distFile) sc.stop() } def calculateSortedJaccardScore (sourcerdd: RDD[String], destrdd: RDD[String]) { // Code over here…} MyRegistrator.scala: This is the central place for registering all the classes. package approxstrmatch import com.wcohen.ss.BasicStringWrapper;import com.wcohen.ss.Jaccard;import com.wcohen.ss.Level2MongeElkan;import com.wcohen.ss.Levenstein;import com.wcohen.ss.ScaledLevenstein;import com.wcohen.ss.Jaro;import com.wcohen.ss.JensenShannonDistance; import com.esotericsoftware.kryo.Kryo// import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[approxstrmatch.JaccardScore]) kryo.register(classOf[com.wcohen.ss.BasicStringWrapper]) kryo.register(classOf[com.wcohen.ss.Jaccard]) // Bunch of other registrations here. }} I run it as: spark-submit --class approxstrmatch.JaccardDriver --master local --executor-memory 8G /apps/sameert/software/approxstrmatch/target/scala-2.10/classes/approxstrmatch_2.10-1.0.jar I get the following error message: java.lang.IllegalArgumentException: Unable to create serializer com.esotericsoftware.kryo.serializers.FieldSerializer for class: approxstrmatch.JaccardScoreat com.esotericsoftware.kryo.Kryo.newSerializer(Kryo.java:335)at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:314)at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:49)at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:307)at com.esotericsoftware.kryo.Kryo.register(Kryo.java:351)at approxstrmatch.MyRegistrator.registerClasses(MyRegistrator.scala:18)at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:77)at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:73)at scala.Option.foreach(Option.scala:236)at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:73)at org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:130)at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:92)at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:995)at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:80)at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:66)at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:847)at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:205)at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:661)at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)at 
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:812)at org.apache.spark.broadcast.HttpBroadcast.init(HttpBroadcast.scala:52)at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
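One workaround sometimes used when Kryo cannot build a FieldSerializer for a third-party class is to register the offending classes with Kryo's JavaSerializer fallback, which only needs them to be java.io.Serializable. Whether this resolves this particular error is an assumption; a sketch:

// Sketch: fall back to Java serialization for classes Kryo's FieldSerializer
// cannot handle. Only the classes mentioned in this thread are shown.
package approxstrmatch

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Use Kryo's default FieldSerializer where it works...
    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
    // ...and Java serialization for classes it cannot introspect.
    kryo.register(classOf[approxstrmatch.JaccardScore], new JavaSerializer())
    kryo.register(classOf[com.wcohen.ss.Jaccard], new JavaSerializer())
  }
}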
graph reduceByKey
Hey all! I'm a total beginner with spark / hadoop / graph computation so please excuse my beginner question. I've created a graph, using graphx. Now, for every vertex, I want to get all its second degree neighbors. so if my graph is: v1 -- v2 v1 -- v4 v1 -- v6 I want to get something like: v2 -- v4 v2 -- v6 v4 -- v2 v4 -- v6 v6 -- v2 v6 -- v4 Does anyone have advice on what will be the best way to do that over a graph instance? I attempted to do it using mapReduceTriplets but I need the reduce function to work like reduceByKey, which I wasn't able to do. Thank you. -- Omer
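One way to do this without mapReduceTriplets, sketched under the assumption of an existing Graph instance named graph: collect each vertex's neighbor IDs and emit every pair of distinct neighbors, since those pairs are exactly the second-degree relationships routed through that vertex.

// Sketch: second-degree neighbors as all pairs of distinct neighbors of
// each vertex. Duplicate pairs (reachable via several middle vertices)
// are removed at the end.
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val neighborIds: VertexRDD[Array[VertexId]] =
  graph.collectNeighborIds(EdgeDirection.Either)

val secondDegreePairs: RDD[(VertexId, VertexId)] =
  neighborIds.flatMap { case (_, nbrs) =>
    for {
      a <- nbrs
      b <- nbrs
      if a != b
    } yield (a, b)
  }.distinct()

secondDegreePairs.collect().foreach { case (a, b) => println(s"v$a -- v$b") }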
Problem running Spark shell (1.0.0) on EMR
I'm having a similar problem to: http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/browser I'm trying to follow the tutorial at: When I run: val file = sc.textFile("s3://bigdatademo/sample/wiki/") I get: WARN storage.BlockManager: Putting block broadcast_1 failed java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; I found a few other people raising this issue, but wasn't able to find a solution or an explanation. Has anyone encountered this? Any help or advice will be highly appreciated! Thank you, -- Omer
pyspark inferSchema
Hi All, I have a data set where each record is serialized using JSON, and I'm interested in using SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and lists, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair, inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in order, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I am running the 1.0.1 release. -Brad
Re: pyspark inferSchema
I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated https://issues.apache.org/jira/browse/SPARK-2010 in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in an ordered, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I'm am running the 1.0.1 release. -Brad
Spark Memory Issues
Hi, I'm trying to run a spark application with the executor-memory 3G. but I'm running into the following error: 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at KMeans.scala:123), which has no missing parents 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[5] at map at KMeans.scala:123) 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157] with ID 2 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Tried tweaking executor-memory as well, but same result. It always gets stuck registering the block manager. Are there any other settings that needs to be adjusted. Thanks Sunny
Re: Spark SQL Thrift Server
Thanks Michael. Is there a way to specify off_heap? I.e. Tachyon via the thrift server? Thanks! On Tue, Aug 5, 2014 at 11:06 AM, Michael Armbrust mich...@databricks.com wrote: We are working on an overhaul of the docs before the 1.1 release. In the mean time try: CACHE TABLE tableName. On Tue, Aug 5, 2014 at 9:02 AM, John Omernik j...@omernik.com wrote: I gave things working on my cluster with the sparksql thrift server. (Thank you Yin Huai at Databricks!) That said, I was curious how I can cache a table via my instance here? I tried the shark like create table table_cached as select * from table and that did not create a cached table. cacheTable(table) didn't parse in beeline. Any thoughts? Any pointers to documentation (*crosses fingers)?
Re: How to read from OpenTSDB using PySpark (or Scala Spark)?
Thank you!! Could you give me any sample code for the receiver? I'm still new to Spark and not quite sure how I would do that.
Re: Spark Memory Issues
Are you able to see the job on the WebUI (8080)? If yes, how much memory are you seeing there specifically for this job? [image: Inline image 1] Here you can see i have 11.8Gb RAM on both workers and my app is using 11GB. 1. What are all the memory that you are seeing in your case? 2. Make sure your application is using the same spark URI (as seen in the top left of the webUI) while creating the SparkContext. Thanks Best Regards On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com wrote: Hi, I'm trying to run a spark application with the executor-memory 3G. but I'm running into the following error: 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at KMeans.scala:123), which has no missing parents 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[5] at map at KMeans.scala:123) 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157] with ID 2 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Tried tweaking executor-memory as well, but same result. It always gets stuck registering the block manager. Are there any other settings that needs to be adjusted. Thanks Sunny
Re: Spark shell creating a local SparkContext instead of connecting to Spark Master
You can always start your spark-shell by specifying the master as MASTER=spark://*whatever*:7077 $SPARK_HOME/bin/spark-shell Then it will connect to that *whatever* master. Thanks Best Regards On Tue, Aug 5, 2014 at 8:51 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi Apologies if this is a noob question. I have setup Spark 1.0.1 on EMR using a slightly modified version of script @ s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark-yarn.rb. It seems to be running fine with master logs stating: 14/08/05 14:36:56 INFO Master: I have been elected leader! New state: ALIVE 14/08/05 14:37:21 INFO Master: Registering worker ip-10-0-2-80.ec2.internal:52029 with 2 cores, 6.3 GB RAM The script has also created spark-env.sh under conf which has the following content: export SPARK_MASTER_IP=x.x.x.x export SCALA_HOME=/home/hadoop/.versions/scala-2.10.3 export SPARK_LOCAL_DIRS=/mnt/spark/ export SPARK_CLASSPATH=/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar export SPARK_DAEMON_JAVA_OPTS=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps export SPARK_ASSEMBLY_JAR=/home/hadoop/spark/lib/spark-assembly-1.0.1-hadoop2.4.0.jar However, when I run the spark-shell, sc.isLocal returns true. Also, no matter how many RDDs I cache, the used memory in the master UI (x.x.x.x:7077) shows 0B used. This leads me to believe that the spark-shell isn't connecting to Spark master and has started a local instance of spark. Is there something I am missing in my setup that allows for spark-shell to connect to master? Thanks, Aniket
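A minimal PySpark sketch of the same check: pin the master URL explicitly when the context is created, then confirm what the shell actually connected to. The host below is a placeholder for the spark:// URL shown at the top of the master web UI.

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setMaster("spark://<master-host>:7077")   # placeholder host
            .setAppName("connectivity-check"))         # hypothetical app name
    sc = SparkContext(conf=conf)
    print(sc.master)   # should echo the spark:// URL rather than a local[*] master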
Re: Spark Memory Issues
The only UI I have currently is the Application Master (Cluster mode), with the following executor nodes status:

Executors (3)
- *Memory:* 0.0 B Used (3.7 GB Total)
- *Disk:* 0.0 B Used

Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Shuffle Read | Shuffle Write
1 | add1 | 0 | 0.0 B / 1766.4 MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B
2 | add2 | 0 | 0.0 B / 1766.4 MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B
driver | add3 | 0 | 0.0 B / 294.6 MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B

On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you able to see the job on the WebUI (8080)? If yes, how much memory are you seeing there specifically for this job? [image: Inline image 1] Here you can see i have 11.8Gb RAM on both workers and my app is using 11GB. 1. What are all the memory that you are seeing in your case? 2. Make sure your application is using the same spark URI (as seen in the top left of the webUI) while creating the SparkContext. Thanks Best Regards On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com wrote: Hi, I'm trying to run a spark application with the executor-memory 3G. but I'm running into the following error: 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at KMeans.scala:123), which has no missing parents 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[5] at map at KMeans.scala:123) 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157] with ID 2 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Tried tweaking executor-memory as well, but same result. It always gets stuck registering the block manager. Are there any other settings that needs to be adjusted. Thanks Sunny
Re: pyspark inferSchema
Hi Nick, Thanks for the great response. I actually already investigated jsonRDD and jsonFile, although I did not realize they provide more complete schema inference. I did however have other problems with jsonRDD and jsonFile, but I will now describe in a separate thread with an appropriate subject. I did notice that when I run your example code, I do not receive the exact same output. For example, I see: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType Notice the difference in the schema. Are you running the 1.0.1 release, or a more bleeding-edge version from the repository? best, -Brad On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated https://issues.apache.org/jira/browse/SPARK-2010 in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in an ordered, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I'm am running the 1.0.1 release. -Brad
Re: Spark Memory Issues
For that UI to have some values, your process should do some operation, which is not happening here ( 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory ). Can you open up a spark-shell and try some simple code? ( *val x = sc.parallelize(1 to 100).filter(_ < 100).collect()* ) Just to make sure your cluster setup is proper and is working. Thanks Best Regards On Wed, Aug 6, 2014 at 12:17 AM, Sunny Khatri sunny.k...@gmail.com wrote: The only UI I have currently is the Application Master (Cluster mode), with the following executor nodes status: Executors (3) - *Memory:* 0.0 B Used (3.7 GB Total) - *Disk:* 0.0 B Used Executor IDAddress RDD BlocksMemory Used Disk UsedActive Tasks Failed TasksComplete Tasks Total TasksTask Time Shuffle ReadShuffle Write 1 add1 0 0.0 B / 1766.4 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 2add2 0 0.0 B / 1766.4 MB 0.0 B0 0 00 0 ms0.0 B 0.0 B driver add3 0 0.0 B / 294.6 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you able to see the job on the WebUI (8080)? If yes, how much memory are you seeing there specifically for this job? [image: Inline image 1] Here you can see i have 11.8Gb RAM on both workers and my app is using 11GB. 1. What are all the memory that you are seeing in your case? 2. Make sure your application is using the same spark URI (as seen in the top left of the webUI) while creating the SparkContext. Thanks Best Regards On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com wrote: Hi, I'm trying to run a spark application with the executor-memory 3G. but I'm running into the following error: 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at KMeans.scala:123), which has no missing parents 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[5] at map at KMeans.scala:123) 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157] with ID 2 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Tried tweaking executor-memory as well, but same result. It always gets stuck registering the block manager. Are there any other settings that needs to be adjusted. Thanks Sunny
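If the shell in use is pyspark rather than the Scala spark-shell, an equivalent sanity check (using the sc the shell already provides) would be:

    # same idea as the Scala one-liner above: run a tiny job end to end
    x = sc.parallelize(range(1, 101)).filter(lambda n: n < 100).collect()
    print(len(x))   # a non-empty result means executors are accepting tasks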
Re: pyspark inferSchema
Notice the difference in the schema. Are you running the 1.0.1 release, or a more bleeding-edge version from the repository? Yep, my bad. I’m running off master at commit 184048f80b6fa160c89d5bb47b937a0a89534a95. Nick
trouble with jsonRDD and jsonFile in pyspark
Hi All, I am interested in using jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following Java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? -Brad

srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}', '{"foo":"boom", "baz":[1,2,3]}']))
srdd.printSchema()
root
 |-- baz: ArrayType[IntegerType]
 |-- foo: StringType
srdd.collect()
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-89-ec7e8e8c68c4> in <module>()
----> 1 srdd.collect()
/home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
    581
    582         with _JavaStackTrace(self.context) as st:
--> 583             bytesInJava = self._jrdd.collect().iterator()
    584         return list(self._collect_iterator_through_file(bytesInJava))
    585
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
    535         answer = self.gateway_client.send_command(command)
    536         return_value = get_return_value(answer, self.gateway_client,
--> 537                 self.target_id, self.name)
    538
    539         for temp_arg in temp_args:
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(
Py4JJavaError: An error occurred while calling o1326.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322) net.razorvine.pickle.Pickler.dispatch(Pickler.java:286) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392) net.razorvine.pickle.Pickler.dispatch(Pickler.java:195) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.dump(Pickler.java:95) net.razorvine.pickle.Pickler.dumps(Pickler.java:80) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) scala.collection.Iterator$anon$11.next(Iterator.scala:328) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at
Re: pyspark inferSchema
On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before the upcoming 1.1 release, we did not support nested structures via inferSchema; a nested dictionary will be a MapType. This introduces an inconsistency for dictionaries: the top level would be a struct type (accessed by field name) while nested ones would be MapType (accessed as a map). Deprecating top-level dictionaries is meant to resolve that inconsistency. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert your dict into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, do we need another argument to specify the number of rows to be inferred? Such as: inferSchema(rdd, sample=None) With sample=None it would take the first row; otherwise it would sample the data to figure out the complete schema. Does this work for you? Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in an ordered, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I'm am running the 1.0.1 release. -Brad
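To make the Row conversion Davies describes concrete, here is a minimal sketch. It assumes a 1.1-era build in which pyspark.sql provides Row (as Davies notes), an existing SparkContext sc, and hypothetical sample records; on released versions the schema is still inferred from the first row only.

    from pyspark.sql import Row, SQLContext

    sqlCtx = SQLContext(sc)
    rdd_of_dict = sc.parallelize([{'foo': 'bar', 'baz': [1, 2, 3]},
                                  {'foo': 'boom', 'baz': [4, 5]}])
    # convert each dict to a Row so inferSchema sees named fields instead of a map
    srdd = sqlCtx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
    srdd.printSchema()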
Re: trouble with jsonRDD and jsonFile in pyspark
I believe this is a known issue in 1.0.1 that's fixed in 1.0.2. See: SPARK-2376: Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException https://issues.apache.org/jira/browse/SPARK-2376 On Tue, Aug 5, 2014 at 2:55 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? -Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o1326.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322) net.razorvine.pickle.Pickler.dispatch(Pickler.java:286) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392) net.razorvine.pickle.Pickler.dispatch(Pickler.java:195) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.dump(Pickler.java:95) net.razorvine.pickle.Pickler.dumps(Pickler.java:80) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) scala.collection.Iterator$anon$11.next(Iterator.scala:328) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
Re: pyspark inferSchema
Got it. Thanks! On Tue, Aug 5, 2014 at 11:53 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Notice the difference in the schema. Are you running the 1.0.1 release, or a more bleeding-edge version from the repository? Yep, my bad. I’m running off master at commit 184048f80b6fa160c89d5bb47b937a0a89534a95. Nick
Re: trouble with jsonRDD and jsonFile in pyspark
Is this on 1.0.1? I'd suggest running this on master or the 1.1-RC which should be coming out this week. Pyspark did not have good support for nested data previously. If you still encounter issues using a more recent version, please file a JIRA. Thanks! On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? -Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o1326.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322) net.razorvine.pickle.Pickler.dispatch(Pickler.java:286) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392) net.razorvine.pickle.Pickler.dispatch(Pickler.java:195) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.dump(Pickler.java:95) net.razorvine.pickle.Pickler.dumps(Pickler.java:80) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) scala.collection.Iterator$anon$11.next(Iterator.scala:328) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
Re: java.lang.StackOverflowError
Bump On Tuesday, August 5, 2014, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I am doing some basic preprocessing in pyspark (local mode) as follows:

files = [ input files ]

def read(filename, sc):
    # process file
    return rdd

if __name__ == "__main__":
    conf = SparkConf()
    conf.setMaster('local')
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir(root + "temp/")
    data = sc.parallelize([])
    for i, f in enumerate(files):
        data = data.union(read(f, sc))
        if i == 20:
            data.checkpoint()
            data.count()
        if i == 500: break
    # print data.count()
    # rdd_1 = read(files[0], sc)
    data.saveAsTextFile(root + "output/")

But I see this error:

keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o9564.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.io.Bits.putInt(Bits.java:93)
java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)
Re: trouble with jsonRDD and jsonFile in pyspark
Nick: Thanks for both the original JIRA bug report and the link. Michael: This is on the 1.0.1 release. I'll update to master and follow-up if I have any problems. best, -Brad On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com wrote: Is this on 1.0.1? I'd suggest running this on master or the 1.1-RC which should be coming out this week. Pyspark did not have good support for nested data previously. If you still encounter issues using a more recent version, please file a JIRA. Thanks! On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? -Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o1326.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322) net.razorvine.pickle.Pickler.dispatch(Pickler.java:286) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392) net.razorvine.pickle.Pickler.dispatch(Pickler.java:195) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.dump(Pickler.java:95) net.razorvine.pickle.Pickler.dumps(Pickler.java:80) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385) scala.collection.Iterator$anon$11.next(Iterator.scala:328) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at
Re: pyspark inferSchema
Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before upcoming 1.1 release, we did not support nested structures via inferSchema, the nested dictionary will be MapType. This introduces inconsistance for dictionary that the top level will be structure type (can be accessed by name of field) but others will be MapType (can be accesses as map). So deprecated top level dictionary is try to solve this kind of inconsistance. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert you dic into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, so we need another argument to specify the number of rows to be infered? Such as: inferSchema(rdd, sample=None) with sample=None, it will take the first row, or it will do the sampling to figure out the complete schema. Does this work for you? Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. 
This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in an ordered, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I'm am running the 1.0.1 release. -Brad
Re: java.lang.StackOverflowError
Could you create an re-producable script (and data) to allow us to investigate this? Davies On Tue, Aug 5, 2014 at 1:10 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I am doing some basic preprocessing in pyspark (local mode as follows): files = [ input files] def read(filename,sc): #process file return rdd if __name__ ==__main__: conf = SparkConf() conf.setMaster('local') sc = SparkContext(conf =conf) sc.setCheckpointDir(root+temp/) data = sc.parallelize([]) for i,f in enumerate(files): data = data.union(read(f,sc)) union is an lazy transformation, you could union them at once, rdds = [read(f,sc) for f in files] rdd = sc.union(rdds) if i ==20: data.checkpoint() data.count() if i == 500:break #print data.count() #rdd_1 = read(files[0],sc) data.saveAsTextFile(root+output/) But I see this error: keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) File /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o9564.saveAsTextFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError java.io.Bits.putInt(Bits.java:93) java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
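A short sketch of the restructuring Davies suggests: build the per-file RDDs first and union them in a single call so the lineage stays shallow. read(), files and root are the original poster's own names and are assumed here.

    # union all per-file RDDs at once instead of chaining union() inside the loop
    rdds = [read(f, sc) for f in files]
    data = sc.union(rdds)
    data.saveAsTextFile(root + "output/")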
Re: Spark Memory Issues
Are you sure that you were not running SparkPi in local mode? Thanks Best Regards On Wed, Aug 6, 2014 at 12:43 AM, Sunny Khatri sunny.k...@gmail.com wrote: Well I was able to run the SparkPi, that also does the similar stuff, successfully. On Tue, Aug 5, 2014 at 11:52 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For that UI to have some values, your process should do some operation. Which is not happening here ( 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory ) Can you open up a spark-shell and try some simple code? ( *val x = sc.parallelize(1 to 100).filter(_100).collect()* ) Just to make sure your cluster setup is proper and is working. Thanks Best Regards On Wed, Aug 6, 2014 at 12:17 AM, Sunny Khatri sunny.k...@gmail.com wrote: The only UI I have currently is the Application Master (Cluster mode), with the following executor nodes status: Executors (3) - *Memory:* 0.0 B Used (3.7 GB Total) - *Disk:* 0.0 B Used Executor IDAddress RDD BlocksMemory Used Disk UsedActive Tasks Failed TasksComplete Tasks Total TasksTask Time Shuffle ReadShuffle Write 1 add1 0 0.0 B / 1766.4 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 2add2 0 0.0 B / 1766.4 MB 0.0 B0 0 00 0 ms0.0 B 0.0 B driver add3 0 0.0 B / 294.6 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you able to see the job on the WebUI (8080)? If yes, how much memory are you seeing there specifically for this job? [image: Inline image 1] Here you can see i have 11.8Gb RAM on both workers and my app is using 11GB. 1. What are all the memory that you are seeing in your case? 2. Make sure your application is using the same spark URI (as seen in the top left of the webUI) while creating the SparkContext. Thanks Best Regards On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com wrote: Hi, I'm trying to run a spark application with the executor-memory 3G. but I'm running into the following error: 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at KMeans.scala:123), which has no missing parents 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[5] at map at KMeans.scala:123) 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157] with ID 2 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Tried tweaking executor-memory as well, but same result. It always gets stuck registering the block manager. 
Are there any other settings that needs to be adjusted. Thanks Sunny
Re: Spark Memory Issues
Yeah, ran it on yarn-cluster mode. On Tue, Aug 5, 2014 at 12:17 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you sure that you were not running SparkPi in local mode? Thanks Best Regards On Wed, Aug 6, 2014 at 12:43 AM, Sunny Khatri sunny.k...@gmail.com wrote: Well I was able to run the SparkPi, that also does the similar stuff, successfully. On Tue, Aug 5, 2014 at 11:52 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For that UI to have some values, your process should do some operation. Which is not happening here ( 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory ) Can you open up a spark-shell and try some simple code? ( *val x = sc.parallelize(1 to 100).filter(_100).collect()* ) Just to make sure your cluster setup is proper and is working. Thanks Best Regards On Wed, Aug 6, 2014 at 12:17 AM, Sunny Khatri sunny.k...@gmail.com wrote: The only UI I have currently is the Application Master (Cluster mode), with the following executor nodes status: Executors (3) - *Memory:* 0.0 B Used (3.7 GB Total) - *Disk:* 0.0 B Used Executor IDAddress RDD BlocksMemory Used Disk UsedActive Tasks Failed TasksComplete Tasks Total TasksTask Time Shuffle ReadShuffle Write 1 add1 0 0.0 B / 1766.4 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 2add2 0 0.0 B / 1766.4 MB 0.0 B0 0 00 0 ms0.0 B 0.0 B driver add3 0 0.0 B / 294.6 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you able to see the job on the WebUI (8080)? If yes, how much memory are you seeing there specifically for this job? [image: Inline image 1] Here you can see i have 11.8Gb RAM on both workers and my app is using 11GB. 1. What are all the memory that you are seeing in your case? 2. Make sure your application is using the same spark URI (as seen in the top left of the webUI) while creating the SparkContext. Thanks Best Regards On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com wrote: Hi, I'm trying to run a spark application with the executor-memory 3G. 
but I'm running into the following error: 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at KMeans.scala:123), which has no missing parents 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[5] at map at KMeans.scala:123) 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157] with ID 2 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Tried tweaking executor-memory as well, but same result. It always gets stuck registering the block manager. Are there any other settings that needs to be adjusted. Thanks Sunny
Re: pyspark inferSchema
This sample argument of inferSchema is still no in master, if will try to add it if it make sense. On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before upcoming 1.1 release, we did not support nested structures via inferSchema, the nested dictionary will be MapType. This introduces inconsistance for dictionary that the top level will be structure type (can be accessed by name of field) but others will be MapType (can be accesses as map). So deprecated top level dictionary is try to solve this kind of inconsistance. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert you dic into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, so we need another argument to specify the number of rows to be infered? Such as: inferSchema(rdd, sample=None) with sample=None, it will take the first row, or it will do the sampling to figure out the complete schema. Does this work for you? Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. 
This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in an ordered, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I'm am running the 1.0.1 release. -Brad - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pyspark inferSchema
Assuming updating to master fixes the bug I was experiencing with jsonRDD and jsonFile, then pushing sample to master will probably not be necessary. We believe that the link below was the bug I experienced, and I've been told it is fixed in master. https://issues.apache.org/jira/browse/SPARK-2376 best, -brad On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote: This sample argument of inferSchema is still no in master, if will try to add it if it make sense. On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before upcoming 1.1 release, we did not support nested structures via inferSchema, the nested dictionary will be MapType. This introduces inconsistance for dictionary that the top level will be structure type (can be accessed by name of field) but others will be MapType (can be accesses as map). So deprecated top level dictionary is try to solve this kind of inconsistance. The Row class in pyspark.sql has a similar interface to dict, so you can easily convert you dic into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, so we need another argument to specify the number of rows to be infered? Such as: inferSchema(rdd, sample=None) with sample=None, it will take the first row, or it will do the sampling to figure out the complete schema. Does this work for you? 
Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested in using SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and lists, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair, inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However, I don't believe that items in an RDD are guaranteed to remain in order, so this approach seems somewhat brittle. Does anybody know a robust solution to this problem in PySpark? I am running the 1.0.1 release. -Brad
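For readers skimming this thread, here is a minimal, hedged sketch of the Row-based conversion Davies suggests above, written against the 1.x-era SQLContext.inferSchema API discussed in this thread and assuming an already-running SparkContext named sc. Note that it does not solve the empty-collection problem by itself: inferSchema still peeks at the first Row, which is exactly why the sample argument (and the JSON round trip discussed below) comes up.

    from pyspark.sql import SQLContext, Row

    sqlCtx = SQLContext(sc)  # assumes an existing SparkContext `sc`

    # An RDD of plain Python dictionaries.
    rdd_of_dict = sc.parallelize([
        {'foo': 'bar', 'baz': [1, 2, 3]},
        {'foo': 'boom', 'baz': [4, 5, 6]},
    ])

    # Convert each dict to a Row so the top level becomes a struct
    # instead of a (deprecated) dictionary, then infer the schema.
    srdd = sqlCtx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
    srdd.printSchema()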
Re: Configuration setup and Connection refused
Hi, Anyone? Any input would be much appreciated Thanks, Amin On 5 Aug 2014 00:31, Al Amin alamin.is...@gmail.com wrote: Hi all, Any help would be much appreciated. Thanks, Al On Mon, Aug 4, 2014 at 7:09 PM, Al Amin alamin.is...@gmail.com wrote: Hi all, I have set up 2 nodes (master and slave1) in standalone mode. Tried running the SparkPi example and it's working fine. However, when I move on to wordcount it gives me the error below: 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=311387750 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.1 KB, free 296.9 MB) 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 0 time(s). 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 1 time(s). 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 2 time(s). 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 3 time(s). 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 4 time(s). 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 5 time(s). 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 6 time(s). 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 7 time(s). 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 8 time(s). 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 9 time(s). Exception in thread main java.lang.RuntimeException: java.net.ConnectException: Call to master/10.0.1.27:9000 failed on connection exception: java.net.ConnectException: Connection refused 1) How do I fix this issue? I have configured hostname --fqdn accordingly. 2) I can see in my logs that my master/worker deploy configuration is -Xms512m -Xmx512m. Is there any way that I can increase it? Or is 512MB just fine? AFAIK, Spark requires a lot of memory. 3) I have a Hadoop cluster and it's working. Could anyone point me to how to integrate YARN with Spark? Any good tutorial would be very useful. Thanks, Al -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
SELECT DISTINCT generates random results?
Hi all, I use “SELECT DISTINCT” to query data saved in Hive. It seems that this statement cannot understand the table structure and just outputs the data in other fields. Has anyone met a similar problem before? Best, -- Nan Zhu
Re: pyspark inferSchema
Yes, 2376 has been fixed in master. Can you give it a try? Also, for inferSchema, because Python is dynamically typed, I agree with Davies to provide a way to scan a subset (or entire) of the dataset to figure out the proper schema. We will take a look it. Thanks, Yin On Tue, Aug 5, 2014 at 12:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Assuming updating to master fixes the bug I was experiencing with jsonRDD and jsonFile, then pushing sample to master will probably not be necessary. We believe that the link below was the bug I experienced, and I've been told it is fixed in master. https://issues.apache.org/jira/browse/SPARK-2376 best, -brad On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote: This sample argument of inferSchema is still no in master, if will try to add it if it make sense. On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}', '{foo:boom, baz:[1,2,3]}'])) a.printSchema() root |-- baz: array (nullable = true) ||-- element: integer (containsNull = false) |-- foo: string (nullable = true) It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you’ve already deserialized the JSON to do some pre-processing or filtering. You’d commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I’m not sure what the intention behind this move is, but as a user I’d like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before upcoming 1.1 release, we did not support nested structures via inferSchema, the nested dictionary will be MapType. This introduces inconsistance for dictionary that the top level will be structure type (can be accessed by name of field) but others will be MapType (can be accesses as map). So deprecated top level dictionary is try to solve this kind of inconsistance. 
The Row class in pyspark.sql has a similar interface to dict, so you can easily convert you dic into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) In order to get the correct schema, so we need another argument to specify the number of rows to be infered? Such as: inferSchema(rdd, sample=None) with sample=None, it will take the first row, or it will do the sampling to figure out the complete schema. Does this work for you? Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested to use SchemaRDDs to work with the data. Unfortunately I've hit a snag since some fields in the data are maps and list, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]}, {'foo':'boom', 'baz':[1,2,3]}])) Works fine: srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]}, {'foo':'boom', 'baz':[]}])) To be fair inferSchema says it peeks at the first row, so a possible work-around would be to make sure the type of any collection can be determined using the first instance. However,
Re: SELECT DISTINCT generates random results?
Never mind, it was a problem brought on by ill-formatted raw data. -- Nan Zhu On Tuesday, August 5, 2014 at 3:42 PM, Nan Zhu wrote: Hi all, I use “SELECT DISTINCT” to query data saved in Hive. It seems that this statement cannot understand the table structure and just outputs the data in other fields. Has anyone met a similar problem before? Best, -- Nan Zhu
spark-submit symlink
spark-submit doesn't handle being a symlink currently: $ spark-submit /usr/local/bin/spark-submit: line 44: /usr/local/bin/spark-class: No such file or directory /usr/local/bin/spark-submit: line 44: exec: /usr/local/bin/spark-class: cannot execute: No such file or directory To fix it, I changed the line that sets SPARK_HOME to: export SPARK_HOME=$(script=`readlink $0`;cd `dirname $script`/..; pwd) I have seen people resolve multiple symlinks in a loop-like structure, but that's beyond my bash abilities.
spark-ec2 script with VPC
I'm trying to use the spark-ec2 script to launch a Spark cluster within a virtual private cloud (VPC), but I don't see an option for that. Is there a way to specify the VPC while using the spark-ec2 script? I found an old spark-incubator mailing list comment which claims to have added that support, but none of the links from the thread are still active. http://mail-archives.apache.org/mod_mbox/incubator-spark-dev/201402.mbox/%3cgit-pr-620-incubator-sp...@git.apache.org%3E The exact error is an ssh exit code of 255 while generating the cluster's SSH key on the master. The error suggests checking the identity file and key pairs ("Please check that you have provided the correct --identity-file and --key-pair parameters and try again."), but I confirmed on another instance that these are correct. Any help?
[PySpark] [SQL] Going from RDD[dict] to SchemaRDD
Forking from this thread http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-inferSchema-tc11449.html. On Tue, Aug 5, 2014 at 3:01 PM, Davies Liu dav...@databricks.com wrote: Before the upcoming 1.1 release, we did not support nested structures via inferSchema; a nested dictionary will be a MapType. This introduces an inconsistency for dictionaries: the top level will be a struct type (which can be accessed by field name) but nested ones will be MapType (which can be accessed as a map). When you mention field access here, do you mean via SQL? Could you provide a brief code example to illustrate your point? The Row class in pyspark.sql has a similar interface to dict, so you can easily convert your dict into a Row: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))) I just tried that out and it seems to work well. In order to get the correct schema, do we need another argument to specify the number of rows to be inferred? Such as: ... Does this work for you? Maybe; I’m not sure just yet. Basically, I’m looking for something functionally equivalent to this: sqlContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x))) In other words, given an RDD of JSON-serializable Python dictionaries, I want to be able to infer a schema that is guaranteed to cover the entire data set. With semi-structured data, it is rarely useful to infer schema by inspecting just one element. Does that sound like something we want to support? Nick
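As a point of reference, the JSON round trip Nick sketches above can be spelled out as follows. This is a rough illustration only; it assumes an existing SparkContext sc and the 1.x-era jsonRDD API used elsewhere in this thread, and it pays the serialization cost Nick mentions in exchange for a schema that covers every record.

    import json
    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`

    rdd_of_dict = sc.parallelize([
        {'foo': 'bar', 'baz': []},
        {'foo': 'boom', 'baz': [1, 2, 3]},
    ])

    # Serialize each dict back to JSON text so jsonRDD can scan the whole
    # data set; fields that are empty in some records still get a type
    # that covers all records.
    srdd = sqlContext.jsonRDD(rdd_of_dict.map(lambda d: json.dumps(d)))
    srdd.printSchema()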
issue with spark and bson input
Hello, I have an issue when trying to use a BSON file as Spark input. I use mongo-hadoop-connector 1.3.0 and Spark 1.0.0: val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val config = new Configuration() config.set("mongo.job.input.format", "com.mongodb.hadoop.BSONFileInputFormat") config.set("mapred.input.dir", "file:///root/jobs/dump/input.bson") config.set("mongo.output.uri", "mongodb://" + args(0) + "/" + args(2)) val mongoRDD = sc.newAPIHadoopFile("file:///root/jobs/dump/input.bson", classOf[BSONFileInputFormat], classOf[Object], classOf[BSONObject], config) But on the last line I receive an error: inferred type arguments [Object,org.bson.BSONObject,com.mongodb.hadoop.BSONFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]] This is very strange, because BSONFileInputFormat extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat: https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/BSONFileInputFormat.java How can I solve this issue? I have no problems with com.mongodb.hadoop.MongoInputFormat when using a MongoDB collection as input. And moreover there seems to be no problem with the Java API: https://github.com/crcsmnky/mongodb-spark-demo/blob/master/src/main/java/com/mongodb/spark/demo/Recommender.java I'm not a professional Java/Scala developer, please help. -- Regards Dmitriy Selivanov
Re: Include permalinks in mail footer
Looks like this feature has been turned off. Are these changes intentional? Or perhaps I'm not understanding how it's supposed to work. Nick On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Looks like this has now been turned on for new threads? On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp wrote: On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I often find myself wanting to reference one thread from another, or from a JIRA issue. Right now I have to google the thread subject and find the link that way. +1
Re: [PySpark] [SQL] Going from RDD[dict] to SchemaRDD
Maybe; I’m not sure just yet. Basically, I’m looking for something functionally equivalent to this: sqlContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x))) In other words, given an RDD of JSON-serializable Python dictionaries, I want to be able to infer a schema that is guaranteed to cover the entire data set. With semi-structured data, it is rarely useful to infer schema by inspecting just one element. Does that sound like something we want to support? Yes, I think that would be good. I'd open a JIRA.
Re: Understanding RDD.GroupBy OutOfMemory Exceptions
Patrick Wendell wrote In the latest version of Spark we've added documentation to make this distinction more clear to users: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390 That is a very good addition to the documentation. Nice and clear about the dangers of groupBy. Patrick Wendell wrote Currently groupBy requires that all of the values for one key can fit in memory. Is that really true? Will partitions not spill to disk, hence the recommendation in the documentation to up the parallelism of groupBy et al? A better question might be: How exactly does partitioning affect groupBy with regard to memory consumption? What will **have** to fit in memory, and what may be spilled to disk, if running out of memory? And if it really is true that Spark requires all groups' values to fit in memory, how do I do an on-disk grouping of results, similar to what I'd do in a Hadoop job by using a mapper emitting (groupId, value) key-value pairs and having an identity reducer write the results to disk? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p11487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Include permalinks in mail footer
Emails sent from Nabble have it, while others don't. Unfortunately I haven't received a reply from ASF infra on this yet. Matei On August 5, 2014 at 2:04:10 PM, Nicholas Chammas (nicholas.cham...@gmail.com) wrote: Looks like this feature has been turned off. Are these changes intentional? Or perhaps I'm not understanding how it's supposed to work. Nick On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Looks like this has now been turned on for new threads? On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp wrote: On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I often find myself wanting to reference one thread from another, or from a JIRA issue. Right now I have to google the thread subject and find the link that way. +1
Re: Configuration setup and Connection refused
Spark is not able to communicate with your hadoop hdfs. Is your hdfs running, if so can you try to explicitly connect to it with hadoop command line tools giving full hostname port. Or test port using telnet localhost 9000 In all likelyhood either your hdfs is down, bound to wrong port/ip that spark cannot access Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com wrote: Hi, Anyone? Any input would be much appreciated Thanks, Amin On 5 Aug 2014 00:31, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote: Hi all, Any help would be much appreciated. Thanks, Al On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote: Hi all, I have setup 2 nodes (master and slave1) on stand alone mode. Tried running SparkPi example and its working fine. However when I move on to wordcount its giving me below error: 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=311387750 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.1 KB, free 296.9 MB) 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 0 time(s). 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 1 time(s). 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 2 time(s). 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 3 time(s). 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 4 time(s). 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 5 time(s). 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 6 time(s). 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 7 time(s). 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 8 time(s). 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 9 time(s). Exception in thread main java.lang.RuntimeException: java.net.ConnectException: Call to master/10.0.1.27:9000 failed on connection exception: java.net.ConnectException: Connection refused 1) how to fix this issue? I have configure hostname --fqdn accordingly. 2) I could see that in my logs that my master/worker deploy configuration is -Xms512m -Xmx512m. Is there any way that I can increase it? Or 512mb is just fine? AFAIK, spark require huge memory. 3) I have a hadoop cluster and its working. Could anyone point me how to integrate Yarn with Spark? Any good tutorial would be very useful Thanks, Al -- View this message in context: Re: Configuration setup and Connection refused http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Include permalinks in mail footer
Oh actually sorry, it looks like infra has looked at it but they can't add permalinks. They can only add here's how to unsubscribe footers. My bad, I just didn't catch the email update from them. Matei On August 5, 2014 at 2:39:45 PM, Matei Zaharia (matei.zaha...@gmail.com) wrote: Emails sent from Nabble have it, while others don't. Unfortunately I haven't received a reply from ASF infra on this yet. Matei On August 5, 2014 at 2:04:10 PM, Nicholas Chammas (nicholas.cham...@gmail.com) wrote: Looks like this feature has been turned off. Are these changes intentional? Or perhaps I'm not understanding how it's supposed to work. Nick On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Looks like this has now been turned on for new threads? On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp wrote: On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I often find myself wanting to reference one thread from another, or from a JIRA issue. Right now I have to google the thread subject and find the link that way. +1
Re: Include permalinks in mail footer
Ah, the user-specific to: address? I see. OK, thanks for looking into it! On Tue, Aug 5, 2014 at 5:40 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Oh actually sorry, it looks like infra has looked at it but they can't add permalinks. They can only add here's how to unsubscribe footers. My bad, I just didn't catch the email update from them. Matei On August 5, 2014 at 2:39:45 PM, Matei Zaharia (matei.zaha...@gmail.com) wrote: Emails sent from Nabble have it, while others don't. Unfortunately I haven't received a reply from ASF infra on this yet. Matei On August 5, 2014 at 2:04:10 PM, Nicholas Chammas ( nicholas.cham...@gmail.com) wrote: Looks like this feature has been turned off. Are these changes intentional? Or perhaps I'm not understanding how it's supposed to work. Nick On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Looks like this has now been turned on for new threads? On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp wrote: On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I often find myself wanting to reference one thread from another, or from a JIRA issue. Right now I have to google the thread subject and find the link that way. +1
Re: Configuration setup and Connection refused
Then dont specify hdfs when you read file. Also the community is quite active in response in general, just be a little patient. Also if possible look at spark training as part of spark summit 2014 vids and/or amplabs training on spark website. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Aug 6, 2014 at 1:58 AM, Al Amin alamin.is...@gmail.com wrote: Finally, someone reply. thank you, sir! But I am planning to deploy stand alone mode of Spark. I thought there is no need to use hdfs? And my spark is not being built with hadoop/yarn config. regards, Amin On Tue, Aug 5, 2014 at 10:39 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Spark is not able to communicate with your hadoop hdfs. Is your hdfs running, if so can you try to explicitly connect to it with hadoop command line tools giving full hostname port. Or test port using telnet localhost 9000 In all likelyhood either your hdfs is down, bound to wrong port/ip that spark cannot access Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com wrote: Hi, Anyone? Any input would be much appreciated Thanks, Amin On 5 Aug 2014 00:31, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote: Hi all, Any help would be much appreciated. Thanks, Al On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote: Hi all, I have setup 2 nodes (master and slave1) on stand alone mode. Tried running SparkPi example and its working fine. However when I move on to wordcount its giving me below error: 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=311387750 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.1 KB, free 296.9 MB) 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 0 time(s). 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 1 time(s). 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 2 time(s). 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 3 time(s). 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 4 time(s). 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 5 time(s). 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 6 time(s). 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 7 time(s). 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 8 time(s). 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 9 time(s). Exception in thread main java.lang.RuntimeException: java.net.ConnectException: Call to master/10.0.1.27:9000 failed on connection exception: java.net.ConnectException: Connection refused 1) how to fix this issue? I have configure hostname --fqdn accordingly. 2) I could see that in my logs that my master/worker deploy configuration is -Xms512m -Xmx512m. Is there any way that I can increase it? Or 512mb is just fine? AFAIK, spark require huge memory. 
3) I have a hadoop cluster and its working. Could anyone point me how to integrate Yarn with Spark? Any good tutorial would be very useful Thanks, Al -- View this message in context: Re: Configuration setup and Connection refused http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
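Following Mayur's point about not pointing at hdfs:// in a plain standalone setup, something along these lines should work for a quick test. This is a hedged PySpark sketch (the original wordcount may well be Scala or Java); the file path, master host, and hdfs:// example are placeholders.

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("wordcount")
    sc = SparkContext(conf=conf)

    # Read from the local filesystem instead of HDFS. With a multi-node
    # standalone cluster the file must exist at this path on every worker
    # (or on a shared mount).
    lines = sc.textFile("file:///tmp/input.txt")

    # If HDFS really is intended, spell out the namenode host and port and
    # first confirm something is listening there (e.g. telnet master 9000).
    # lines = sc.textFile("hdfs://master:9000/user/al/input.txt")

    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    print(counts.take(10))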
Re: Configuration setup and Connection refused
Hi Amin, This happens usually because your application can't talk to HDFS, and thinks that the name node is waiting on port 9000 when it's not. Are you using the EC2 scripts for standalone Spark? You can verify whether or not the port is correct by checking the configurations with /root/ephemeral-hdfs/conf. (Are you running HdfsWordCount by any chance?) As Mayur mentioned, a good way to see whether or not there is any service listening on port 9000 is telnet. Andrew 2014-08-05 15:01 GMT-07:00 Mayur Rustagi mayur.rust...@gmail.com: Then dont specify hdfs when you read file. Also the community is quite active in response in general, just be a little patient. Also if possible look at spark training as part of spark summit 2014 vids and/or amplabs training on spark website. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Aug 6, 2014 at 1:58 AM, Al Amin alamin.is...@gmail.com wrote: Finally, someone reply. thank you, sir! But I am planning to deploy stand alone mode of Spark. I thought there is no need to use hdfs? And my spark is not being built with hadoop/yarn config. regards, Amin On Tue, Aug 5, 2014 at 10:39 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Spark is not able to communicate with your hadoop hdfs. Is your hdfs running, if so can you try to explicitly connect to it with hadoop command line tools giving full hostname port. Or test port using telnet localhost 9000 In all likelyhood either your hdfs is down, bound to wrong port/ip that spark cannot access Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com wrote: Hi, Anyone? Any input would be much appreciated Thanks, Amin On 5 Aug 2014 00:31, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote: Hi all, Any help would be much appreciated. Thanks, Al On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote: Hi all, I have setup 2 nodes (master and slave1) on stand alone mode. Tried running SparkPi example and its working fine. However when I move on to wordcount its giving me below error: 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=311387750 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.1 KB, free 296.9 MB) 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 0 time(s). 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 1 time(s). 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 2 time(s). 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 3 time(s). 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 4 time(s). 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 5 time(s). 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 6 time(s). 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 7 time(s). 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 8 time(s). 
14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 9 time(s). Exception in thread main java.lang.RuntimeException: java.net.ConnectException: Call to master/10.0.1.27:9000 failed on connection exception: java.net.ConnectException: Connection refused 1) how to fix this issue? I have configure hostname --fqdn accordingly. 2) I could see that in my logs that my master/worker deploy configuration is -Xms512m -Xmx512m. Is there any way that I can increase it? Or 512mb is just fine? AFAIK, spark require huge memory. 3) I have a hadoop cluster and its working. Could anyone point me how to integrate Yarn with Spark? Any good tutorial would be very useful Thanks, Al -- View this message in context: Re: Configuration setup and Connection refused http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Understanding RDD.GroupBy OutOfMemory Exceptions
Hi Jens, Within a partition things will spill - so the current documentation is correct. This spilling can only occur *across keys* at the moment. Spilling cannot occur within a key at present. This is discussed in the video here: https://www.youtube.com/watch?v=dmL0N3qfSc8index=3list=PL-x35fyliRwj7qNxXLgMRJaOk7o9inHBZ Spilling within one key for GroupBy's is likely to end up in the next release of Spark, Spark 1.2. In most cases we see when users hit this, they are actually trying to just do aggregations which would be more efficiently implemented without the groupBy operator. If the goal is literally to just write out to disk all the values associated with each group, and the values associated with a single group are larger than fit in memory, this cannot be accomplished right now with the groupBy operator. The best way to work around this depends a bit on what you are trying to do with the data down stream. Typically approaches involve sub-dividing any very large groups, for instance, appending a hashed value in a small range (1-10) to large keys. Then your downstream code has to deal with aggregating partial values for each group. If your goal is just to lay each group out sequentially on disk on one big file, you can call `sortByKey` with a hashed suffix as well. The sort functions are externalized in Spark 1.1 (which is in pre-release). - Patrick On Tue, Aug 5, 2014 at 2:39 PM, Jens Kristian Geyti sp...@jkg.dk wrote: Patrick Wendell wrote In the latest version of Spark we've added documentation to make this distinction more clear to users: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390 That is a very good addition to the documentation. Nice and clear about the dangers of groupBy. Patrick Wendell wrote Currently groupBy requires that all of the values for one key can fit in memory. Is that really true? Will partitions not spill to disk, hence the recommendation in the documentation to up the parallelism of groupBy et al? A better question might be: How exactly does partitioning affect groupBy with regards to memory consumption. What will **have** to fit in memory, and what may be spilled to disk, if running out of memory? And if it really is true, that Spark requires all groups' values to fit in memory, how do I do a on-disk grouping of results, similar to what I'd to in a Hadoop job by using a mapper emitting (groupId, value) key-value pairs, and having an entity reducer writing results to disk? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p11487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
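To make Patrick's salting suggestion concrete, here is a rough PySpark sketch, assuming an existing SparkContext sc. The key names, salt range, and the final sum are made-up illustrations; the same idea applies to any aggregation whose partial per-sub-group results can be merged downstream.

    import random

    NUM_SALTS = 10  # small range appended to the keys of very large groups

    # Hypothetical RDD of (group_id, value) pairs where one group is huge.
    pairs = sc.parallelize([("big", i) for i in range(100000)] +
                           [("small", i) for i in range(10)])

    # 1) Append a random salt so one huge group becomes NUM_SALTS smaller ones.
    salted = pairs.map(lambda kv: ((kv[0], random.randint(0, NUM_SALTS - 1)), kv[1]))

    # 2) Group per (key, salt); each sub-group is now much smaller.
    sub_groups = salted.groupByKey()

    # 3) Downstream code merges the partial results per original key,
    #    here a simple sum of per-sub-group sums.
    partial_sums = sub_groups.map(lambda kv: (kv[0][0], sum(kv[1])))
    totals = partial_sums.reduceByKey(lambda a, b: a + b)
    print(totals.collect())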
[Streaming] Akka-based receiver with messages defined in uploaded jar
Greetings, I modified ActorWordCount example a little and it uses simple case class as the message for Streaming instead of the primitive string. I also modified launch code to not use run-example script, but set spark master in the code and attach the jar (setJars(...)) with all the classes including new case class. It runs fine in the local[*] mode but fails with ClassNotFoundException in standalone cluster (stacktrace follows). I assume it's the classloader problems and akka remoting just doesn't know about the classes coming to the executor from attached jar. Am I right? I guess I could pass primitive values around and do my own (de)serialization but maybe there is a better way? What's the correct way to build custom akka-based receiver with usage of non-primitive messages? Here is the log excerpt with stacktrace: 14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407211181800 14/08/04 20:59:41 ERROR Remoting: com.genesys.gpe.analytics.akka.messages.SubscribeAck java.lang.ClassNotFoundException: com.genesys.gpe.analytics.akka.messages.SubscribeAck at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623) at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
WBR, Anton
Re: [PySpark] [SQL] Going from RDD[dict] to SchemaRDD
SPARK-2870: Thorough schema inference directly on RDDs of Python dictionaries https://issues.apache.org/jira/browse/SPARK-2870 On Tue, Aug 5, 2014 at 5:07 PM, Michael Armbrust mich...@databricks.com wrote: Maybe; I’m not sure just yet. Basically, I’m looking for something functionally equivalent to this: sqlContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x))) In other words, given an RDD of JSON-serializable Python dictionaries, I want to be able to infer a schema that is guaranteed to cover the entire data set. With semi-structured data, it is rarely useful to infer schema by inspecting just one element. Does that sound like something we want to support? Yes, I think that would be good. I'd open a JIRA.
Re: trouble with jsonRDD and jsonFile in pyspark
Hi All, I've built and deployed the current head of branch-1.0, but it seems to have only partly fixed the bug. This code now runs as expected with the indicated output: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[1,2,3]}', '{foo:[4,5,6]}'])) srdd.printSchema() root |-- foo: ArrayType[IntegerType] srdd.collect() [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}] This code still crashes: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])) srdd.printSchema() root |-- foo: ArrayType[ArrayType(IntegerType)] srdd.collect() Py4JJavaError: An error occurred while calling o63.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:29 failed 4 times, most recent failure: Exception failure in TID 67 on host kunitz.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I may be able to see if this is fixed in master, but since it's not fixed in 1.0.3 it seems unlikely to be fixed in master either. I previously tried master as well, but ran into a build problem that did not occur with the 1.0 branch. Can anybody else verify that the second example still crashes (and is meant to work)? If so, would it be best to modify JIRA-2376 or start a new bug? https://issues.apache.org/jira/browse/SPARK-2376 best, -Brad On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Nick: Thanks for both the original JIRA bug report and the link. Michael: This is on the 1.0.1 release. I'll update to master and follow-up if I have any problems. best, -Brad On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com wrote: Is this on 1.0.1? I'd suggest running this on master or the 1.1-RC which should be coming out this week. Pyspark did not have good support for nested data previously. If you still encounter issues using a more recent version, please file a JIRA. Thanks! On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? 
-Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603) net.razorvine.pickle.Pickler.dispatch(Pickler.java:299) net.razorvine.pickle.Pickler.save(Pickler.java:125) net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
python dependencies loaded but not on PYTHONPATH
Hey, I just tried to submit a task to my Spark cluster using the following command: ./spark/bin/spark-submit --py-files file:///root/abc.zip --master spark://xxx.xxx.xxx.xxx:7077 test.py It seems like the dependency I’ve added gets loaded: 14/08/05 23:07:00 INFO spark.SparkContext: Added file file:///root/abc.zip at http://xxx.xxx.xxx.xxx:40346/files/abc.zip with timestamp 1407280020217 However, my Python script can’t find the module in this zip file. I already verified that this zip file is not corrupt by installing it with “pip install abc.zip” (works fine). Any ideas how to get the contents of the archive onto the PYTHONPATH on my master and slaves? Traceback (most recent call last): File /root/test.py, line 7, in <module> import abc ImportError: No module named abc Maybe it’s just the master complaining, since it only transfers the archive to the slaves (and adds it to their PYTHONPATH)? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
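One commonly suggested workaround, sketched below with no guarantee it fixes this particular setup: ship the archive with SparkContext.addPyFile so the executors get it on their PYTHONPATH, and add it to sys.path on the driver explicitly before importing. The module name abc_pkg is a stand-in for whatever package actually lives inside the zip.

    import sys
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("py-deps-test")
    sc = SparkContext(conf=conf)

    # Distribute the archive to the executors and add it to their PYTHONPATH.
    sc.addPyFile("/root/abc.zip")

    # The driver process may still need the archive on its own sys.path.
    sys.path.insert(0, "/root/abc.zip")

    def use_pkg(x):
        # Import on the worker side, where addPyFile has placed the zip.
        import abc_pkg  # hypothetical module inside abc.zip
        return abc_pkg.__name__

    print(sc.parallelize(range(4)).map(use_pkg).collect())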
Re: trouble with jsonRDD and jsonFile in pyspark
This looks to be fixed in master: from pyspark.sql import SQLContext sqlContext = SQLContext(sc) sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}']) ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])) MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])).printSchema() root |-- foo: array (nullable = true) ||-- element: array (containsNull = false) |||-- element: integer (containsNull = false) Nick On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I've built and deployed the current head of branch-1.0, but it seems to have only partly fixed the bug. This code now runs as expected with the indicated output: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[1,2,3]}', '{foo:[4,5,6]}'])) srdd.printSchema() root |-- foo: ArrayType[IntegerType] srdd.collect() [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}] This code still crashes: srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], [4,5,6]]}'])) srdd.printSchema() root |-- foo: ArrayType[ArrayType(IntegerType)] srdd.collect() Py4JJavaError: An error occurred while calling o63.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:29 failed 4 times, most recent failure: Exception failure in TID 67 on host kunitz.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I may be able to see if this is fixed in master, but since it's not fixed in 1.0.3 it seems unlikely to be fixed in master either. I previously tried master as well, but ran into a build problem that did not occur with the 1.0 branch. Can anybody else verify that the second example still crashes (and is meant to work)? If so, would it be best to modify JIRA-2376 or start a new bug? https://issues.apache.org/jira/browse/SPARK-2376 best, -Brad On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Nick: Thanks for both the original JIRA bug report and the link. Michael: This is on the 1.0.1 release. I'll update to master and follow-up if I have any problems. best, -Brad On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com wrote: Is this on 1.0.1? I'd suggest running this on master or the 1.1-RC which should be coming out this week. Pyspark did not have good support for nested data previously. If you still encounter issues using a more recent version, please file a JIRA. Thanks! On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of some JSON data I have, but I've run into some instability involving the following java exception: An error occurred while calling o1326.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments I've pasted code which produces the error as well as the full traceback below. Note that I don't have any problem when I parse the JSON myself and use inferSchema. Is anybody able to reproduce this bug? 
-Brad srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}', '{foo:boom, baz:[1,2,3]}'])) srdd.printSchema() root |-- baz: ArrayType[IntegerType] |-- foo: StringType srdd.collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-89-ec7e8e8c68c4 in module() 1 srdd.collect() /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self) 581 582 with _JavaStackTrace(self.context) as st: -- 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava)) 585 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, -- 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300
Re: Using sbt-pack with Spark 1.0.0
Are there any workarounds for this? Seems to be a dead end so far. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-sbt-pack-with-Spark-1-0-0-tp6649p11502.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Streaming] Akka-based receiver with messages defined in uploaded jar
Can you show us the modified version. The reason could very well be what you suggest, but I want to understand what conditions lead to this. TD On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I modified ActorWordCount example a little and it uses simple case class as the message for Streaming instead of the primitive string. I also modified launch code to not use run-example script, but set spark master in the code and attach the jar (setJars(…)) with all the classes including new case class. It runs fine in the local[*] mode but fails with ClassNotFoundException in standalone cluster (stacktrace follows). I assume it’s the classloader problems and akka remoting just doesn’t know about the classes coming to the executor from attached jar. Am I right? I guess I could pass primitive values around and do my own (de)serialization but maybe there is a better way? What’s the correct way to build custom akka-based receiver with usage of non-primitive messages? Here is the log excerpt with stacktrace: 14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407211181800 14/08/04 20:59:41 ERROR Remoting: com.genesys.gpe.analytics.akka.messages.SubscribeAck java.lang.ClassNotFoundException: com.genesys.gpe.analytics.akka.messages.SubscribeAck at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623) at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) WBR, Anton - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unit Test for Spark Streaming
That function simply deletes a directory recursively. You can use alternative libraries; see this discussion: http://stackoverflow.com/questions/779519/delete-files-recursively-in-java On Tue, Aug 5, 2014 at 5:02 PM, JiajiaJing jj.jing0...@gmail.com wrote: Hi TD, I encountered a problem when trying to run the KafkaStreamSuite.scala unit test. I added scalatest-maven-plugin to my pom.xml, then ran mvn test, and got the following error message: error: object Utils in package util cannot be accessed in package org.apache.spark.util [INFO] brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } [INFO]^ I checked that Utils.scala does exist under spark/core/src/main/scala/org/apache/spark/util/, so I have no idea why this access error occurs. Could you please help me with this? Thank you very much! Best Regards, Jiajia -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-for-Spark-Streaming-tp11394p11505.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming fails - where is the problem?
@ Simon Any progress? On Tue, Aug 5, 2014 at 12:17 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to add twitter4j-*-3.0.3.jars to your class path Thanks Best Regards On Tue, Aug 5, 2014 at 7:18 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you able to run it locally? If not, can you try creating an all-inclusive jar with all transitive dependencies together (sbt assembly) and then try running the app? Then this will be a self contained environment, which will help us debug better. TD On Mon, Aug 4, 2014 at 5:06 PM, durin m...@simon-schaefer.net wrote: In the WebUI Environment tab, the section Classpath Entries lists the following ones as part of System Classpath: /foo/hadoop-2.0.0-cdh4.5.0/etc/hadoop /foo/spark-master-2014-07-28/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop2.0.0-cdh4.5.0.jar /foo/spark-master-2014-07-28/conf /foo/spark-master-2014-07-28/external/twitter/target/spark-streaming-twitter_2.10-1.1.0-SNAPSHOT.jar /foo/spark-master-2014-07-28/extrajars/twitter4j-core-3.0.3.jar /foo/spark-master-2014-07-28/extrajars/twitter4j-stream-3.0.3.jar So I can't see where any other versions would come from. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11391.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark streaming at-least once guarantee
I can try answering the question even if I am not Sanjeet ;) There isn't a simple way to do this. In fact the ideal way to do it would be to create a new InputDStream (just like FileInputDStream https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala) where you create Hadoop RDDs as SQS messages are received. But stepping back, I want to understand why you want to integrate with Spark Streaming at all. If you already have a working system that runs Spark jobs when SQS sends a message about new files, then why use Spark Streaming at all? What is lacking in that implementation? Based on that, we can decide whether it's worth going into the effort of implementing a new input stream. TD On Tue, Aug 5, 2014 at 12:45 AM, lalit1303 la...@sigmoidanalytics.com wrote: Hi Sanjeet, I have been using Spark Streaming for processing files present in S3 and HDFS. I am also using SQS messages for the same purpose as yours, i.e. as a pointer to an S3 file. As of now, I have a separate SQS job which receives messages from the SQS queue and gets the corresponding file from S3. Now, I want to integrate the SQS receiver with Spark Streaming, so that my Spark Streaming job would listen for new SQS messages and proceed accordingly. I was wondering if you found any solution to this. Please let me know if so! In your above approach, you can achieve #4 in the following way: when you are passing a forEach function to be applied on each RDD of the DStream, you can pass the information of the SQS message (like the receipt handle for deleting the message) associated with that particular file. After success/failure in processing you can delete your SQS message accordingly. Thanks --Lalit - Lalit Yadav la...@sigmoidanalytics.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-at-least-once-guarantee-tp10902p11419.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: streaming window not behaving as advertised (v1.0.1)
1. updateStateByKey should be called on all keys even if there is no data corresponding to a key. There is a unit test for that: https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala#L337 2. I am increasing the priority on this. Off the top of my head, it is easy to fix, but hard to test reliably in a unit test. Will fix it soon after the Spark 1.1 release. TD On Fri, Aug 1, 2014 at 7:37 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Hi TD, I've also been fighting this issue, only to find the exact same solution you are suggesting. Too bad I didn't find either the post or the issue sooner. I'm using a 1-second batch with N Kafka events (1-to-1 with the state objects) per batch and only calling the updateStateByKey function. This is my interpretation, please correct me if needed: because of Spark's lazy computation, the RDDs weren't being updated as expected on each batch interval. The assumption was that as long as a streaming batch runs (with or without new messages), I should get updated RDDs, which was not happening. We only get updateStateByKey calls for objects which received events or which are forced to compute through an output function. I did not run further tests to confirm this, but that's my impression. This doesn't fit our requirements, as we want to do duration updates based on the batch interval execution, so I had to force the computation of all the objects through the foreachRDD function. I would also appreciate it if the priority of the issue could be increased. I assume foreachRDD adds unnecessary resource allocation (although I'm not sure how much) as opposed to having this happen by default on each batch interval. tnks, Rod
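[Editor's sketch] Rod's workaround, registering an output operation so the state is materialised every batch, can be illustrated as below. The TrackedState class, the (String, Long) event shape, and the track function name are illustrative assumptions, not code from the thread.

    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.dstream.DStream

    case class TrackedState(count: Long, lastUpdatedMs: Long)

    def track(events: DStream[(String, Long)]): DStream[(String, TrackedState)] = {
      // updateStateByKey is called lazily; without an output action the per-batch
      // update may never actually be computed for keys with no new events.
      val updated = events.updateStateByKey[TrackedState] {
        (newEvents: Seq[Long], oldState: Option[TrackedState]) =>
          val prev = oldState.getOrElse(TrackedState(0L, 0L))
          Some(TrackedState(prev.count + newEvents.size, System.currentTimeMillis()))
      }
      // Workaround from the thread: an output operation (foreachRDD + count) forces
      // the state RDD to be materialised on every batch interval, so the update
      // function runs even when a key received no new data.
      updated.foreachRDD(rdd => rdd.count())
      updated
    }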
Can't zip RDDs with unequal numbers of partitions
Hi All, I hit the error in the subject line. The exception occurred at line 223, as shown below:

212 // read files
213 val lines = sc.textFile(path_edges).map(line => line.split(",")).map(line => ((line(0), line(1)), line(2).toDouble)).reduceByKey(_ + _).cache
214
215 val lines_vertices = lines.map{line => (line._1._1, Map(nameHash(line._1._2) -> line._2))}.reduceByKey(_ ++ _).cache
216
217 val name_shadow = "_shadow"
218
219 val nodes =
220   lines_vertices
221     .map{line => (nameHash(line._1), (1, Map[VertexId, Double](), line._1))} ++
222   lines_vertices
223     .map{line => (nameHash(line._1 + name_shadow), (2, line._2, line._1 + name_shadow))} ++
224   lines
225     .map{line => (nameHash(line._1._2), (3, Map[VertexId, Double](), line._1._2))}

Sorry for posting the source code, but I couldn't think of a better way. I am confused about how the partitions ended up unequal, and about how I can control the number of partitions of these RDDs. Can someone give me some advice on this problem? Thanks very much! Best, Bin
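[Editor's sketch] One way to take explicit control of partition counts, which is what the question asks about, is to fix them when the RDDs are created, via textFile's minPartitions argument, reduceByKey's numPartitions argument, and repartition. This is a hedged sketch against an assumed shape of the input, not a verified fix for Bin's full job; note also that zip additionally requires the same number of elements per partition, which equal partition counts alone do not guarantee.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Illustrative only: pin every derived RDD to the same explicit partition count
    // so that later operations which require matching partitioning line up.
    def loadWithFixedPartitions(sc: SparkContext, path: String, numPartitions: Int)
        : RDD[((String, String), Double)] = {
      sc.textFile(path, minPartitions = numPartitions)   // ask for an explicit split count
        .map(_.split(","))
        .map(parts => ((parts(0), parts(1)), parts(2).toDouble))
        .reduceByKey(_ + _, numPartitions)               // reduceByKey also takes a partition count
        .repartition(numPartitions)                      // repartition makes the count exact
    }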
Save an RDD to a SQL Database
Hi, I would like to save an RDD to a SQL database. It seems like this would be a common enough use case. Are there any built-in libraries to do it? Otherwise, I'm just planning on mapping over my RDD and having that call a method which writes to the database. Given that a lot of records are going to be written, the code would need to be smart and do a batch insert once enough records have accumulated. Does that sound like a reasonable approach? -Vida
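[Editor's sketch] A common way to implement the approach Vida describes is foreachPartition with one JDBC connection per partition and batched inserts. The sketch below is illustrative only: the (Int, String) record shape, the my_table name and its two-column schema, and the JDBC URL are assumptions, and a real job would also need the JDBC driver on the executor classpath.

    import java.sql.DriverManager
    import org.apache.spark.rdd.RDD

    // Writes an RDD of (id, value) pairs using one connection per partition and
    // addBatch/executeBatch so rows are inserted in bulk rather than one at a time.
    def saveToDatabase(rdd: RDD[(Int, String)], jdbcUrl: String, batchSize: Int = 1000): Unit = {
      rdd.foreachPartition { rows =>
        // This closure runs on the executors, so the connection is created there,
        // never serialized from the driver.
        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          val stmt = conn.prepareStatement("INSERT INTO my_table (id, value) VALUES (?, ?)")
          var pending = 0
          rows.foreach { case (id, value) =>
            stmt.setInt(1, id)
            stmt.setString(2, value)
            stmt.addBatch()
            pending += 1
            if (pending >= batchSize) { stmt.executeBatch(); pending = 0 }
          }
          if (pending > 0) stmt.executeBatch()   // flush the final partial batch
        } finally {
          conn.close()
        }
      }
    }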
Re: trouble with jsonRDD and jsonFile in pyspark
Hi All, I checked out and built master. Note that Maven had a problem building Kafka (in my case, at least); I was unable to fix this easily, so I moved on since it seemed unlikely to have any influence on the problem at hand. Master improves functionality (including the example Nicholas just demonstrated), but unfortunately there still seems to be a bug related to using dictionaries as values. I've put some code below to illustrate the bug.

# dictionary as value works fine
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
[Row(key0=Row(key1=u'value'))]

# dictionary as value works fine, even when inner keys are varied
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
[Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]

# dictionary as value works fine when inner keys are missing and outer key is present
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

# dictionary as value FAILS when outer key is missing
print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
Py4JJavaError: An error occurred while calling o84.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage 7.0 (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException...

# dictionary as value FAILS when outer key is present with null value
print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
Py4JJavaError: An error occurred while calling o98.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage 9.0 (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException...

# nested lists work even when outer key is missing
print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
[Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

Is anyone able to replicate this behavior? -Brad On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com wrote: We try to keep master very stable, but this is where active development happens. YMMV, but a lot of people do run very close to master without incident (myself included). branch-1.0 has been cut for a while and we only merge bug fixes into it (this is stricter for non-alpha components like Spark core). For Spark SQL, this branch is pretty far behind, as the project is very young and we are fixing bugs / adding features very rapidly compared with Spark core. branch-1.1 was just cut and is being QAed for a release; at this point it's likely the same as master, but that will change as features start getting added to master in the coming weeks. On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: collect() works, too.

sqlContext.jsonRDD(sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}'])).collect()
[Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

Can't answer your question about branch stability, though. Spark is a very active project, so stuff is happening all the time. Nick On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Nick, Can you check that the call to collect() works as well as printSchema()? I actually find that printSchema() works fine, but then it crashes on collect(). In general, should I expect master (which seems to be on branch-1.1) to be any more or less stable than branch-1.0? While it would be great to have this fixed, it would be good to know whether I should expect lots of other instability. best, -Brad On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: This looks to be fixed in master:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}'])
ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315
sqlContext.jsonRDD(sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}']))
MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408
sqlContext.jsonRDD(sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}'])).printSchema()
root
 |-- foo: array (nullable = true)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: integer (containsNull = false)

Nick On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I've built and deployed the current head of branch-1.0, but it seems to have only partly fixed the bug. This code now runs as expected with the indicated output: srdd =
Re: pyspark inferSchema
I've followed up in a thread more directly related to jsonRDD and jsonFile, but it seems like after building from the current master I'm still having some problems with nested dictionaries. http://apache-spark-user-list.1001560.n3.nabble.com/trouble-with-jsonRDD-and-jsonFile-in-pyspark-tp11461p11517.html On Tue, Aug 5, 2014 at 12:56 PM, Yin Huai yh...@databricks.com wrote: Yes, 2376 has been fixed in master. Can you give it a try? Also, for inferSchema, because Python is dynamically typed, I agree with Davies on providing a way to scan a subset (or all) of the dataset to figure out the proper schema. We will take a look at it. Thanks, Yin On Tue, Aug 5, 2014 at 12:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Assuming updating to master fixes the bug I was experiencing with jsonRDD and jsonFile, then pushing sample to master will probably not be necessary. We believe that the link below describes the bug I experienced, and I've been told it is fixed in master. https://issues.apache.org/jira/browse/SPARK-2376 best, -brad On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote: The sample argument of inferSchema is still not in master; I will try to add it if it makes sense. On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for the response and tips. Is the sample argument to inferSchema available in the 1.0.1 release of pyspark? I'm not sure (based on the documentation linked below) that it is. http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema It sounds like updating to master may help address my issue (and may also make the sample argument available), so I'm going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
a = sqlContext.jsonRDD(sc.parallelize(['{"foo": "bar", "baz": []}', '{"foo": "boom", "baz": [1,2,3]}']))
a.printSchema()
root
 |-- baz: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- foo: string (nullable = true)

It works really well! It handles fields with inconsistent value types by inferring a value type that covers all the possible values. But say you've already deserialized the JSON to do some pre-processing or filtering. You'd commonly want to do this, say, to remove bad data. So now you have an RDD of Python dictionaries, as opposed to an RDD of JSON strings. It would be perfect if you could get the completeness of the json...() methods, but against dictionaries. Unfortunately, as you noted, inferSchema() only looks at the first element in the set. Furthermore, inferring schemata from RDDs of dictionaries is being deprecated in favor of doing so from RDDs of Rows. I'm not sure what the intention behind this move is, but as a user I'd like to be able to convert RDDs of dictionaries directly to SchemaRDDs with the completeness of the jsonRDD()/jsonFile() methods. Right now if I really want that, I have to serialize the dictionaries to JSON text and then call jsonRDD(), which is expensive. Before the upcoming 1.1 release, we did not support nested structures via inferSchema; a nested dictionary will be inferred as a MapType.
This introduces an inconsistency for dictionaries: the top level becomes a struct type (accessed by field name) while nested dictionaries become MapType (accessed as a map). Deprecating top-level dictionaries is an attempt to resolve this inconsistency. The Row class in pyspark.sql has an interface similar to dict, so you can easily convert your dicts into Rows: ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d))). In order to get the correct schema, we would need another argument specifying the number of rows to sample, such as inferSchema(rdd, sample=None): with sample=None it takes only the first row, otherwise it samples the data to figure out the complete schema. Does this work for you? Nick On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have a data set where each record is serialized using JSON, and I'm interested in using SchemaRDDs to work with the data. Unfortunately I've hit a snag, since some fields in the data are maps and lists, and are not guaranteed to be populated for each record. This seems to cause inferSchema to throw an error: Produces error: srdd =
Re: trouble with jsonRDD and jsonFile in pyspark
I tried jsonRDD(...).printSchema() and it worked. Seems the problem is when we take the data back to the Python side, SchemaRDD#javaToPython failed on your cases. I have created https://issues.apache.org/jira/browse/SPARK-2875 to track it. Thanks, Yin
Re: trouble with jsonRDD and jsonFile in pyspark
I concur that printSchema works; it just seems to be operations that use the data where trouble happens. Thanks for posting the bug. -Brad
type issue: found RDD[T] expected RDD[A]
Hi All, I am having some trouble trying to write generic code that uses sqlContext and RDDs. Can you suggest what might be wrong?

class SparkTable[T : ClassTag](val sqlContext: SQLContext, val extractor: (String) => T) {
  private[this] var location: Option[String] = None
  private[this] var name: Option[String] = None
  private[this] val sc = sqlContext.sparkContext
  ...
  def makeRDD(sqlQuery: String): SchemaRDD = {
    require(this.location != None)
    require(this.name != None)
    import sqlContext._
    val rdd: RDD[String] = sc.textFile(this.location.get)
    val rddT: RDD[T] = rdd.map(extractor)
    val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
    schemaRDD.registerAsTable(name.get)
    val all = sqlContext.sql(sqlQuery)
    all
  }
}

I use it as below:

def extractor(line: String): POJO = {
  val splits = line.split(pattern).toList
  POJO(splits(0), splits(1), splits(2), splits(3))
}

val pojoTable: SparkTable[POJO] = new SparkTable[POJO](sqlContext, extractor)
val identityData: SchemaRDD =
  pojoTable.atLocation("hdfs://location/table")
    .withName("pojo")
    .makeRDD("SELECT * FROM pojo")

I get a compilation failure:

inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product]
[error] val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
[error] ^
[error] SparkTable.scala:37: type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[T]
[error]  required: org.apache.spark.rdd.RDD[A]
[error] val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
[error] ^
[error] two errors found

I am probably missing something basic, either in Scala reflection/types or implicits. Any hints would be appreciated. Thanks, Amit
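[Editor's sketch] The first error suggests that createSchemaRDD (whose type parameter is bounded as [A <: Product : TypeTag] in Spark 1.0/1.1) cannot accept RDD[T] unless T is itself known to be a Product and has a TypeTag in scope; SparkTable only carries a ClassTag. A hedged sketch of the change, trimmed down and with the location/name handled as plain parameters for brevity (the original builder-style atLocation/withName methods are omitted here):

    import scala.reflect.ClassTag
    import scala.reflect.runtime.universe.TypeTag
    import org.apache.spark.sql.{SQLContext, SchemaRDD}

    // Bound T to Product (case classes extend Product) and require a TypeTag in
    // addition to the ClassTag, so sqlContext.createSchemaRDD can find the
    // implicit evidence it needs.
    class SparkTable[T <: Product : ClassTag : TypeTag](
        val sqlContext: SQLContext,
        val extractor: String => T) {

      private[this] val sc = sqlContext.sparkContext

      def makeRDD(location: String, tableName: String, sqlQuery: String): SchemaRDD = {
        import sqlContext._                   // brings createSchemaRDD into scope
        val rddT = sc.textFile(location).map(extractor)
        val schemaRDD = createSchemaRDD(rddT) // now satisfies [A <: Product : TypeTag]
        schemaRDD.registerAsTable(tableName)
        sqlContext.sql(sqlQuery)
      }
    }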