Re: SQLCtx cacheTable

2014-08-05 Thread Gurvinder Singh
On 08/04/2014 10:57 PM, Michael Armbrust wrote:
 If mesos is allocating a container that is exactly the same as the max
 heap size then that is leaving no buffer space for non-heap JVM memory,
 which seems wrong to me.
 
This could be the cause. I am now wondering how Mesos picks the container size
and sets the -Xmx parameter.
 The problem here is that cacheTable is more aggressive about grabbing
 large ByteBuffers during caching (which it later releases when it knows
 the exact size of the data)  There is a discussion here about trying to
 improve this: https://issues.apache.org/jira/browse/SPARK-2650
 
I am not sure that this issue is the one causing the problem for us. We have
approximately 60GB of cached data, whereas each executor has 17GB of memory
and there are 15 of them, so 255GB in total, which is far more than the 60GB
of cached data.

Any suggestions on where to look for changing the Mesos settings in this
case?

- Gurvinder
 
 On Sun, Aug 3, 2014 at 11:35 PM, Gurvinder Singh
 gurvinder.si...@uninett.no wrote:
 
 On 08/03/2014 02:33 AM, Michael Armbrust wrote:
  I am not a mesos expert... but it sounds like there is some mismatch
  between the size that mesos is giving you and the maximum heap size of
  the executors (-Xmx).
 
 It seems that Mesos is giving the correct size to the Java process; the exact
 size is set in the -Xms/-Xmx params. Do you know if I can somehow find out
 how much memory each class or thread inside the Spark JVM process is using,
 to see what makes it hit the memory limit in the cacheTable case but not in
 the cached-RDD case?
 
 - Gurvinder
 
  On Fri, Aug 1, 2014 at 12:07 AM, Gurvinder Singh
  gurvinder.si...@uninett.no wrote:
 
   It is not an out-of-memory exception. I am using Mesos as the cluster
   manager, and when I use cacheTable it reports that the container has used
   all of its allocated memory and kills it. I can see this in the logs on the
   mesos-slave where the executor runs, but the web UI of the Spark
   application shows it still has 4-5GB of space left for caching/storage.
   So I am wondering how the memory is handled in the cacheTable case. Does it
   reserve the storage memory while other parts run out of theirs? I also
   tried changing spark.storage.memoryFraction, but that did not help.
 
  - Gurvinder
  On 08/01/2014 08:42 AM, Michael Armbrust wrote:
   Are you getting OutOfMemoryExceptions with cacheTable? or
 what do you
   mean when you say you have to specify larger executor
 memory?  You
  might
   be running into SPARK-2650
   https://issues.apache.org/jira/browse/SPARK-2650.
  
   Is there something else you are trying to accomplish by
 setting the
   persistence level?  If you are looking for something like
  DISK_ONLY you
   can simulate that now using saveAsParquetFile and parquetFile.
  
   It is possible long term that we will automatically map the
  standard RDD
   persistence levels to these more efficient implementations
 in the
  future.
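
For reference, a minimal sketch of that workaround (assuming an existing SparkContext sc; the Record type, path and table name below are placeholders):

import org.apache.spark.sql.SQLContext

case class Record(id: Int, value: String)          // hypothetical schema

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

val data = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))

// Writing the data out as Parquet plays the role of a DISK_ONLY cache...
data.saveAsParquetFile("/tmp/records.parquet")

// ...and the on-disk copy can be registered as a table and queried again.
val onDisk = sqlContext.parquetFile("/tmp/records.parquet")
onDisk.registerAsTable("records_disk")
sqlContext.sql("SELECT value FROM records_disk WHERE id = 1").collect()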
  
  
   On Thu, Jul 31, 2014 at 11:26 PM, Gurvinder Singh
    gurvinder.si...@uninett.no wrote:
  
    Thanks Michael for the explanation. Actually, I tried caching the RDD and
    making a table on it, but the performance for cacheTable was 3X better
    than caching the RDD. Now I know why it is better. But is it possible to
    add support for a persistence level to cacheTable itself, like for RDDs?
    Maybe it is not related, but on the same size of data set, when I use
    cacheTable I have to specify larger executor memory than I need in the
    case of caching the RDD, although in the storage tab of the status web UI
    the memory footprint is almost the same: 58.3 GB with cacheTable and
    59.7 GB with the cached RDD. Is it possible that there is some memory
    leak, or does cacheTable work differently and thus require more memory?
    The difference is 5GB per executor for a dataset of size 122 GB.
  
   Thanks,
   Gurvinder
   On 08/01/2014 04:42 AM, Michael Armbrust wrote:
cacheTable uses a special columnar 

Re: Can't see any thing one the storage panel of application UI

2014-08-05 Thread Akhil Das
You need to persist or cache those RDDs for them to appear under Storage.
Unless you do, those RDDs will be recomputed every time they are used.
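
For example, a minimal Scala sketch (the input path is a placeholder):

val lines = sc.textFile("hdfs:///data/input")   // placeholder path
val parsed = lines.map(_.split(","))

// Without this call the RDD is recomputed on every action and never shows
// up under the Storage tab.
parsed.cache()

parsed.count()   // first action materializes and caches the partitions
parsed.count()   // second action reads from the cache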

Thanks
Best Regards


On Tue, Aug 5, 2014 at 8:03 AM, binbinbin915 binbinbin...@live.cn wrote:

 Actually, if you don't use a method like persist or cache, the RDD is not
 even stored to disk. Every time you use it, it is simply recomputed from the
 original one.

 In logistic regression from MLlib, the transformed input is not persisted,
 so I can't see the RDD in the web GUI.

 I have changed the code and gained a 10x speed up.

 --
 binbinbin915
 Sent with Airmail


 --
 View this message in context: Re: Can't see any thing one the storage
 panel of application UI
 http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-see-any-thing-one-the-storage-panel-of-application-UI-tp10296p11403.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: java.lang.IllegalStateException: unread block data while running the sampe WordCount program from Eclipse

2014-08-05 Thread nightwolf
Did you ever find a solution to this problem? I'm having similar issues.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-while-running-the-sampe-WordCount-program-from-Ecle-tp8388p11412.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Deployment Patterns - Automated Deployment Performance Testing

2014-08-05 Thread nightwolf
Thanks AL! 

That's what I thought. I've set up Nexus to maintain the Spark libs and
download them when needed.

For development purposes: suppose we have a dev cluster. Is it possible to
run the driver program locally (on a developer's machine)?

I.e. just run the driver from the IDE and have it connect to the master and
worker nodes to ship out its tasks?
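
For what it's worth, a minimal sketch of such a driver-side setup (the master URL, driver host and jar path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://dev-master:7077")          // remote standalone master
  .setAppName("driver-from-ide")
  .set("spark.driver.host", "192.168.0.10")      // an address the workers can reach back to
  .setJars(Seq("target/scala-2.10/myapp.jar"))   // ship the compiled classes to the executors

val sc = new SparkContext(conf)
println(sc.parallelize(1 to 1000).count())       // simple smoke test
sc.stop()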

Cheers,
N



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Deployment-Patterns-Automated-Deployment-Performance-Testing-tp11000p11414.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-05 Thread rafeeq s
Hi,

I am new to Apache Spark and trying to develop a Spark Streaming program to
*stream data from Kafka topics and output it as Parquet files on HDFS*.

Please share a *sample reference* program that streams data from Kafka topics
and writes the output as Parquet files on HDFS.

Thanks in Advance.

Regards,

Rafeeq S
*(“What you do is what matters, not what you think or say or plan.” )*


Running driver/SparkContent locally

2014-08-05 Thread nightwolf
I'm trying to run a local driver (on a development machine) and have this
driver communicate with the Spark master and workers; however, I'm having a
few problems getting the driver to connect and run a simple job from within
an IDE.

It all looks like it works, but when I try to do something simple like a
count it falls over with a *java.lang.IllegalStateException: unread block
data*.

*The stack-trace: *

14/08/05 17:09:48 WARN TaskSetManager: Lost TID 3 (task 0.0:0)
14/08/05 17:09:48 INFO TaskSetManager: Loss was due to
java.lang.IllegalStateException: unread block data [duplicate 3]
14/08/05 17:09:48 ERROR TaskSetManager: Task 0.0:0 failed 4 times; aborting
job
14/08/05 17:09:48 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool 
14/08/05 17:09:48 INFO TaskSchedulerImpl: Cancelling stage 0
14/08/05 17:09:48 INFO DAGScheduler: Failed to run runJob at
basicOperators.scala:136
Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0.0:0 failed 4 times, most recent failure: Exception
failure in TID 3 on host hadoop-004: java.lang.IllegalStateException: unread
block data
   
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
   
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:148)
   
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
   
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1046)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1030)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:632)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
05/08/2014 5:09:45 PM INFO: parquet.hadoop.ParquetInputFormat: Total input
paths to process : 1
05/08/2014 5:09:45 PM INFO: parquet.hadoop.ParquetFileReader: reading
another 1 footers
Some error on socket 3168

Any ideas where I've gone wrong?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-driver-SparkContent-locally-tp11415.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: about spark and using machine learning model

2014-08-05 Thread Julien Naour
You can find in the following presentation a simple example of a clustering
model used to classify new incoming tweets:
https://www.youtube.com/watch?v=sPhyePwo7FA

Regards,
Julien


2014-08-05 7:08 GMT+02:00 Xiangrui Meng men...@gmail.com:

 Some extra work is needed to close the loop. One related example is
 streaming linear regression added by Jeremy very recently:


 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala

 You can use a model trained offline to serve a DStream and save the
 predictions (also a DStream) somewhere, e.g., HDFS or stdout.

 Best,
 Xiangrui

 On Mon, Aug 4, 2014 at 9:28 PM, Hoai-Thu Vuong thuv...@gmail.com wrote:
  Hello everybody!
 
  I'm getting started with Spark and MLlib. I have successfully built a small
  cluster and followed the tutorial. However, I would like to ask how to use a
  model trained by MLlib. I understand that with data we can train a model,
  such as a classifier, and then use it to classify new input. Is there any
  case study on building a service on top of Spark or HDFS that uses a model
  (trained as above) and returns the output to the user (the class of the
  input data)? Thank you very much!
 
 
 
  --
  Thu.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Running driver/SparkContent locally

2014-08-05 Thread nightwolf
The code for this example is very simple:


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SparkMain extends App with Serializable {

  val conf = new SparkConf(false)
    //.setAppName("cc-test")
    //.setMaster("spark://hadoop-001:7077")
    //.setSparkHome("/tmp")
    .set("spark.driver.host", "192.168.23.108")
    .set("spark.cores.max", "10")
    .set("spark.executor.memory", "512M")

  val sc = new SparkContext("spark://hadoop-001:7077", "cc-test", conf)

  val hc = new HiveContext(sc)

  val input = hc.hql("select * from prod_qdw.prod_sales_summary where dt = '2013-01-01' limit 10")

  println("#Result: " + input.collect)

}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-driver-SparkContent-locally-tp11415p11418.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming at-least once guarantee

2014-08-05 Thread lalit1303
Hi Sanjeet,

I have been using Spark Streaming for processing files present in S3 and
HDFS.
I am also using SQS messages for the same purpose as yours, i.e. as pointers
to S3 files.
As of now, I have a separate SQS job which receives messages from the SQS
queue and gets the corresponding file from S3.
Now I want to integrate the SQS receiver with Spark Streaming, so that my
Spark Streaming job would listen for new SQS messages and proceed
accordingly.
I was wondering if you found any solution to this. Please let me know if you
did!

In your above approach, you can achieve #4 in the following way:
when you pass a forEach function to be applied on each RDD of the DStream,
you can pass along the information from the SQS message (like the receipt
handle needed for deleting the message) associated with that particular file.
After success/failure in processing you can then delete your SQS message
accordingly.
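
For what it's worth, a rough sketch of that pattern (the queue URL, the FilePointer type and the processing step are hypothetical placeholders; it assumes the AWS SDK for Java is on the classpath):

import com.amazonaws.services.sqs.AmazonSQSClient
import org.apache.spark.streaming.dstream.DStream

// Hypothetical element type: the S3 path to process plus the SQS receipt
// handle needed to delete the message once the file has been handled.
case class FilePointer(s3Path: String, receiptHandle: String)

def processAndAck(pointers: DStream[FilePointer], queueUrl: String): Unit = {
  pointers.foreachRDD { rdd =>
    // ... process the S3 files referenced by this batch here ...
    val handled = rdd.collect()          // the pointers are small control records

    // Delete the SQS messages only after the batch succeeded, which keeps
    // at-least-once semantics for the file pointers. This closure runs on
    // the driver, so the client is created there.
    val sqs = new AmazonSQSClient()
    handled.foreach(p => sqs.deleteMessage(queueUrl, p.receiptHandle))
  }
}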


Thanks
--Lalit



-
Lalit Yadav
la...@sigmoidanalytics.com
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-at-least-once-guarantee-tp10902p11419.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.lang.StackOverflowError

2014-08-05 Thread Chengi Liu
Hi,
  I am doing some basic preprocessing in PySpark (local mode), as follows:

files = [ input files]

def read(filename, sc):
    # process file
    return rdd

if __name__ == "__main__":
    conf = SparkConf()
    conf.setMaster('local')
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir(root + "temp/")

    data = sc.parallelize([])

    for i, f in enumerate(files):
        data = data.union(read(f, sc))
        if i == 20:
            data.checkpoint()
            data.count()
        if i == 500: break
    # print data.count()
    # rdd_1 = read(files[0], sc)
    data.saveAsTextFile(root + "output/")


But I see this error:
  keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File
/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__
  File
/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
o9564.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
serialization failed: java.lang.StackOverflowError
java.io.Bits.putInt(Bits.java:93)
java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)


Re: Bad Digest error while doing aws s3 put

2014-08-05 Thread lmk
Is it possible that the Content-MD5 changes during multipart upload to S3?
But even then, it succeeds if I increase the cluster configuration.

For example:
it throws Bad Digest error after writing 48/100 files when the cluster is of
3 m3.2xlarge slaves
it throws Bad Digest error after writing 64/100 files when the cluster is of
4 m3.2xlarge slaves
it throws Bad Digest error after writing 86/100 files when the cluster is of
5 m3.2xlarge slaves
it succeeds writing all the 100 files when the cluster is of 6 m3.2xlarge
slaves..

Please clarify.

Regards,
lmk




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bad-Digest-error-while-doing-aws-s3-put-tp10036p11421.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-05 Thread Dibyendu Bhattacharya
You can try this Kafka Spark Consumer which I recently wrote. This uses the
Low Level Kafka Consumer

https://github.com/dibbhatt/kafka-spark-consumer

Dibyendu




On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote:

 Hi,

 I am new to Apache Spark and Trying to Develop spark streaming program to  
 *stream
 data from kafka topics and output as parquet file on HDFS*.

 Please share the *sample reference* program to stream data from kafka
 topics and output as parquet file on HDFS.

 Thanks in Advance.

 Regards,

 Rafeeq S
 *(“What you do is what matters, not what you think or say or plan.” )*




Re: Low Level Kafka Consumer for Spark

2014-08-05 Thread Dibyendu Bhattacharya
Thanks Jonathan,

Yes, until non-ZK-based offset management is available in Kafka, I need to
maintain the offsets in ZK. And yes, in both cases an explicit commit is
necessary. I modified the Low Level Kafka Spark Consumer a little so that the
Receiver spawns a thread for every partition of the topic and performs the
'store' operation in multiple threads. It would be good if the receiver store
methods were made thread-safe, which they presently are not.
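
For context, a rough sketch of what such a multi-threaded receiver can look like with the public Receiver API (the per-partition consumer logic is only a placeholder):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a receiver that spawns one thread per topic partition and calls
// store() from each of them; the real Kafka fetch logic is omitted.
class PerPartitionReceiver(numPartitions: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    (0 until numPartitions).foreach { p =>
      new Thread(s"partition-consumer-$p") {
        override def run(): Unit = {
          while (!isStopped()) {
            // placeholder for fetching the next message of partition p
            store(s"message-from-partition-$p")
          }
        }
      }.start()
    }
  }

  override def onStop(): Unit = {
    // the consumer threads exit once isStopped() becomes true
  }
}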

Waiting for TD's comment on this Kafka Spark Low Level consumer.


Regards,
Dibyendu



On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote:

 Hi Yan,

 That is a good suggestion.  I believe non-Zookeeper offset management will
 be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for
 September.


 https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

 That should make this fairly easy to implement, but it will still require
 explicit offset commits to avoid data loss, which is different from the
 current KafkaUtils implementation.

 Jonathan





 On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:

  Another suggestion that may help: you can consider using Kafka to store the
  latest offset instead of ZooKeeper. There are at least two benefits: 1) it
  lowers the workload on ZK, and 2) it supports replay from a certain offset.
  This is how Samza http://samza.incubator.apache.org/ deals with the Kafka
  offset; the doc is here:
  http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html
 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com
 wrote:

  I'll let TD chime in on this one, but I'm guessing this would be a
  welcome addition. It's great to see community effort on adding new
  streams/receivers; adding a Java API for receivers was something we did
  specifically to allow this :)

 - Patrick


 On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 I have implemented a Low Level Kafka Consumer for Spark Streaming using
 Kafka Simple Consumer API. This API will give better control over the Kafka
 offset management and recovery from failures. As the present Spark
 KafkaUtils uses HighLevel Kafka Consumer API, I wanted to have a better
 control over the offset management which is not possible in Kafka HighLevel
 consumer.

 This Project is available in below Repo :

 https://github.com/dibbhatt/kafka-spark-consumer


 I have implemented a Custom Receiver
 consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses low level Kafka
 Consumer API (implemented in consumer.kafka packages) to fetch messages
 from Kafka and 'store' it in Spark.

 The logic will detect number of partitions for a topic and spawn that
 many threads (Individual instances of Consumers). Kafka Consumer uses
 Zookeeper for storing the latest offset for individual partitions, which
 will help to recover in case of failure. The Kafka Consumer logic is
 tolerant to ZK Failures, Kafka Leader of Partition changes, Kafka broker
 failures,  recovery from offset errors and other fail-over aspects.

  The consumer.kafka.client.Consumer is the sample Consumer which uses these
  Kafka Receivers to generate DStreams from Kafka and applies an output
  operation for every message of the RDD.

 We are planning to use this Kafka Spark Consumer to perform Near Real
 Time Indexing of Kafka Messages to target Search Cluster and also Near Real
 Time Aggregation using target NoSQL storage.

  Kindly let me know your views. Also, if this looks good, can I contribute it
  to the Spark Streaming project?

 Regards,
 Dibyendu







Re: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-05 Thread rafeeq s
Thanks Dibyendu.

1. Spark itself has an API jar for Kafka; do we still require manual offset
management (using the SimpleConsumer concept) and a manual consumer?
2. Kafka Spark Consumer is implemented against Kafka 0.8.0; can we use it
with Kafka 0.8.1?
3. How do we use Kafka Spark Consumer to produce output

*as Parquet files on HDFS?*

*Please give your suggestions.*

Regards,

Rafeeq S
*(“What you do is what matters, not what you think or say or plan.” )*



On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 You can try this Kafka Spark Consumer which I recently wrote. This uses
 the Low Level Kafka Consumer

 https://github.com/dibbhatt/kafka-spark-consumer

 Dibyendu




 On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote:

 Hi,

 I am new to Apache Spark and Trying to Develop spark streaming program
 to  *stream data from kafka topics and output as parquet file on HDFS*.

 Please share the *sample reference* program to stream data from kafka
 topics and output as parquet file on HDFS.

 Thanks in Advance.

 Regards,

 Rafeeq S
 *(“What you do is what matters, not what you think or say or plan.” )*





Running Hive UDF from spark-shell fails due to datatype issue

2014-08-05 Thread visakh
Hi,

I'm running Hive 0.13.1 and the latest master branch of Spark (built with
SPARK_HIVE=true). I'm trying to compute Jaccard similarity using the Hive
UDF from Brickhouse
(https://github.com/klout/brickhouse/blob/master/src/main/java/brickhouse/udf/sketch/SetSimilarityUDF.java).
 

*Hive table data:*
hive> select * from test_1;
1   ["rock","pop"]
2   ["metal","rock"]

*DDL*
create table test_1
(id int, val array<string>);

From spark-shell, I am executing the following commands:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.hql("CREATE TEMPORARY FUNCTION jaccard_similarity AS
'brickhouse.udf.sketch.SetSimilarityUDF'")
hiveContext.hql("select jaccard_similarity(a.val, b.val) from test_1 a join
test_1 b")

I get the following error:

warning: there were 1 deprecation warning(s); re-run with -deprecation for
details
14/08/05 13:54:53 INFO ParseDriver: Parsing command: select
jaccard_similarity(a.val, b.val) from test_1 a join test_1 b
14/08/05 13:54:53 INFO ParseDriver: Parse Completed
14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/08/05 13:54:53 INFO audit: ugi=chandrv1  ip=unknown-ip-addr 
cmd=get_table : db=default tbl=test_1   
14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/08/05 13:54:53 INFO audit: ugi=chandrv1  ip=unknown-ip-addr 
cmd=get_table : db=default tbl=test_1   
scala.MatchError: ArrayType(StringType,false) (of class
org.apache.spark.sql.catalyst.types.ArrayType)
at
org.apache.spark.sql.hive.HiveInspectors$typeInfoConversions.toTypeInfo(HiveInspectors.scala:216)
at
org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
at
org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:52)
at
org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:253)
at
org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41)
at
org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:41)
at
org.apache.spark.sql.hive.HiveContext$$anon$3.lookupFunction(HiveContext.scala:253)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:131)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:129)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
at
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:52)
at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 

Understanding RDD.GroupBy OutOfMemory Exceptions

2014-08-05 Thread Jens Kristian Geyti
I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, a few
gigs in total, line based, 500-2000 chars per line). I'm running Spark on 8
low-memory machines in a YARN cluster, i.e. something along the lines of:

spark-submit ... --master yarn-client --num-executors 8
--executor-memory 3000m --executor-cores 1

I'm trying to do a simple groupByKey (see below), but it fails with a
java.lang.OutOfMemoryError: GC overhead limit exceeded exception:

val keyvals = sc.newAPIHadoopFile("hdfs://...").map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()

I can count the group sizes using reduceByKey without problems, ensuring
myself the problem isn't caused by a single excessively large group, nor by
an excessive number of groups:

keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
//  (key1,139368)
//  (key2,35335)
//  (key3,392744)
//  ...
//  (key13,197941)

I've tried reformatting, reshuffling and increasing the groupBy level of
parallelism:

keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails

I've tried playing around with spark.default.parallelism, and increasing
spark.shuffle.memoryFraction to 0.8 while lowering
spark.storage.memoryFraction to 0.1.

The failing stage (count) will fail on task 2999 of 3000.

I can't seem to find anything that suggests that groupBy shouldn't just
spill to disk instead of keeping things in memory, but I just can't get it
to work right, even on fairly small datasets. This should obviously not be
the case, and I must be doing something wrong, but I have no idea where to
start debugging this, or even trying to understand what's going on - for the
same reason, I'm not looking for a solution to my specific problem, as much
as I'm looking for insight into how to reliably group datasets in Spark.

Notice that I've also posted this question to SO, before realising this
mailing list is more active. I will update the SO thread, if I receive an
answer here.
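
As a general note on the 1.0.x behaviour: groupByKey has to materialize every group in memory on the reduce side, so when the downstream computation is really an aggregate it is usually safer to fold with reduceByKey instead. A small sketch, assuming the values are the text lines themselves and only a per-key aggregate (here, total bytes per key) is actually needed:

// Reuses the keyvals pair RDD from the message above.
// Map each value to its partial result first, then reduce; only one running
// aggregate per key is kept in memory, and it is merged map-side before the
// shuffle, unlike groupByKey which ships every value.
val perKeyBytes = keyvals
  .mapValues(line => line.length.toLong)
  .reduceByKey(_ + _)
perKeyBytes.count()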




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Setting spark.executor.memory problem

2014-08-05 Thread Grzegorz Białek
Hi,

I wanted to make a simple Spark app running in local mode with 2g of
spark.executor.memory and 1g for caching. But the following code:

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("app")
    .set("spark.executor.memory", "2g")
    .set("spark.storage.memoryFraction", "0.5")
  val sc = new SparkContext(conf)

doesn't work. In the Spark UI these variables are set properly, but the memory
store is around 0.5 * 512MB (the default spark.executor.memory), not 0.5 * 2GB:

14/08/05 15:34:00 INFO MemoryStore: MemoryStore started with capacity 245.8
MB.

I have neither spark-defaults.conf nor spark-env.sh in my $SPARK_HOME/conf
directory. I use Spark 1.0.0.
How can I set these values properly?

Thanks,
Grzegorz


RE: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-05 Thread Shao, Saisai
Hi Rafeeq,

I think the current Spark Streaming API can offer you the ability to fetch data
from Kafka and store it in another external store. If you do not care about
managing the consumer offsets manually, there's no need to use a low-level API
such as SimpleConsumer.

For Kafka 0.8.1 compatibility, you can try to modify the pom file and rebuild
Spark to try it; mostly I think it will work.

For Parquet files, I think that if Parquet offers its own OutputFormat extended
from Hadoop's OutputFormat, Spark can write data into Parquet files just like
sequence files or text files. You can do this as:

DStream.foreach { rdd => rdd.saveAsHadoopFile(…) }

to specify the OutputFormat you want.
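
For illustration, one way to sketch this without wiring up ParquetOutputFormat by hand is to go through Spark SQL's saveAsParquetFile per micro-batch (the PageView record type and output layout below are made up; assumes Spark 1.0+ with the spark-sql module available):

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical record type for the parsed Kafka payload.
case class PageView(url: String, timestamp: Long)

def writeAsParquet(stream: DStream[PageView], sqlContext: SQLContext, baseDir: String): Unit = {
  import sqlContext.createSchemaRDD      // implicit: RDD of case classes -> SchemaRDD
  stream.foreachRDD { (rdd, time) =>
    // one Parquet directory per micro-batch, named after the batch time
    rdd.saveAsParquetFile(s"$baseDir/batch-${time.milliseconds}")
  }
}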

Thanks
Jerry

From: rafeeq s [mailto:rafeeq.ec...@gmail.com]
Sent: Tuesday, August 05, 2014 5:37 PM
To: Dibyendu Bhattacharya
Cc: u...@spark.incubator.apache.org
Subject: Re: Spark stream data from kafka topics and output as parquet file on 
HDFS

Thanks Dibyendu.
1. Spark itself have api jar for kafka, still we require manual offset 
management (using simple consumer concept) and manual consumer ?
2.Kafka Spark Consumer which is implemented in kafka 0.8.0 ,Can we use it for 
kafka 0.8.1 ?
3.How to use Kafka Spark Consumer to produce output as parquet file on HDFS ?
Please give your suggestion.

Regards,
Rafeeq S
(“What you do is what matters, not what you think or say or plan.” )


On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:
You can try this Kafka Spark Consumer which I recently wrote. This uses the Low 
Level Kafka Consumer

https://github.com/dibbhatt/kafka-spark-consumer

Dibyendu



On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s 
rafeeq.ec...@gmail.com wrote:
Hi,

I am new to Apache Spark and Trying to Develop spark streaming program to  
stream data from kafka topics and output as parquet file on HDFS.
Please share the sample reference program to stream data from kafka topics and 
output as parquet file on HDFS.
Thanks in Advance.

Regards,
Rafeeq S
(“What you do is what matters, not what you think or say or plan.” )





Re: spark sql left join gives KryoException: Buffer overflow

2014-08-05 Thread Dima Zhiyanov
I am also experiencing this Kryo buffer problem. My join is a left outer join
with under 40MB on the right side. I would expect the broadcast join to
succeed in this case (Hive did).
Another problem is that the optimizer chose a nested loop join for some
reason; I would expect a broadcast (map-side) hash join.
Am I correct in my expectations?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-left-join-gives-KryoException-Buffer-overflow-tp10157p11432.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark sql left join gives KryoException: Buffer overflow

2014-08-05 Thread Dima Zhiyanov
Yes

Sent from my iPhone

 On Aug 5, 2014, at 7:38 AM, Dima Zhiyanov [via Apache Spark User List] 
 ml-node+s1001560n11432...@n3.nabble.com wrote:
 
 I am also experiencing this kryo buffer problem. My join is left outer with 
 under 40mb on the right side. I would expect the broadcast join to succeed 
 in this case (hive did) 
 Another problem is that the optimizer 
 chose nested loop join for some reason 
 I would expect broadcast (map side) hash join. 
 Am I correct in my expectations? 
 
 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-left-join-gives-KryoException-Buffer-overflow-tp10157p11433.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

master=local vs master=local[*]

2014-08-05 Thread Grzegorz Białek
Hi,

I have a Spark application which computes the join of two RDDs. One contains
around 150MB of data (7 million entries), the second around 1.5MB (80 thousand
entries), and the result of the join contains 50MB of data (2 million entries).

When I run it on one core (with master=local) it works correctly (the whole
process uses between 600 and 700MB of memory), but when I run it on all
cores (with master=local[*]) it throws:
java.lang.OutOfMemoryError: GC overhead limit exceeded
and sometimes
java.lang.OutOfMemoryError: Java heap space

I have set spark.executor.memory=512m (default value).

Does anyone know why the above occurs?

Thanks,
Grzegorz


Spark SQL Thrift Server

2014-08-05 Thread John Omernik
I got things working on my cluster with the Spark SQL Thrift server. (Thank
you Yin Huai at Databricks!)

That said, I was curious how I can cache a table via my instance here? I
tried the Shark-like "create table table_cached as select * from table" and
that did not create a cached table. "cacheTable(table)" didn't parse in
beeline.

Any thoughts?  Any pointers to documentation (*crosses fingers)?


Re: Spark SQL Thrift Server

2014-08-05 Thread Michael Armbrust
We are working on an overhaul of the docs before the 1.1 release. In the
meantime try: CACHE TABLE tableName.


On Tue, Aug 5, 2014 at 9:02 AM, John Omernik j...@omernik.com wrote:

 I gave things working on my cluster with the sparksql thrift server.
 (Thank you Yin Huai at Databricks!)

 That said, I was curious how I can cache a table via my instance here?  I
 tried the shark like create table table_cached as select * from table and
 that did not create a cached table.  cacheTable(table) didn't parse in
 beeline.

 Any thoughts?  Any pointers to documentation (*crosses fingers)?



Re: spark sql left join gives KryoException: Buffer overflow

2014-08-05 Thread Michael Armbrust
For outer joins I'd recommend upgrading to master or waiting for a 1.1
release candidate (which should be out this week).


On Tue, Aug 5, 2014 at 7:38 AM, Dima Zhiyanov dimazhiya...@hotmail.com
wrote:

 I am also experiencing this kryo buffer problem. My join is left outer with
 under 40mb on the right side. I would expect the broadcast join to succeed
 in this case (hive did)
 Another problem is that the optimizer
 chose nested loop join for some reason
 I would expect broadcast (map side) hash join.
 Am I correct in my expectations?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-left-join-gives-KryoException-Buffer-overflow-tp10157p11432.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: master=local vs master=local[*]

2014-08-05 Thread Andre Bois-Crettez

The more cores you have, the less memory each of them will get.
512M is already quite small, and if you have 4 cores it will mean
roughly 128M per task.
Sometimes it is interesting to have fewer cores and more memory.

How many cores do you have?

André

On 2014-08-05 16:43, Grzegorz Białek wrote:

Hi,

I have Spark application which computes join of two RDDs. One contains
around 150MB of data (7 million entries) second around 1,5MB (80
thousand entries) and
result of this join contains 50MB of data (2 million entries).

When I run it on one core (with master=local) it works correctly
(whole process uses between 600 and 700MB of memory) but when I run it
on all cores (with master=local[*]) it throws:
java.lang.OutOfMemoryError: GC overhead limit exceeded
and sometimes
java.lang.OutOfMemoryError: Java heap space

I have set spark.executor.memory=512m (default value).

Does anyone know why above occurs?

Thanks,
Grzegorz



--
André Bois-Crettez

Software Architect
Big Data Developer
http://www.kelkoo.com/


Kelkoo SAS
A simplified joint-stock company (Société par Actions Simplifiée)
Share capital: €4,168,964.30
Registered office: 8, rue du Sentier, 75002 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended solely for
their addressees. If you are not the intended recipient of this message,
please delete it and notify the sender.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can't see any thing one the storage panel of application UI

2014-08-05 Thread Andrew Or
Ah yes, Spark doesn't cache all of your RDDs by default. It turns out that
caching things too aggressively can lead to suboptimal performance because
there might be a lot of churn. If you don't call persist or cache then your
RDDs won't actually be cached. Note that even once they're cached they can
still be kicked out by LRU, however.
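
As an aside, the storage level can also be chosen explicitly when eviction is a concern; a small sketch (the input path is a placeholder):

import org.apache.spark.storage.StorageLevel

val events = sc.textFile("hdfs:///logs/events")   // placeholder input

// MEMORY_AND_DISK spills partitions that no longer fit in memory to local
// disk instead of dropping them, so an LRU eviction does not force a full
// recomputation later.
events.persist(StorageLevel.MEMORY_AND_DISK)
events.count()

// Free the space explicitly once the dataset is no longer needed.
events.unpersist()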

-Andrew


2014-08-05 0:13 GMT-07:00 Akhil Das ak...@sigmoidanalytics.com:

 You need to use persist or cache those rdds to appear in the Storage.
 Unless you do it, those rdds will be computed again.

 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 8:03 AM, binbinbin915 binbinbin...@live.cn wrote:

  Actually, if you don’t use method like persist or cache, it even not
 store the rdd to the disk. Every time you use this rdd, they just compute
 it from the original one.

 In logistic regression from mllib, they don't persist the changed input ,
 so I can't see the rdd from the web gui.

 I have changed the code and gained a 10x speed up.

 --
 binbinbin915
 Sent with Airmail


 --
 View this message in context: Re: Can't see any thing one the storage
 panel of application UI
 http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-see-any-thing-one-the-storage-panel-of-application-UI-tp10296p11403.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Re: Setting spark.executor.memory problem

2014-08-05 Thread Andrew Or
Hi Grzegorz,

For local mode you only have one executor, and this executor is your
driver, so you need to set the driver's memory instead. That said, in
local mode, by the time you run spark-submit, a JVM has already been
launched with the default memory settings, so setting spark.driver.memory
in your conf won't actually do anything for you. Instead, you need to run
spark-submit as follows:

bin/spark-submit --driver-memory 2g --class your.class.here app.jar

This will start the JVM with 2G instead of the default 512M.

-Andrew


2014-08-05 6:43 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com:

 Hi,

 I wanted to make simple Spark app running in local mode with 2g
 spark.executor.memory and 1g for caching. But following code:

   val conf = new SparkConf()
 .setMaster(local)
 .setAppName(app)
 .set(spark.executor.memory, 2g)
 .set(spark.storage.memoryFraction, 0.5)
   val sc = new SparkContext(conf)

 doesn't work. In spark UI this variables are set properly but memory store
 is around 0.5 * 512MB (default spark.executor.memory) not 0.5 * 2GB:

 14/08/05 15:34:00 INFO MemoryStore: MemoryStore started with capacity
 245.8 MB.

 I have neither spark-defaults.conf nor spark-env.sh in my $SPARK_HOME/conf
 directory. I use Spark 1.0.0
 How can I set this values properly?

 Thanks,
 Grzegorz




Re: Setting spark.executor.memory problem

2014-08-05 Thread Andrew Or
(Clarification: you'll need to pass in --driver-memory not just for local
mode, but for any application you're launching with client deploy mode)


2014-08-05 9:24 GMT-07:00 Andrew Or and...@databricks.com:

 Hi Grzegorz,

 For local mode you only have one executor, and this executor is your
 driver, so you need to set the driver's memory instead. *That said, in
 local mode, by the time you run spark-submit, a JVM has already been
 launched with the default memory settings, so setting spark.driver.memory
 in your conf won't actually do anything for you. Instead, you need to run
 spark-submit as follows

 bin/spark-submit --driver-memory 2g --class your.class.here app.jar

 This will start the JVM with 2G instead of the default 512M.

 -Andrew


 2014-08-05 6:43 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com:

 Hi,

 I wanted to make simple Spark app running in local mode with 2g
 spark.executor.memory and 1g for caching. But following code:

   val conf = new SparkConf()
 .setMaster(local)
 .setAppName(app)
 .set(spark.executor.memory, 2g)
 .set(spark.storage.memoryFraction, 0.5)
   val sc = new SparkContext(conf)

 doesn't work. In spark UI this variables are set properly but memory store
 is around 0.5 * 512MB (default spark.executor.memory) not 0.5 * 2GB:

 14/08/05 15:34:00 INFO MemoryStore: MemoryStore started with capacity
 245.8 MB.

 I have neither spark-defaults.conf nor spark-env.sh in my
 $SPARK_HOME/conf directory. I use Spark 1.0.0
 How can I set this values properly?

 Thanks,
 Grzegorz





Re: Gradient Boosted Machines

2014-08-05 Thread Manish Amde
Hi Daniel,

Thanks a lot for your interest. Gradient boosting and AdaBoost algorithms
are under active development and should be a part of release 1.2.

-Manish


On Mon, Jul 14, 2014 at 11:24 AM, Daniel Bendavid 
daniel.benda...@creditkarma.com wrote:

  Hi,

  My company is strongly considering implementing a recommendation engine
 that is built off of statistical models using Spark.  We attended the Spark
 Summit and were incredibly impressed with the technology and the entire
 community.  Since then, we have been exploring the technology and
 determining how we could use it for our specific needs.

  One algorithm that we ideally want to use as part of our project is
 Gradient Boosted Machines.  We are aware that they have not yet been
 implemented in MLlib and would like to submit our request that they be
 considered for future implementation.  Additionally, we would love to see
 the AdaBoost algorithm implemented in MLlib and feature preprocessing
 implemented in Python (as it already exists for Scala).

  Otherwise, thank you for taking our feedback and for providing us with
 this incredible technology.

  Daniel



Re: Writing to RabbitMQ

2014-08-05 Thread jschindler
You are correct in that I am trying to publish inside of a foreachRDD loop. 
I am currently refactoring and will try publishing inside the
foreachPartition loop.  Below is the code showing the way it is currently
written, thanks!


object myData {
  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
    ssc.checkpoint("checkpoint")
    val topicMap = Map("pagehit.data" -> 1)

    val factory = new ConnectionFactory()
    factory.setUsername("officialUsername")
    factory.setPassword("crypticPassword")
    factory.setVirtualHost("/")
    factory.setHost("rabbit-env")
    factory.setPort()
    val connection = factory.newConnection()

    val SQLChannel = connection.createChannel()
    SQLChannel.queueDeclare("SQLQueue", true, false, false, null)

    val Pipe = KafkaUtils.createStream(ssc,
      "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1",
      topicMap).map(_._2)

    // PARSE SOME JSON ETC

    windowStream.foreachRDD(pagehit => {
      val mongoClient = MongoClient("my-mongodb")
      val db = mongoClient("myClient")
      val SQLCollection = db("SQLCalls")

      val callArray = pagehit.map(_._1).collect
      val avg = (callArray.reduceLeft[Long](_ + _)) / callArray.length
      val URL = pagehit.take(1).map(_._2)

      SQLCollection += MongoDBObject("URL" -> URL(0).substring(7, URL(0).length - 1),
                                     "Avg Page Load Time" -> avg)

      val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
      val byteArray = toBuildJSON.mkString.getBytes()

      SQLChannel.basicPublish("", "SQLQueue", null, byteArray)
    })
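
For reference, a sketch of the foreachPartition variant being considered, where the RabbitMQ connection is created inside the partition (on the executor) instead of being captured from the driver; the host and queue names reuse the placeholders above:

import com.rabbitmq.client.ConnectionFactory

windowStream.foreachRDD { pagehit =>
  pagehit.foreachPartition { records =>
    // Create the (non-serializable) connection where the task actually runs.
    val factory = new ConnectionFactory()
    factory.setHost("rabbit-env")
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("SQLQueue", true, false, false, null)

    records.foreach { case (loadTime, url) =>
      val payload = s"""{"url":"$url","loadTimeMs":$loadTime}"""
      channel.basicPublish("", "SQLQueue", null, payload.getBytes("UTF-8"))
    }

    channel.close()
    connection.close()
  }
}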



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p11445.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.lang.IllegalArgumentException: Unable to create serializer com.esotericsoftware.kryo.serializers.FieldSerializer

2014-08-05 Thread Sameer Tilak
Hi All,
I am trying to move away from spark-shell to spark-submit and have been making
some code changes. However, I am now having a problem with serialization. It
used to work fine before the code update. Not sure what I did wrong. However,
here is the code:

JaccardScore.scala:

package approxstrmatch

class JaccardScore {
  val mjc = new Jaccard() with Serializable

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ApproxStrMatch")
      .set("spark.storage.memoryFraction", "0.0")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")

    val sc = new SparkContext(conf)
    // More code here…
    score.calculateSortedJaccardScore(srcFile, distFile)
    sc.stop()
  }

  def calculateSortedJaccardScore(sourcerdd: RDD[String], destrdd: RDD[String]) {
    // Code over here…
  }
}

MyRegistrator.scala (this is the central place for registering all the classes):

package approxstrmatch

import com.wcohen.ss.BasicStringWrapper
import com.wcohen.ss.Jaccard
import com.wcohen.ss.Level2MongeElkan
import com.wcohen.ss.Levenstein
import com.wcohen.ss.ScaledLevenstein
import com.wcohen.ss.Jaro
import com.wcohen.ss.JensenShannonDistance

import com.esotericsoftware.kryo.Kryo
// import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[approxstrmatch.JaccardScore])
    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
    kryo.register(classOf[com.wcohen.ss.Jaccard])
    // Bunch of other registrations here.
  }
}
I run it as:
spark-submit --class approxstrmatch.JaccardDriver --master local 
--executor-memory 8G 
/apps/sameert/software/approxstrmatch/target/scala-2.10/classes/approxstrmatch_2.10-1.0.jar
I get the following error message:
java.lang.IllegalArgumentException: Unable to create serializer
com.esotericsoftware.kryo.serializers.FieldSerializer for class: approxstrmatch.JaccardScore
at com.esotericsoftware.kryo.Kryo.newSerializer(Kryo.java:335)
at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:314)
at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:49)
at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:307)
at com.esotericsoftware.kryo.Kryo.register(Kryo.java:351)
at approxstrmatch.MyRegistrator.registerClasses(MyRegistrator.scala:18)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:77)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:73)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:73)
at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:130)
at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:92)
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:995)
at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:80)
at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:66)
at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:847)
at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:205)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:661)
at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:812)
at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:52)
at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)


graph reduceByKey

2014-08-05 Thread Omer Holzinger
Hey all!

I'm a total beginner with spark / hadoop / graph computation so please
excuse my beginner question.

I've created a graph using GraphX. Now, for every vertex, I want to get
all of its second-degree neighbors.
So if my graph is:
v1 -- v2
v1 -- v4
v1 -- v6

I want to get something like:
v2 -- v4
v2 -- v6
v4 -- v2
v4 -- v6
v6 -- v2
v6 -- v4

Does anyone have advice on the best way to do that over a graph instance?
I attempted to do it using mapReduceTriplets, but I need the reduce function
to work like reduceByKey, which I wasn't able to do.
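
One way to get at the per-vertex neighbor sets without fighting mapReduceTriplets is collectNeighborIds; a rough sketch (treating the graph as undirected):

import org.apache.spark.graphx.{EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

// For every vertex, emit all ordered pairs of its neighbors -- the
// "second degree" relationships that pass through that vertex.
def secondDegreePairs[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]): RDD[(VertexId, VertexId)] = {
  val neighbors = graph.collectNeighborIds(EdgeDirection.Either)   // VertexRDD[Array[VertexId]]
  neighbors.flatMap { case (_, nbrs) =>
    for (a <- nbrs; b <- nbrs if a != b) yield (a, b)              // e.g. (v2,v4), (v2,v6), ...
  }.distinct()
}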

Thank you.

  -- Omer


Problem running Spark shell (1.0.0) on EMR

2014-08-05 Thread Omer Holzinger
I'm having similar problem to:
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/browser

I'm trying to follow the tutorial at:

When I run: val file = sc.textFile("s3://bigdatademo/sample/wiki/")

I get:

WARN storage.BlockManager: Putting block broadcast_1 failed
java.lang.NoSuchMethodError:
com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;

I found a few other people raising this issue, but wasn't able to find a
solution or an explanation.
Has anyone encountered this? Any help or advice will be highly appreciated!

thank you,
  -- Omer


pyspark inferSchema

2014-08-05 Thread Brad Miller
Hi All,

I have a data set where each record is serialized using JSON, and I'm
interested in using SchemaRDDs to work with the data. Unfortunately I've hit
a snag since some fields in the data are maps and lists, and are not
guaranteed to be populated for each record. This seems to cause
inferSchema to throw an error:

Produces error:
srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
{'foo':'boom', 'baz':[1,2,3]}]))

Works fine:
srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
{'foo':'boom', 'baz':[]}]))

To be fair, inferSchema says it peeks at the first row, so a possible
work-around would be to make sure the type of any collection can be
determined using the first instance. However, I don't believe that items
in an RDD are guaranteed to remain in order, so this approach seems
somewhat brittle.

Does anybody know a robust solution to this problem in PySpark?  I'm am
running the 1.0.1 release.

-Brad


Re: pyspark inferSchema

2014-08-05 Thread Nicholas Chammas
I was just about to ask about this.

Currently, there are two methods, sqlContext.jsonFile() and
sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers
the whole data set.

For example:

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> a = sqlContext.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}', '{"foo":"boom", "baz":[1,2,3]}']))
>>> a.printSchema()
root
 |-- baz: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- foo: string (nullable = true)

It works really well! It handles fields with inconsistent value types by
inferring a value type that covers all the possible values.

But say you’ve already deserialized the JSON to do some pre-processing or
filtering. You’d commonly want to do this, say, to remove bad data. So now
you have an RDD of Python dictionaries, as opposed to an RDD of JSON
strings. It would be perfect if you could get the completeness of the
json...() methods, but against dictionaries.

Unfortunately, as you noted, inferSchema() only looks at the first element
in the set. Furthermore, inferring schemata from RDDs of dictionaries is being
deprecated https://issues.apache.org/jira/browse/SPARK-2010 in favor of
doing so from RDDs of Rows.

I’m not sure what the intention behind this move is, but as a user I’d like
to be able to convert RDDs of dictionaries directly to SchemaRDDs with the
completeness of the jsonRDD()/jsonFile() methods. Right now if I really
want that, I have to serialize the dictionaries to JSON text and then call
jsonRDD(), which is expensive.

Nick
​


On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Hi All,

 I have a data set where each record is serialized using JSON, and I'm
 interested to use SchemaRDDs to work with the data.  Unfortunately I've hit
 a snag since some fields in the data are maps and list, and are not
 guaranteed to be populated for each record.  This seems to cause
 inferSchema to throw an error:

 Produces error:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
 {'foo':'boom', 'baz':[1,2,3]}]))

 Works fine:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
 {'foo':'boom', 'baz':[]}]))

 To be fair inferSchema says it peeks at the first row, so a possible
 work-around would be to make sure the type of any collection can be
 determined using the first instance.  However, I don't believe that items
 in an RDD are guaranteed to remain in an ordered, so this approach seems
 somewhat brittle.

 Does anybody know a robust solution to this problem in PySpark?  I'm am
 running the 1.0.1 release.

 -Brad




Spark Memory Issues

2014-08-05 Thread Sunny Khatri
Hi,

I'm trying to run a spark application with executor-memory set to 3G, but I'm
running into the following error:

14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5]
at map at KMeans.scala:123), which has no missing parents
14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 0 (MappedRDD[5] at map at KMeans.scala:123)
14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks
14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157]
with ID 2
14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager
test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM
14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not
accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient memory
14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not
accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient memory
14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not
accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient memory
14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not
accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient memory


Tried tweaking executor-memory as well, but same result. It always
gets stuck registering the block manager.


Are there any other settings that need to be adjusted?


Thanks

Sunny


Re: Spark SQL Thrift Server

2014-08-05 Thread John Omernik
Thanks Michael.

Is there a way to specify OFF_HEAP, i.e. Tachyon, via the thrift server?

Thanks!


On Tue, Aug 5, 2014 at 11:06 AM, Michael Armbrust mich...@databricks.com
wrote:

 We are working on an overhaul of the docs before the 1.1 release.  In the
 mean time try: CACHE TABLE tableName.


 On Tue, Aug 5, 2014 at 9:02 AM, John Omernik j...@omernik.com wrote:

 I got things working on my cluster with the Spark SQL thrift server.
 (Thank you Yin Huai at Databricks!)

 That said, I was curious how I can cache a table via my instance here?  I
 tried the Shark-like "create table table_cached as select * from table" and
 that did not create a cached table.  cacheTable("table") didn't parse in
 beeline.

 Any thoughts?  Any pointers to documentation (*crosses fingers)?





Re: How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-05 Thread bumble123
Thank you!! Could you give me any sample code for the receiver? I'm still new
to Spark and not quite sure how I would do that.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11454.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Memory Issues

2014-08-05 Thread Akhil Das
Are you able to see the job on the WebUI (8080)? If yes, how much memory
are you seeing there specifically for this job?

[image: Inline image 1]

Here you can see I have 11.8 GB RAM on both workers and my app is using
11 GB.

1. What memory values are you seeing in your case?
2. Make sure your application is using the same spark URI (as seen in the
top left of the webUI) while creating the SparkContext.
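
As a minimal sketch of point 2 (the master URL and app name below are
placeholders; use the URL shown in the top left of the web UI):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("spark://master-host:7077").setAppName("my-app")
sc = SparkContext(conf=conf)
print(sc.master)   # should echo the same master URL back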



Thanks
Best Regards


On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com wrote:

 Hi,

 I'm trying to run a spark application with the executor-memory 3G. but I'm
 running into the following error:

 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map 
 at KMeans.scala:123), which has no missing parents
 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 
 (MappedRDD[5] at map at KMeans.scala:123)
 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks
 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: 
 Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157]
  with ID 2
 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager 
 test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM
 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted any 
 resources; check your cluster UI to ensure that workers are registered and 
 have sufficient memory
 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted any 
 resources; check your cluster UI to ensure that workers are registered and 
 have sufficient memory
 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted any 
 resources; check your cluster UI to ensure that workers are registered and 
 have sufficient memory
 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted any 
 resources; check your cluster UI to ensure that workers are registered and 
 have sufficient memory


 Tried tweaking executor-memory as well, but same result. It always gets stuck 
 registering the block manager.


 Are there any other settings that needs to be adjusted.


 Thanks

 Sunny





Re: Spark shell creating a local SparkContext instead of connecting to Spark Master

2014-08-05 Thread Akhil Das
​You can always start your spark-shell by specifying the master as

MASTER=spark://*whatever*:7077 $SPARK_HOME/bin/spark-shell​

Then it will connect to that *whatever* master.


Thanks
Best Regards


On Tue, Aug 5, 2014 at 8:51 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Hi

 Apologies if this is a noob question. I have setup Spark 1.0.1 on EMR
 using a slightly modified version of script
 @ s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark-yarn.rb. It
 seems to be running fine with master logs stating:

 14/08/05 14:36:56 INFO Master: I have been elected leader! New state: ALIVE
 14/08/05 14:37:21 INFO Master: Registering worker
 ip-10-0-2-80.ec2.internal:52029 with 2 cores, 6.3 GB RAM

 The script has also created spark-env.sh under conf which has the
 following content:

 export SPARK_MASTER_IP=x.x.x.x
 export SCALA_HOME=/home/hadoop/.versions/scala-2.10.3
 export SPARK_LOCAL_DIRS=/mnt/spark/
 export
 SPARK_CLASSPATH=/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
 export SPARK_DAEMON_JAVA_OPTS=-verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCTimeStamps
 export
 SPARK_ASSEMBLY_JAR=/home/hadoop/spark/lib/spark-assembly-1.0.1-hadoop2.4.0.jar

 However, when I run the spark-shell, sc.isLocal returns true. Also, no
 matter how many RDDs I cache, the used memory in the master UI
 (x.x.x.x:7077) shows 0B used. This leads me to believe that the spark-shell
 isn't connecting to Spark master and has started a local instance of spark.
 Is there something I am missing in my setup that allows for spark-shell to
 connect to master?

 Thanks,
 Aniket



Re: Spark Memory Issues

2014-08-05 Thread Sunny Khatri
The only UI I have currently is the Application Master (cluster mode), with
the following executor node status:
Executors (3)

   - *Memory:* 0.0 B Used (3.7 GB Total)
   - *Disk:* 0.0 B Used

Executor ID  Address  RDD Blocks  Memory Used        Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
1            add1     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
2            add2     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
driver       add3     0           0.0 B / 294.6 MB   0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B


On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Are you able to see the job on the WebUI (8080)? If yes, how much memory
 are you seeing there specifically for this job?

 [image: Inline image 1]

 Here you can see i have 11.8Gb RAM on both workers and my app is using
 11GB.

 1. What are all the memory that you are seeing in your case?
 2. Make sure your application is using the same spark URI (as seen in the
 top left of the webUI) while creating the SparkContext.



 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com
 wrote:

 Hi,

 I'm trying to run a spark application with the executor-memory 3G. but
 I'm running into the following error:

 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map 
 at KMeans.scala:123), which has no missing parents
 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 
 (MappedRDD[5] at map at KMeans.scala:123)
 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks
 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: 
 Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157]
  with ID 2
 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager 
 test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM
 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory
 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory
 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory
 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory


 Tried tweaking executor-memory as well, but same result. It always gets 
 stuck registering the block manager.


 Are there any other settings that needs to be adjusted.


 Thanks

 Sunny






Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Hi Nick,

Thanks for the great response.

I actually already investigated jsonRDD and jsonFile, although I did not
realize they provide more complete schema inference.  I did, however, have
other problems with jsonRDD and jsonFile, which I will now describe in a
separate thread with an appropriate subject.

I did notice that when I run your example code, I do not receive the exact
same output.  For example, I see:

>>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}',
...                                        '{"foo":"boom", "baz":[1,2,3]}']))
>>> srdd.printSchema()

root
 |-- baz: ArrayType[IntegerType]
 |-- foo: StringType


Notice the difference in the schema.  Are you running the 1.0.1 release, or
a more bleeding-edge version from the repository?

best,
-Brad


On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 I was just about to ask about this.

 Currently, there are two methods, sqlContext.jsonFile() and
 sqlContext.jsonRDD(), that work on JSON text and infer a schema that
 covers the whole data set.

 For example:

 from pyspark.sql import SQLContext
 sqlContext = SQLContext(sc)
 >>> a = sqlContext.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}',
 ...                                         '{"foo":"boom", "baz":[1,2,3]}']))
 >>> a.printSchema()
 root
  |-- baz: array (nullable = true)
  |    |-- element: integer (containsNull = false)
  |-- foo: string (nullable = true)

 It works really well! It handles fields with inconsistent value types by
 inferring a value type that covers all the possible values.

 But say you’ve already deserialized the JSON to do some pre-processing or
 filtering. You’d commonly want to do this, say, to remove bad data. So now
 you have an RDD of Python dictionaries, as opposed to an RDD of JSON
 strings. It would be perfect if you could get the completeness of the
 json...() methods, but against dictionaries.

 Unfortunately, as you noted, inferSchema() only looks at the first
 element in the set. Furthermore, inferring schemata from RDDs of
 dictionaries is being deprecated
 https://issues.apache.org/jira/browse/SPARK-2010 in favor of doing so
 from RDDs of Rows.

 I’m not sure what the intention behind this move is, but as a user I’d
 like to be able to convert RDDs of dictionaries directly to SchemaRDDs with
 the completeness of the jsonRDD()/jsonFile() methods. Right now if I
 really want that, I have to serialize the dictionaries to JSON text and
 then call jsonRDD(), which is expensive.

 Nick
 ​


 On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I have a data set where each record is serialized using JSON, and I'm
 interested to use SchemaRDDs to work with the data.  Unfortunately I've hit
 a snag since some fields in the data are maps and list, and are not
 guaranteed to be populated for each record.  This seems to cause
 inferSchema to throw an error:

 Produces error:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
 {'foo':'boom', 'baz':[1,2,3]}]))

 Works fine:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
 {'foo':'boom', 'baz':[]}]))

 To be fair inferSchema says it peeks at the first row, so a possible
 work-around would be to make sure the type of any collection can be
 determined using the first instance.  However, I don't believe that items
 in an RDD are guaranteed to remain in an ordered, so this approach seems
 somewhat brittle.

 Does anybody know a robust solution to this problem in PySpark?  I'm am
 running the 1.0.1 release.

 -Brad





Re: Spark Memory Issues

2014-08-05 Thread Akhil Das
For that UI to have some values, your process should do some operation,
which is not happening here (14/08/05 18:03:13 WARN YarnClusterScheduler:
Initial job has not accepted any resources; check your cluster UI to ensure
that workers are registered and have sufficient memory).

Can you open up a spark-shell and try some simple code? ( *val x =
sc.parallelize(1 to 100).filter(_ < 100).collect()* )

Just to make sure your cluster setup is proper and is working.
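
For a PySpark job, a rough equivalent of that sanity check (assuming an
existing SparkContext sc) would be:

x = sc.parallelize(range(1, 101)).filter(lambda n: n < 100).collect()
print(len(x))   # expect 99 if executors are actually accepting tasks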

Thanks
Best Regards


On Wed, Aug 6, 2014 at 12:17 AM, Sunny Khatri sunny.k...@gmail.com wrote:

 The only UI I have currently is the Application Master (Cluster mode),
 with the following executor nodes status:
 Executors (3)

- *Memory:* 0.0 B Used (3.7 GB Total)
- *Disk:* 0.0 B Used

 Executor ID  Address  RDD Blocks  Memory Used        Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
 1            add1     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
 2            add2     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
 driver       add3     0           0.0 B / 294.6 MB   0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B


 On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you able to see the job on the WebUI (8080)? If yes, how much memory
 are you seeing there specifically for this job?

 [image: Inline image 1]

 Here you can see i have 11.8Gb RAM on both workers and my app is using
 11GB.

 1. What are all the memory that you are seeing in your case?
 2. Make sure your application is using the same spark URI (as seen in the
 top left of the webUI) while creating the SparkContext.



 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com
 wrote:

 Hi,

 I'm trying to run a spark application with the executor-memory 3G. but
 I'm running into the following error:

 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at 
 map at KMeans.scala:123), which has no missing parents
 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 
 0 (MappedRDD[5] at map at KMeans.scala:123)
 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 
 tasks
 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered executor: 
 Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157]
  with ID 2
 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager 
 test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM
 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory
 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory
 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory
 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered 
 and have sufficient memory


 Tried tweaking executor-memory as well, but same result. It always gets 
 stuck registering the block manager.


 Are there any other settings that needs to be adjusted.


 Thanks

 Sunny







Re: pyspark inferSchema

2014-08-05 Thread Nicholas Chammas
Notice the difference in the schema.  Are you running the 1.0.1 release, or
 a more bleeding-edge version from the repository?

Yep, my bad. I’m running off master at commit
184048f80b6fa160c89d5bb47b937a0a89534a95.

Nick
​


trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Hi All,

I am interested in using jsonRDD and jsonFile to create a SchemaRDD out of
some JSON data I have, but I've run into some instability involving the
following Java exception:

An error occurred while calling o1326.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
on host neal.research.intel-research.net:
net.razorvine.pickle.PickleException: couldn't introspect javabean:
java.lang.IllegalArgumentException: wrong number of arguments

I've pasted code which produces the error as well as the full traceback
below.  Note that I don't have any problem when I parse the JSON myself and
use inferSchema.

Is anybody able to reproduce this bug?

-Brad

>>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}',
...                                        '{"foo":"boom", "baz":[1,2,3]}']))
>>> srdd.printSchema()

root
 |-- baz: ArrayType[IntegerType]
 |-- foo: StringType

>>> srdd.collect()

---
Py4JJavaError Traceback (most recent call last)
<ipython-input-89-ec7e8e8c68c4> in <module>()
----> 1 srdd.collect()

/home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
581 
582 with _JavaStackTrace(self.context) as st:
--> 583   bytesInJava = self._jrdd.collect().iterator()
584 return
list(self._collect_iterator_through_file(bytesInJava))
585

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
__call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
--> 537 self.target_id, self.name)
538
539 for temp_arg in temp_args:

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o1326.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
on host neal.research.intel-research.net:
net.razorvine.pickle.PickleException: couldn't introspect javabean:
java.lang.IllegalArgumentException: wrong number of arguments
net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.dump(Pickler.java:95)
net.razorvine.pickle.Pickler.dumps(Pickler.java:80)

org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)

org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
scala.collection.Iterator$anon$11.next(Iterator.scala:328)

org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)

org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at

Re: pyspark inferSchema

2014-08-05 Thread Davies Liu
On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 I was just about to ask about this.

 Currently, there are two methods, sqlContext.jsonFile() and
 sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers
 the whole data set.

 For example:

 from pyspark.sql import SQLContext
 sqlContext = SQLContext(sc)

 >>> a = sqlContext.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}',
 ...                                         '{"foo":"boom", "baz":[1,2,3]}']))
 >>> a.printSchema()
 root
  |-- baz: array (nullable = true)
  |    |-- element: integer (containsNull = false)
  |-- foo: string (nullable = true)

 It works really well! It handles fields with inconsistent value types by
 inferring a value type that covers all the possible values.

 But say you’ve already deserialized the JSON to do some pre-processing or
 filtering. You’d commonly want to do this, say, to remove bad data. So now
 you have an RDD of Python dictionaries, as opposed to an RDD of JSON
 strings. It would be perfect if you could get the completeness of the
 json...() methods, but against dictionaries.

 Unfortunately, as you noted, inferSchema() only looks at the first element
 in the set. Furthermore, inferring schemata from RDDs of dictionaries is
 being deprecated in favor of doing so from RDDs of Rows.

 I’m not sure what the intention behind this move is, but as a user I’d like
 to be able to convert RDDs of dictionaries directly to SchemaRDDs with the
 completeness of the jsonRDD()/jsonFile() methods. Right now if I really want
 that, I have to serialize the dictionaries to JSON text and then call
 jsonRDD(), which is expensive.

Before the upcoming 1.1 release, we did not support nested structures via
inferSchema; a nested dictionary becomes a MapType. This introduces an
inconsistency for dictionaries: the top level becomes a struct type (fields
can be accessed by name) while nested dictionaries become MapType (accessed
as maps).

Deprecating the top-level dictionary is an attempt to resolve this
inconsistency.

The Row class in pyspark.sql has a similar interface to dict, so you can
easily convert your dict into a Row:

ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
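
To spell that out, a minimal sketch (assuming a build where pyspark.sql.Row
is available, as discussed, and an existing SparkContext sc):

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)
dicts = sc.parallelize([{'foo': 'bar', 'baz': [1, 2, 3]},
                        {'foo': 'boom', 'baz': []}])
# convert each dict to a Row so inferSchema sees named fields, not a map;
# inference still peeks at the first row, so a fully-populated record is first
srdd = sqlContext.inferSchema(dicts.map(lambda d: Row(**d)))
srdd.printSchema()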

In order to get the correct schema, do we need another argument to specify
the number of rows to be inferred from? Such as:

inferSchema(rdd, sample=None)

With sample=None it would take the first row; otherwise it would sample the
RDD to figure out the complete schema.

Does this work for you?

 Nick



 On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I have a data set where each record is serialized using JSON, and I'm
 interested to use SchemaRDDs to work with the data.  Unfortunately I've hit
 a snag since some fields in the data are maps and list, and are not
 guaranteed to be populated for each record.  This seems to cause inferSchema
 to throw an error:

 Produces error:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
 {'foo':'boom', 'baz':[1,2,3]}]))

 Works fine:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
 {'foo':'boom', 'baz':[]}]))

 To be fair inferSchema says it peeks at the first row, so a possible
 work-around would be to make sure the type of any collection can be
 determined using the first instance.  However, I don't believe that items in
 an RDD are guaranteed to remain in an ordered, so this approach seems
 somewhat brittle.

 Does anybody know a robust solution to this problem in PySpark?  I'm am
 running the 1.0.1 release.

 -Brad



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Nicholas Chammas
I believe this is a known issue in 1.0.1 that's fixed in 1.0.2.

See: SPARK-2376: Selecting list values inside nested JSON objects raises
java.lang.IllegalArgumentException
https://issues.apache.org/jira/browse/SPARK-2376


On Tue, Aug 5, 2014 at 2:55 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Hi All,

 I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of
 some JSON data I have, but I've run into some instability involving the
 following java exception:

 An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I've pasted code which produces the error as well as the full traceback
 below.  Note that I don't have any problem when I parse the JSON myself and
 use inferSchema.

 Is anybody able to reproduce this bug?

 -Brad

 >>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}',
 ...                                        '{"foo":"boom", "baz":[1,2,3]}']))
 >>> srdd.printSchema()

 root
  |-- baz: ArrayType[IntegerType]
  |-- foo: StringType

  srdd.collect()

 ---
 Py4JJavaError Traceback (most recent call last)
 ipython-input-89-ec7e8e8c68c4 in module()
  1 srdd.collect()

 /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
 581 
 582 with _JavaStackTrace(self.context) as st:
 -- 583   bytesInJava = self._jrdd.collect().iterator()
 584 return
 list(self._collect_iterator_through_file(bytesInJava))
 585

 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
 __call__(self, *args)
 535 answer = self.gateway_client.send_command(command)
 536 return_value = get_return_value(answer,
 self.gateway_client,
 -- 537 self.target_id, self.name)
 538
 539 for temp_arg in temp_args:

 /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments
 net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.dump(Pickler.java:95)
 net.razorvine.pickle.Pickler.dumps(Pickler.java:80)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
 scala.collection.Iterator$anon$11.next(Iterator.scala:328)

 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 

Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Got it.  Thanks!


On Tue, Aug 5, 2014 at 11:53 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Notice the difference in the schema.  Are you running the 1.0.1 release,
 or a more bleeding-edge version from the repository?

 Yep, my bad. I’m running off master at commit
 184048f80b6fa160c89d5bb47b937a0a89534a95.

 Nick
 ​



Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Michael Armbrust
Is this on 1.0.1?  I'd suggest running this on master or the 1.1-RC which
should be coming out this week.  Pyspark did not have good support for
nested data previously.  If you still encounter issues using a more recent
version, please file a JIRA.  Thanks!


On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Hi All,

 I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of
 some JSON data I have, but I've run into some instability involving the
 following java exception:

 An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I've pasted code which produces the error as well as the full traceback
 below.  Note that I don't have any problem when I parse the JSON myself and
 use inferSchema.

 Is anybody able to reproduce this bug?

 -Brad

 >>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}',
 ...                                        '{"foo":"boom", "baz":[1,2,3]}']))
 >>> srdd.printSchema()

 root
  |-- baz: ArrayType[IntegerType]
  |-- foo: StringType

  srdd.collect()

 ---
 Py4JJavaError Traceback (most recent call last)
 ipython-input-89-ec7e8e8c68c4 in module()
  1 srdd.collect()

 /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
 581 
 582 with _JavaStackTrace(self.context) as st:
 -- 583   bytesInJava = self._jrdd.collect().iterator()
 584 return
 list(self._collect_iterator_through_file(bytesInJava))
 585

 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
 __call__(self, *args)
 535 answer = self.gateway_client.send_command(command)
 536 return_value = get_return_value(answer,
 self.gateway_client,
 -- 537 self.target_id, self.name)
 538
 539 for temp_arg in temp_args:

 /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments
 net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.dump(Pickler.java:95)
 net.razorvine.pickle.Pickler.dumps(Pickler.java:80)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
 scala.collection.Iterator$anon$11.next(Iterator.scala:328)

 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 

Re: java.lang.StackOverflowError

2014-08-05 Thread Chengi Liu
Bump

On Tuesday, August 5, 2014, Chengi Liu chengi.liu...@gmail.com wrote:

 Hi,
   I am doing some basic preprocessing in pyspark (local mode) as follows:

 from pyspark import SparkConf, SparkContext

 files = [ <input files> ]

 def read(filename, sc):
   # process file
   return rdd

 if __name__ == "__main__":
   conf = SparkConf()
   conf.setMaster('local')
   sc = SparkContext(conf=conf)
   sc.setCheckpointDir(root + "temp/")

   data = sc.parallelize([])

   for i, f in enumerate(files):
     data = data.union(read(f, sc))
     if i == 20:
       data.checkpoint()
       data.count()
     if i == 500: break
   # print data.count()
   # rdd_1 = read(files[0], sc)
   data.saveAsTextFile(root + "output/")


 But I see this error:
   keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o9564.saveAsTextFile.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 serialization failed: java.lang.StackOverflowError
 java.io.Bits.putInt(Bits.java:93)

 java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)



Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Nick: Thanks for both the original JIRA bug report and the link.

Michael: This is on the 1.0.1 release.  I'll update to master and follow-up
if I have any problems.

best,
-Brad


On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com
wrote:

 Is this on 1.0.1?  I'd suggest running this on master or the 1.1-RC which
 should be coming out this week.  Pyspark did not have good support for
 nested data previously.  If you still encounter issues using a more recent
 version, please file a JIRA.  Thanks!


 On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of
 some JSON data I have, but I've run into some instability involving the
 following java exception:

 An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I've pasted code which produces the error as well as the full traceback
 below.  Note that I don't have any problem when I parse the JSON myself and
 use inferSchema.

 Is anybody able to reproduce this bug?

 -Brad

 >>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}',
 ...                                        '{"foo":"boom", "baz":[1,2,3]}']))
 >>> srdd.printSchema()

 root
  |-- baz: ArrayType[IntegerType]
  |-- foo: StringType

  srdd.collect()


 ---
 Py4JJavaError Traceback (most recent call
 last)
 ipython-input-89-ec7e8e8c68c4 in module()
  1 srdd.collect()

 /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
 581 
 582 with _JavaStackTrace(self.context) as st:
 -- 583   bytesInJava = self._jrdd.collect().iterator()
 584 return
 list(self._collect_iterator_through_file(bytesInJava))
 585

 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
 __call__(self, *args)
 535 answer = self.gateway_client.send_command(command)
 536 return_value = get_return_value(answer,
 self.gateway_client,
 -- 537 self.target_id, self.name)
 538
 539 for temp_arg in temp_args:

 /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling
 {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments
 net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.dump(Pickler.java:95)
 net.razorvine.pickle.Pickler.dumps(Pickler.java:80)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
 scala.collection.Iterator$anon$11.next(Iterator.scala:328)

 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 

Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Hi Davies,

Thanks for the response and tips.  Is the sample argument to inferSchema
available in the 1.0.1 release of pyspark?  I'm not sure (based on the
documentation linked below) that it is.
http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema

It sounds like updating to master may help address my issue (and may also
make the sample argument available), so I'm going to go ahead and do that.

best,
-Brad


On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote:

 On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  I was just about to ask about this.
 
  Currently, there are two methods, sqlContext.jsonFile() and
  sqlContext.jsonRDD(), that work on JSON text and infer a schema that
 covers
  the whole data set.
 
  For example:
 
  from pyspark.sql import SQLContext
  sqlContext = SQLContext(sc)
 
  >>> a = sqlContext.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}',
  ...                                         '{"foo":"boom", "baz":[1,2,3]}']))
  >>> a.printSchema()
  root
   |-- baz: array (nullable = true)
   |    |-- element: integer (containsNull = false)
   |-- foo: string (nullable = true)
 
  It works really well! It handles fields with inconsistent value types by
  inferring a value type that covers all the possible values.
 
  But say you’ve already deserialized the JSON to do some pre-processing or
  filtering. You’d commonly want to do this, say, to remove bad data. So
 now
  you have an RDD of Python dictionaries, as opposed to an RDD of JSON
  strings. It would be perfect if you could get the completeness of the
  json...() methods, but against dictionaries.
 
  Unfortunately, as you noted, inferSchema() only looks at the first
 element
  in the set. Furthermore, inferring schemata from RDDs of dictionaries is
  being deprecated in favor of doing so from RDDs of Rows.
 
  I’m not sure what the intention behind this move is, but as a user I’d
 like
  to be able to convert RDDs of dictionaries directly to SchemaRDDs with
 the
  completeness of the jsonRDD()/jsonFile() methods. Right now if I really
 want
  that, I have to serialize the dictionaries to JSON text and then call
  jsonRDD(), which is expensive.

 Before upcoming 1.1 release, we did not support nested structures via
 inferSchema,
 the nested dictionary will be MapType. This introduces inconsistance
 for dictionary that
 the top level will be structure type (can be accessed by name of
 field) but others will be
 MapType (can be accesses as map).

 So deprecated top level dictionary is try to solve this kind of
 inconsistance.

 The Row class in pyspark.sql has a similar interface to dict, so you
 can easily convert
 you dic into a Row:

 ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))

 In order to get the correct schema, so we need another argument to specify
 the number of rows to be infered? Such as:

 inferSchema(rdd, sample=None)

 with sample=None, it will take the first row, or it will do the
 sampling to figure out the
 complete schema.

 Does this work for you?

  Nick
 
 
 
  On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:
 
  Hi All,
 
  I have a data set where each record is serialized using JSON, and I'm
  interested to use SchemaRDDs to work with the data.  Unfortunately I've
 hit
  a snag since some fields in the data are maps and list, and are not
  guaranteed to be populated for each record.  This seems to cause
 inferSchema
  to throw an error:
 
  Produces error:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
  {'foo':'boom', 'baz':[1,2,3]}]))
 
  Works fine:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
  {'foo':'boom', 'baz':[]}]))
 
  To be fair inferSchema says it peeks at the first row, so a possible
  work-around would be to make sure the type of any collection can be
  determined using the first instance.  However, I don't believe that
 items in
  an RDD are guaranteed to remain in an ordered, so this approach seems
  somewhat brittle.
 
  Does anybody know a robust solution to this problem in PySpark?  I'm am
  running the 1.0.1 release.
 
  -Brad
 
 



Re: java.lang.StackOverflowError

2014-08-05 Thread Davies Liu
Could you create a reproducible script (and data) to allow us to
investigate this?

Davies

On Tue, Aug 5, 2014 at 1:10 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 Hi,
   I am doing some basic preprocessing in pyspark (local mode as follows):

 files = [ input files]
 def read(filename,sc):
   #process file
   return rdd

 if __name__ == "__main__":
   conf = SparkConf()
   conf.setMaster('local')
   sc = SparkContext(conf=conf)
   sc.setCheckpointDir(root + "temp/")

   data = sc.parallelize([])

   for i,f in enumerate(files):

 data = data.union(read(f,sc))

union is a lazy transformation; you could union them all at once:

rdds = [read(f,sc) for f in files]
rdd = sc.union(rdds)
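
A compact sketch of how that fits into the original script (files, read and
root are the hypothetical names from the snippet above):

rdds = [read(f, sc) for f in files]
data = sc.union(rdds)            # one flat union instead of a long chain
data.checkpoint()                # optional: cut the lineage before the action
data.saveAsTextFile(root + "output/")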

 if i ==20:
   data.checkpoint()
   data.count()
 if i == 500:break
   #print data.count()
   #rdd_1 = read(files[0],sc)
   data.saveAsTextFile(root + "output/")


 But I see this error:
   keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o9564.saveAsTextFile.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 serialization failed: java.lang.StackOverflowError
 java.io.Bits.putInt(Bits.java:93)
 java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Memory Issues

2014-08-05 Thread Akhil Das
Are you sure that you were not running SparkPi in local mode?

Thanks
Best Regards


On Wed, Aug 6, 2014 at 12:43 AM, Sunny Khatri sunny.k...@gmail.com wrote:

 Well I was able to run SparkPi, which also does similar stuff,
 successfully.


 On Tue, Aug 5, 2014 at 11:52 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 For that UI to have some values, your process should do some operation.
 Which is not happening here ( 14/08/05 18:03:13 WARN
 YarnClusterScheduler: Initial job has not accepted any resources; check
 your cluster UI to ensure that workers are registered and have sufficient
 memory )

 Can you open up a spark-shell and try some simple code? ( *val x =
 sc.parallelize(1 to 100).filter(_ < 100).collect()* )

 Just to make sure your cluster setup is proper and is working.

 Thanks
 Best Regards


 On Wed, Aug 6, 2014 at 12:17 AM, Sunny Khatri sunny.k...@gmail.com
 wrote:

 The only UI I have currently is the Application Master (Cluster mode),
 with the following executor nodes status:
 Executors (3)

- *Memory:* 0.0 B Used (3.7 GB Total)
- *Disk:* 0.0 B Used

 Executor ID  Address  RDD Blocks  Memory Used        Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
 1            add1     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
 2            add2     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
 driver       add3     0           0.0 B / 294.6 MB   0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B


 On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you able to see the job on the WebUI (8080)? If yes, how much
 memory are you seeing there specifically for this job?

 [image: Inline image 1]

 Here you can see i have 11.8Gb RAM on both workers and my app is using
 11GB.

 1. What are all the memory that you are seeing in your case?
 2. Make sure your application is using the same spark URI (as seen in
 the top left of the webUI) while creating the SparkContext.



 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com
 wrote:

 Hi,

 I'm trying to run a spark application with the executor-memory 3G. but
 I'm running into the following error:

 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at 
 map at KMeans.scala:123), which has no missing parents
 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from 
 Stage 0 (MappedRDD[5] at map at KMeans.scala:123)
 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 
 tasks
 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157]
  with ID 2
 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager 
 test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM
 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory
 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory
 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory
 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory


 Tried tweaking executor-memory as well, but same result. It always gets 
 stuck registering the block manager.


 Are there any other settings that needs to be adjusted.


 Thanks

 Sunny









Re: Spark Memory Issues

2014-08-05 Thread Sunny Khatri
Yeah, ran it on yarn-cluster mode.


On Tue, Aug 5, 2014 at 12:17 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Are you sure that you were not running SparkPi in local mode?

 Thanks
 Best Regards


 On Wed, Aug 6, 2014 at 12:43 AM, Sunny Khatri sunny.k...@gmail.com
 wrote:

 Well I was able to run the SparkPi, that also does the similar stuff,
 successfully.


 On Tue, Aug 5, 2014 at 11:52 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 For that UI to have some values, your process should do some operation.
 Which is not happening here ( 14/08/05 18:03:13 WARN
 YarnClusterScheduler: Initial job has not accepted any resources; check
 your cluster UI to ensure that workers are registered and have sufficient
 memory )

 Can you open up a spark-shell and try some simple code? ( *val x =
 sc.parallelize(1 to 100).filter(_ < 100).collect()* )

 Just to make sure your cluster setup is proper and is working.

 Thanks
 Best Regards


 On Wed, Aug 6, 2014 at 12:17 AM, Sunny Khatri sunny.k...@gmail.com
 wrote:

 The only UI I have currently is the Application Master (Cluster mode),
 with the following executor nodes status:
 Executors (3)

- *Memory:* 0.0 B Used (3.7 GB Total)
- *Disk:* 0.0 B Used

 Executor ID  Address  RDD Blocks  Memory Used        Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
 1            add1     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
 2            add2     0           0.0 B / 1766.4 MB  0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B
 driver       add3     0           0.0 B / 294.6 MB   0.0 B      0             0             0               0            0 ms       0.0 B         0.0 B


 On Tue, Aug 5, 2014 at 11:32 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you able to see the job on the WebUI (8080)? If yes, how much
 memory are you seeing there specifically for this job?

 [image: Inline image 1]

 Here you can see i have 11.8Gb RAM on both workers and my app is using
 11GB.

 1. What are all the memory that you are seeing in your case?
 2. Make sure your application is using the same spark URI (as seen in
 the top left of the webUI) while creating the SparkContext.



 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 11:38 PM, Sunny Khatri sunny.k...@gmail.com
 wrote:

 Hi,

 I'm trying to run a spark application with the executor-memory 3G.
 but I'm running into the following error:

 14/08/05 18:02:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[5] at 
 map at KMeans.scala:123), which has no missing parents
 14/08/05 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from 
 Stage 0 (MappedRDD[5] at map at KMeans.scala:123)
 14/08/05 18:02:58 INFO YarnClusterScheduler: Adding task set 0.0 with 1 
 tasks
 14/08/05 18:02:59 INFO CoarseGrainedSchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@test-hadoop2.vpc.natero.com:54358/user/Executor#1670455157]
  with ID 2
 14/08/05 18:02:59 INFO BlockManagerInfo: Registering block manager 
 test-hadoop2.vpc.natero.com:39156 with 1766.4 MB RAM
 14/08/05 18:03:13 WARN YarnClusterScheduler: Initial job has not 
 accepted any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory
 14/08/05 18:03:28 WARN YarnClusterScheduler: Initial job has not 
 accepted any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory
 14/08/05 18:03:43 WARN YarnClusterScheduler: Initial job has not 
 accepted any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory
 14/08/05 18:03:58 WARN YarnClusterScheduler: Initial job has not 
 accepted any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory


 Tried tweaking executor-memory as well, but same result. It always gets 
 stuck registering the block manager.


 Are there any other settings that needs to be adjusted.


 Thanks

 Sunny










Re: pyspark inferSchema

2014-08-05 Thread Davies Liu
This sample argument of inferSchema is still not in master; I will
try to add it if it makes sense.

On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 Thanks for the response and tips.  Is the sample argument to inferSchema
 available in the 1.0.1 release of pyspark?  I'm not sure (based on the
 documentation linked below) that it is.
 http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema

 It sounds like updating to master may help address my issue (and may also
 make the sample argument available), so I'm going to go ahead and do that.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote:

 On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  I was just about to ask about this.
 
  Currently, there are two methods, sqlContext.jsonFile() and
  sqlContext.jsonRDD(), that work on JSON text and infer a schema that
  covers
  the whole data set.
 
  For example:
 
  from pyspark.sql import SQLContext
  sqlContext = SQLContext(sc)
 
  >>> a = sqlContext.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}',
  ...                                         '{"foo":"boom", "baz":[1,2,3]}']))
  >>> a.printSchema()
  root
   |-- baz: array (nullable = true)
   |    |-- element: integer (containsNull = false)
   |-- foo: string (nullable = true)
 
  It works really well! It handles fields with inconsistent value types by
  inferring a value type that covers all the possible values.
 
  But say you’ve already deserialized the JSON to do some pre-processing
  or
  filtering. You’d commonly want to do this, say, to remove bad data. So
  now
  you have an RDD of Python dictionaries, as opposed to an RDD of JSON
  strings. It would be perfect if you could get the completeness of the
  json...() methods, but against dictionaries.
 
  Unfortunately, as you noted, inferSchema() only looks at the first
  element
  in the set. Furthermore, inferring schemata from RDDs of dictionaries is
  being deprecated in favor of doing so from RDDs of Rows.
 
  I’m not sure what the intention behind this move is, but as a user I’d
  like
  to be able to convert RDDs of dictionaries directly to SchemaRDDs with
  the
  completeness of the jsonRDD()/jsonFile() methods. Right now if I really
  want
  that, I have to serialize the dictionaries to JSON text and then call
  jsonRDD(), which is expensive.

 Before the upcoming 1.1 release, we did not support nested structures via
 inferSchema; a nested dictionary becomes a MapType. This introduces an
 inconsistency for dictionaries: the top level is a struct type (fields can
 be accessed by name) while nested dictionaries are MapType (accessed as a
 map).

 Deprecating top-level dictionaries is an attempt to resolve this
 inconsistency.

 The Row class in pyspark.sql has a similar interface to dict, so you can
 easily convert your dict into a Row:

 ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))

 In order to get the correct schema, do we need another argument to specify
 the number of rows to be inferred? Such as:

 inferSchema(rdd, sample=None)

 With sample=None it will take only the first row; otherwise it will sample
 the data to figure out the complete schema.

 Does this work for you?

  Nick
 
 
 
  On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:
 
  Hi All,
 
  I have a data set where each record is serialized using JSON, and I'm
  interested to use SchemaRDDs to work with the data.  Unfortunately I've
  hit
  a snag since some fields in the data are maps and list, and are not
  guaranteed to be populated for each record.  This seems to cause
  inferSchema
  to throw an error:
 
  Produces error:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
  {'foo':'boom', 'baz':[1,2,3]}]))
 
  Works fine:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
  {'foo':'boom', 'baz':[]}]))
 
  To be fair inferSchema says it peeks at the first row, so a possible
  work-around would be to make sure the type of any collection can be
  determined using the first instance.  However, I don't believe that
  items in
  an RDD are guaranteed to remain in an ordered, so this approach seems
  somewhat brittle.
 
  Does anybody know a robust solution to this problem in PySpark?  I'm am
  running the 1.0.1 release.
 
  -Brad
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
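
For reference, a minimal runnable sketch of the dict-to-Row conversion
described in this thread (a sketch only, against the pyspark.sql API names
used above; the app name and sample data are made up):

# Convert an RDD of dicts to Rows so inferSchema sees named fields
# instead of top-level dictionaries.
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="infer-schema-row-sketch")
sqlCtx = SQLContext(sc)

# Note: inferSchema still only inspects the first element, so the record
# with the populated list is deliberately placed first here.
rdd_of_dict = sc.parallelize([{'foo': 'bar', 'baz': [1, 2, 3]},
                              {'foo': 'boom', 'baz': []}])

srdd = sqlCtx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
srdd.printSchema()
print(srdd.collect())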



Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Assuming updating to master fixes the bug I was experiencing with jsonRDD
and jsonFile, then pushing sample to master will probably not be
necessary.

We believe that the link below was the bug I experienced, and I've been
told it is fixed in master.

https://issues.apache.org/jira/browse/SPARK-2376

best,
-brad


On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote:

 This sample argument of inferSchema is still no in master, if will
 try to add it if it make
 sense.

 On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  Hi Davies,
 
  Thanks for the response and tips.  Is the sample argument to
 inferSchema
  available in the 1.0.1 release of pyspark?  I'm not sure (based on the
  documentation linked below) that it is.
 
 http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema
 
  It sounds like updating to master may help address my issue (and may also
  make the sample argument available), so I'm going to go ahead and do
 that.
 
  best,
  -Brad
 
 
  On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com
 wrote:
 
  On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
   I was just about to ask about this.
  
   Currently, there are two methods, sqlContext.jsonFile() and
   sqlContext.jsonRDD(), that work on JSON text and infer a schema that
   covers
   the whole data set.
  
   For example:
  
   from pyspark.sql import SQLContext
   sqlContext = SQLContext(sc)
  
   a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}',
   '{foo:boom, baz:[1,2,3]}']))
   a.printSchema()
   root
|-- baz: array (nullable = true)
||-- element: integer (containsNull = false)
|-- foo: string (nullable = true)
  
   It works really well! It handles fields with inconsistent value types
 by
   inferring a value type that covers all the possible values.
  
   But say you’ve already deserialized the JSON to do some pre-processing
   or
   filtering. You’d commonly want to do this, say, to remove bad data. So
   now
   you have an RDD of Python dictionaries, as opposed to an RDD of JSON
   strings. It would be perfect if you could get the completeness of the
   json...() methods, but against dictionaries.
  
   Unfortunately, as you noted, inferSchema() only looks at the first
   element
   in the set. Furthermore, inferring schemata from RDDs of dictionaries
 is
   being deprecated in favor of doing so from RDDs of Rows.
  
   I’m not sure what the intention behind this move is, but as a user I’d
   like
   to be able to convert RDDs of dictionaries directly to SchemaRDDs with
   the
   completeness of the jsonRDD()/jsonFile() methods. Right now if I
 really
   want
   that, I have to serialize the dictionaries to JSON text and then call
   jsonRDD(), which is expensive.
 
  Before upcoming 1.1 release, we did not support nested structures via
  inferSchema,
  the nested dictionary will be MapType. This introduces inconsistance
  for dictionary that
  the top level will be structure type (can be accessed by name of
  field) but others will be
  MapType (can be accesses as map).
 
  So deprecated top level dictionary is try to solve this kind of
  inconsistance.
 
  The Row class in pyspark.sql has a similar interface to dict, so you
  can easily convert
  you dic into a Row:
 
  ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
 
  In order to get the correct schema, so we need another argument to
 specify
  the number of rows to be infered? Such as:
 
  inferSchema(rdd, sample=None)
 
  with sample=None, it will take the first row, or it will do the
  sampling to figure out the
  complete schema.
 
  Does this work for you?
 
   Nick
  
  
  
   On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller 
 bmill...@eecs.berkeley.edu
   wrote:
  
   Hi All,
  
   I have a data set where each record is serialized using JSON, and I'm
   interested to use SchemaRDDs to work with the data.  Unfortunately
 I've
   hit
   a snag since some fields in the data are maps and list, and are not
   guaranteed to be populated for each record.  This seems to cause
   inferSchema
   to throw an error:
  
   Produces error:
   srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
   {'foo':'boom', 'baz':[1,2,3]}]))
  
   Works fine:
   srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar',
 'baz':[1,2,3]},
   {'foo':'boom', 'baz':[]}]))
  
   To be fair inferSchema says it peeks at the first row, so a
 possible
   work-around would be to make sure the type of any collection can be
   determined using the first instance.  However, I don't believe that
   items in
   an RDD are guaranteed to remain in an ordered, so this approach seems
   somewhat brittle.
  
   Does anybody know a robust solution to this problem in PySpark?  I'm
 am
   running the 1.0.1 release.
  
   -Brad
  
  
 
 



Re: Configuration setup and Connection refused

2014-08-05 Thread alamin.ishak
Hi,
Anyone? Any input would be much appreciated

Thanks,
Amin
On 5 Aug 2014 00:31, Al Amin alamin.is...@gmail.com wrote:

 Hi all,

 Any help would be much appreciated.

 Thanks,
 Al


 On Mon, Aug 4, 2014 at 7:09 PM, Al Amin alamin.is...@gmail.com wrote:

 Hi all,

 I have setup 2 nodes (master and slave1) on stand alone mode. Tried
 running SparkPi example and its working fine. However when I move on to
  wordcount its giving me below error:

 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856) called
 with curMem=0, maxMem=311387750
 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as
 values in memory (estimated size 32.1 KB, free 296.9 MB)
 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 0 time(s).
 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 1 time(s).
 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 2 time(s).
 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 3 time(s).
 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 4 time(s).
 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 5 time(s).
 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 6 time(s).
 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 7 time(s).
 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 8 time(s).
 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 9 time(s).
 Exception in thread main java.lang.RuntimeException:
 java.net.ConnectException: Call to master/10.0.1.27:9000 failed on
 connection exception: java.net.ConnectException: Connection refused


 1) how to fix this issue? I have configure hostname --fqdn accordingly.

 2) I could see that in my logs that my master/worker deploy configuration
 is  -Xms512m -Xmx512m. Is there any way that I can increase it? Or 512mb is
 just fine? AFAIK, spark require huge memory.

 3) I have a hadoop cluster and its working. Could anyone point me how to
 integrate Yarn with Spark? Any good tutorial would be very useful


 Thanks,
 Al








SELECT DISTINCT generates random results?

2014-08-05 Thread Nan Zhu
Hi, all  

I use “SELECT DISTINCT” to query the data saved in hive

it seems that this statement cannot understand the table structure and just 
outputs the data from other fields.

Has anyone met a similar problem before?

Best,  

--  
Nan Zhu



Re: pyspark inferSchema

2014-08-05 Thread Yin Huai
Yes, 2376 has been fixed in master. Can you give it a try?

Also, for inferSchema, because Python is dynamically typed, I agree with
Davies that we should provide a way to scan a subset (or the entirety) of the
dataset to figure out the proper schema. We will take a look at it.

Thanks,

Yin


On Tue, Aug 5, 2014 at 12:20 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Assuming updating to master fixes the bug I was experiencing with jsonRDD
 and jsonFile, then pushing sample to master will probably not be
 necessary.

 We believe that the link below was the bug I experienced, and I've been
 told it is fixed in master.

 https://issues.apache.org/jira/browse/SPARK-2376

 best,
 -brad


 On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote:

 This sample argument of inferSchema is still no in master, if will
 try to add it if it make
 sense.

 On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  Hi Davies,
 
  Thanks for the response and tips.  Is the sample argument to
 inferSchema
  available in the 1.0.1 release of pyspark?  I'm not sure (based on the
  documentation linked below) that it is.
 
 http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema
 
  It sounds like updating to master may help address my issue (and may
 also
  make the sample argument available), so I'm going to go ahead and do
 that.
 
  best,
  -Brad
 
 
  On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com
 wrote:
 
  On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
   I was just about to ask about this.
  
   Currently, there are two methods, sqlContext.jsonFile() and
   sqlContext.jsonRDD(), that work on JSON text and infer a schema that
   covers
   the whole data set.
  
   For example:
  
   from pyspark.sql import SQLContext
   sqlContext = SQLContext(sc)
  
   a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}',
   '{foo:boom, baz:[1,2,3]}']))
   a.printSchema()
   root
|-- baz: array (nullable = true)
||-- element: integer (containsNull = false)
|-- foo: string (nullable = true)
  
   It works really well! It handles fields with inconsistent value
 types by
   inferring a value type that covers all the possible values.
  
   But say you’ve already deserialized the JSON to do some
 pre-processing
   or
   filtering. You’d commonly want to do this, say, to remove bad data.
 So
   now
   you have an RDD of Python dictionaries, as opposed to an RDD of JSON
   strings. It would be perfect if you could get the completeness of the
   json...() methods, but against dictionaries.
  
   Unfortunately, as you noted, inferSchema() only looks at the first
   element
   in the set. Furthermore, inferring schemata from RDDs of
 dictionaries is
   being deprecated in favor of doing so from RDDs of Rows.
  
   I’m not sure what the intention behind this move is, but as a user
 I’d
   like
   to be able to convert RDDs of dictionaries directly to SchemaRDDs
 with
   the
   completeness of the jsonRDD()/jsonFile() methods. Right now if I
 really
   want
   that, I have to serialize the dictionaries to JSON text and then call
   jsonRDD(), which is expensive.
 
  Before upcoming 1.1 release, we did not support nested structures via
  inferSchema,
  the nested dictionary will be MapType. This introduces inconsistance
  for dictionary that
  the top level will be structure type (can be accessed by name of
  field) but others will be
  MapType (can be accesses as map).
 
  So deprecated top level dictionary is try to solve this kind of
  inconsistance.
 
  The Row class in pyspark.sql has a similar interface to dict, so you
  can easily convert
  you dic into a Row:
 
  ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
 
  In order to get the correct schema, so we need another argument to
 specify
  the number of rows to be infered? Such as:
 
  inferSchema(rdd, sample=None)
 
  with sample=None, it will take the first row, or it will do the
  sampling to figure out the
  complete schema.
 
  Does this work for you?
 
   Nick
  
  
  
   On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller 
 bmill...@eecs.berkeley.edu
   wrote:
  
   Hi All,
  
   I have a data set where each record is serialized using JSON, and
 I'm
   interested to use SchemaRDDs to work with the data.  Unfortunately
 I've
   hit
   a snag since some fields in the data are maps and list, and are not
   guaranteed to be populated for each record.  This seems to cause
   inferSchema
   to throw an error:
  
   Produces error:
   srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
   {'foo':'boom', 'baz':[1,2,3]}]))
  
   Works fine:
   srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar',
 'baz':[1,2,3]},
   {'foo':'boom', 'baz':[]}]))
  
   To be fair inferSchema says it peeks at the first row, so a
 possible
   work-around would be to make sure the type of any collection can be
   determined using the first instance.  However, 

Re: SELECT DISTINCT generates random results?

2014-08-05 Thread Nan Zhu
nvm,   

it was a problem caused by the ill-formatted raw data

--  
Nan Zhu


On Tuesday, August 5, 2014 at 3:42 PM, Nan Zhu wrote:

 Hi, all  
  
 I use “SELECT DISTINCT” to query the data saved in hive
  
 it seems that this statement cannot understand the table structure and just 
 output the data in other fields
  
 Anyone met the similar problem before?
  
 Best,  
  
 --  
 Nan Zhu
  
  
  
  




spark-submit symlink

2014-08-05 Thread Koert Kuipers
spark-submit doesn't currently handle being a symlink:
$ spark-submit
/usr/local/bin/spark-submit: line 44: /usr/local/bin/spark-class: No such
file or directory
/usr/local/bin/spark-submit: line 44: exec: /usr/local/bin/spark-class:
cannot execute: No such file or directory

To fix it, I changed the line that sets SPARK_HOME to:
export SPARK_HOME=$(script=`readlink $0`;cd `dirname $script`/..; pwd)

I have seen people resolve chains of symlinks in a loop-style structure,
but that's beyond my bash abilities.


spark-ec2 script with VPC

2014-08-05 Thread Erik Shilts
I'm trying to use the spark-ec2 script to launch a spark cluster within a
virtual private cloud (VPC) but I don't see an option for that. Is there a
way to specify the VPC while using the spark-ec2 script?

I found an old spark-incubator mailing list comment which claims to have
added that support, but none of the links from the thread are still active.
http://mail-archives.apache.org/mod_mbox/incubator-spark-dev/201402.mbox/%3cgit-pr-620-incubator-sp...@git.apache.org%3E

The exact error is ssh 255 while generating the cluster's SSH key on
master. The error suggests checking the identity file and key pairs, but I
confirmed on another instance that these are correct.

Please check that you have provided the correct --identity-file and
 --key-pair parameters and try again.


Any help?


[PySpark] [SQL] Going from RDD[dict] to SchemaRDD

2014-08-05 Thread Nicholas Chammas
Forking from this thread
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-inferSchema-tc11449.html.


On Tue, Aug 5, 2014 at 3:01 PM, Davies Liu dav...@databricks.com
http://mailto:dav...@databricks.com wrote:

Before the upcoming 1.1 release, we did not support nested structures
 via inferSchema; a nested dictionary becomes a MapType. This introduces an
 inconsistency for dictionaries: the top level is a struct type (fields can
 be accessed by name) while nested dictionaries are MapType (accessed as a
 map).

When you mention field access here, do you mean via SQL? Could you provide
a brief code example to illustrate your point?

 The Row class in pyspark.sql has a similar interface to dict, so you
 can easily convert you dic into a Row:

 ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))

I just tried that out and it seems to work well.

 In order to get the correct schema, so we need another argument to specify
 the number of rows to be infered? Such as:
 ...

 Does this work for you?

Maybe; I’m not sure just yet. Basically, I’m looking for something
functionally equivalent to this:

sqlContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x)))

In other words, given an RDD of JSON-serializable Python dictionaries, I
want to be able to infer a schema that is guaranteed to cover the entire
data set. With semi-structured data, it is rarely useful to infer schema by
inspecting just one element.

Does that sound like something we want to support?

Nick
​

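For reference, a small runnable sketch of the json.dumps() workaround
mentioned above (a sketch only; the app name and sample records are made up,
and the extra serialize/parse pass is exactly the cost complained about in
this thread):

# Serialize each dict to JSON text and let jsonRDD infer a schema that
# covers the whole data set, not just the first record.
import json
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="rdd-dict-to-schemardd-sketch")
sqlContext = SQLContext(sc)

rdd_of_dict = sc.parallelize([{'foo': 'bar', 'baz': []},
                              {'foo': 'boom', 'baz': [1, 2, 3]}])

srdd = sqlContext.jsonRDD(rdd_of_dict.map(lambda d: json.dumps(d)))
srdd.printSchema()
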

issue with spark and bson input

2014-08-05 Thread Dmitriy Selivanov
Hello, I have issue when try to use bson file as spark input. I use
mongo-hadoop-connector 1.3.0 and spark 1.0.0:
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val config = new Configuration()
config.set("mongo.job.input.format", "com.mongodb.hadoop.BSONFileInputFormat")
config.set("mapred.input.dir", "file:///root/jobs/dump/input.bson")
config.set("mongo.output.uri", "mongodb://" + args(0) + "/" + args(2))
val mongoRDD = sc.newAPIHadoopFile("file:///root/jobs/dump/input.bson",
classOf[BSONFileInputFormat], classOf[Object], classOf[BSONObject], config)

But on the last line I receive an error: inferred type arguments
[Object,org.bson.BSONObject,com.mongodb.hadoop.BSONFileInputFormat] do not
conform to method newAPIHadoopFile's type parameter bounds [K,V,F <:
org.apache.hadoop.mapreduce.InputFormat[K,V]]
this is very strange, because BSONFileInputFormat
extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat:
https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/BSONFileInputFormat.java
How can I solve this issue?
I have no problems with com.mongodb.hadoop.MongoInputFormat when using a
MongoDB collection as input.
Moreover, there seems to be no problem with the Java API:
https://github.com/crcsmnky/mongodb-spark-demo/blob/master/src/main/java/com/mongodb/spark/demo/Recommender.java
I'm not a professional Java/Scala developer; please help.

-- 
Regards
Dmitriy Selivanov


Re: Include permalinks in mail footer

2014-08-05 Thread Nicholas Chammas
Looks like this feature has been turned off. Are these changes intentional?
Or perhaps I'm not understanding how it's supposed to work.

Nick



On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Looks like this has now been turned on for new threads?


 On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp wrote:

  On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 I often find myself wanting to reference one thread from another, or
 from a JIRA issue. Right now I have to google the thread subject and find
 the link that way.


 +1





Re: [PySpark] [SQL] Going from RDD[dict] to SchemaRDD

2014-08-05 Thread Michael Armbrust

 Maybe; I’m not sure just yet. Basically, I’m looking for something
 functionally equivalent to this:

 sqlContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x)))

 In other words, given an RDD of JSON-serializable Python dictionaries, I
 want to be able to infer a schema that is guaranteed to cover the entire
 data set. With semi-structured data, it is rarely useful to infer schema by
 inspecting just one element.

 Does that sound like something we want to support?

Yes, I think that would be good.  I'd open a JIRA.

  ​



Re: Understanding RDD.GroupBy OutOfMemory Exceptions

2014-08-05 Thread Jens Kristian Geyti
Patrick Wendell wrote
 In the latest version of Spark we've added documentation to make this
 distinction more clear to users:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390

That is a very good addition to the documentation. Nice and clear about the
dangers of groupBy.


Patrick Wendell wrote
 Currently groupBy requires that all
 of the values for one key can fit in memory. 

Is that really true? Will partitions not spill to disk, hence the
recommendation in the documentation to up the parallelism of groupBy et al?

A better question might be: how exactly does partitioning affect groupBy
with regard to memory consumption? What will **have** to fit in memory, and
what may be spilled to disk when running out of memory?

And if it really is true that Spark requires all of a group's values to fit in
memory, how do I do an on-disk grouping of results, similar to what I'd do
in a Hadoop job by using a mapper emitting (groupId, value) key-value pairs
and having an identity reducer write the results to disk?







Re: Include permalinks in mail footer

2014-08-05 Thread Matei Zaharia
Emails sent from Nabble have it, while others don't. Unfortunately I haven't 
received a reply from ASF infra on this yet.

Matei

On August 5, 2014 at 2:04:10 PM, Nicholas Chammas (nicholas.cham...@gmail.com) 
wrote:

Looks like this feature has been turned off. Are these changes intentional? Or 
perhaps I'm not understanding how it's supposed to work.

Nick



On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas nicholas.cham...@gmail.com 
wrote:
Looks like this has now been turned on for new threads?


On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp wrote:
On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote:
I often find myself wanting to reference one thread from another, or from a 
JIRA issue. Right now I have to google the thread subject and find the link 
that way.

+1




Re: Configuration setup and Connection refused

2014-08-05 Thread Mayur Rustagi
Spark is not able to communicate with your Hadoop HDFS. Is your HDFS
running? If so, can you try to connect to it explicitly with the Hadoop
command-line tools, giving the full hostname and port.
Or test the port using
  telnet localhost 9000
In all likelihood your HDFS is either down, or bound to a wrong port/IP that
Spark cannot access.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi
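
For anyone who wants to script the port check suggested here, a Python
equivalent of the telnet test (host and port are the values from the log
above; adjust as needed):

# Check whether anything is listening on the name node host/port.
import socket

host, port = "master", 9000  # taken from the error log above
try:
    sock = socket.create_connection((host, port), timeout=5)
    print("something is listening on %s:%d" % (host, port))
    sock.close()
except socket.error as e:
    print("connection to %s:%d failed: %s" % (host, port, e))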



On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com
wrote:

 Hi,
 Anyone? Any input would be much appreciated

 Thanks,
 Amin
 On 5 Aug 2014 00:31, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote:

 Hi all,

 Any help would be much appreciated.

 Thanks,
 Al


 On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote:

 Hi all,

 I have setup 2 nodes (master and slave1) on stand alone mode. Tried
 running SparkPi example and its working fine. However when I move on to
  wordcount its giving me below error:

 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856)
 called with curMem=0, maxMem=311387750
 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as
 values in memory (estimated size 32.1 KB, free 296.9 MB)
 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 0 time(s).
 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 1 time(s).
 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 2 time(s).
 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 3 time(s).
 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 4 time(s).
 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 5 time(s).
 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 6 time(s).
 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 7 time(s).
 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 8 time(s).
 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 9 time(s).
 Exception in thread main java.lang.RuntimeException:
 java.net.ConnectException: Call to master/10.0.1.27:9000 failed on
 connection exception: java.net.ConnectException: Connection refused


 1) how to fix this issue? I have configure hostname --fqdn accordingly.

 2) I could see that in my logs that my master/worker deploy
 configuration is  -Xms512m -Xmx512m. Is there any way that I can increase
 it? Or 512mb is just fine? AFAIK, spark require huge memory.

 3) I have a hadoop cluster and its working. Could anyone point me how to
 integrate Yarn with Spark? Any good tutorial would be very useful


 Thanks,
 Al



 --
 View this message in context: Re: Configuration setup and Connection
 refused
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Include permalinks in mail footer

2014-08-05 Thread Matei Zaharia
Oh actually sorry, it looks like infra has looked at it but they can't add 
permalinks. They can only add "here's how to unsubscribe" footers. My bad, I 
just didn't catch the email update from them.

Matei

On August 5, 2014 at 2:39:45 PM, Matei Zaharia (matei.zaha...@gmail.com) wrote:

Emails sent from Nabble have it, while others don't. Unfortunately I haven't 
received a reply from ASF infra on this yet.

Matei

On August 5, 2014 at 2:04:10 PM, Nicholas Chammas (nicholas.cham...@gmail.com) 
wrote:

Looks like this feature has been turned off. Are these changes intentional? Or 
perhaps I'm not understanding how it's supposed to work.

Nick



On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas nicholas.cham...@gmail.com 
wrote:
Looks like this has now been turned on for new threads?


On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp wrote:
On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote:
I often find myself wanting to reference one thread from another, or from a 
JIRA issue. Right now I have to google the thread subject and find the link 
that way.

+1




Re: Include permalinks in mail footer

2014-08-05 Thread Nicholas Chammas
Ah, the user-specific to: address? I see. OK, thanks for looking into it!


On Tue, Aug 5, 2014 at 5:40 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Oh actually sorry, it looks like infra has looked at it but they can't add
 permalinks. They can only add here's how to unsubscribe footers. My bad,
 I just didn't catch the email update from them.

 Matei

 On August 5, 2014 at 2:39:45 PM, Matei Zaharia (matei.zaha...@gmail.com)
 wrote:

  Emails sent from Nabble have it, while others don't. Unfortunately I
 haven't received a reply from ASF infra on this yet.

  Matei

 On August 5, 2014 at 2:04:10 PM, Nicholas Chammas (
 nicholas.cham...@gmail.com) wrote:

  Looks like this feature has been turned off. Are these changes
 intentional? Or perhaps I'm not understanding how it's supposed to work.

 Nick



 On Fri, Jul 18, 2014 at 12:20 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Looks like this has now been turned on for new threads?


 On Thu, Jul 17, 2014 at 9:11 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

  On Jul 17, 2014, at 12:59 PM, Nick Chammas 
 nicholas.cham...@gmail.com wrote:

  I often find myself wanting to reference one thread from another, or
 from a JIRA issue. Right now I have to google the thread subject and find
 the link that way.


 +1






Re: Configuration setup and Connection refused

2014-08-05 Thread Mayur Rustagi
Then don't specify an hdfs:// path when you read the file.
Also, the community is generally quite responsive; just be a little
patient.
If possible, also look at the Spark training videos from Spark Summit 2014
and/or the AMPLab training on the Spark website.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Aug 6, 2014 at 1:58 AM, Al Amin alamin.is...@gmail.com wrote:

 Finally, someone reply. thank you, sir!
 But I am planning to deploy stand alone mode of Spark. I thought there is
 no need to use hdfs? And my spark is not being built with hadoop/yarn
 config.

 regards,
 Amin


 On Tue, Aug 5, 2014 at 10:39 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Spark is not able to communicate with your hadoop hdfs. Is your hdfs
 running, if so can you try to explicitly connect to it with hadoop command
 line tools giving full hostname  port.
 Or test port using
   telnet localhost 9000
 In all likelyhood either your hdfs is down, bound to wrong port/ip that
 spark cannot access

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com
 wrote:

 Hi,
 Anyone? Any input would be much appreciated

 Thanks,
 Amin
 On 5 Aug 2014 00:31, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote:

 Hi all,

 Any help would be much appreciated.

 Thanks,
 Al


 On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote:

 Hi all,

 I have setup 2 nodes (master and slave1) on stand alone mode. Tried
 running SparkPi example and its working fine. However when I move on to
  wordcount its giving me below error:

 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856)
 called with curMem=0, maxMem=311387750
 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored
 as values in memory (estimated size 32.1 KB, free 296.9 MB)
 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 0 time(s).
 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 1 time(s).
 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 2 time(s).
 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 3 time(s).
 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 4 time(s).
 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 5 time(s).
 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 6 time(s).
 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 7 time(s).
 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 8 time(s).
 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 9 time(s).
 Exception in thread main java.lang.RuntimeException:
 java.net.ConnectException: Call to master/10.0.1.27:9000 failed on
 connection exception: java.net.ConnectException: Connection refused


 1) how to fix this issue? I have configure hostname --fqdn
 accordingly.

 2) I could see that in my logs that my master/worker deploy
 configuration is  -Xms512m -Xmx512m. Is there any way that I can increase
 it? Or 512mb is just fine? AFAIK, spark require huge memory.

 3) I have a hadoop cluster and its working. Could anyone point me how
 to integrate Yarn with Spark? Any good tutorial would be very useful


 Thanks,
 Al



 --
 View this message in context: Re: Configuration setup and Connection
 refused
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.






Re: Configuration setup and Connection refused

2014-08-05 Thread Andrew Or
Hi Amin,

This usually happens because your application can't talk to HDFS and
thinks that the name node is listening on port 9000 when it's not. Are you
using the EC2 scripts for standalone Spark? You can verify whether the port
is correct by checking the configuration files under
/root/ephemeral-hdfs/conf. (Are you running HdfsWordCount by any chance?)
As Mayur mentioned, a good way to see whether there is any service
listening on port 9000 is telnet.

Andrew
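
A small sketch of the configuration check described above, assuming the
usual Hadoop 1.x layout (a core-site.xml under that conf directory with an
fs.default.name property; adjust the file and property names if your setup
differs):

# Print which name node URI the Hadoop configuration actually points at.
import xml.etree.ElementTree as ET

conf_file = "/root/ephemeral-hdfs/conf/core-site.xml"
root = ET.parse(conf_file).getroot()
for prop in root.findall("property"):
    if prop.findtext("name") == "fs.default.name":
        print("fs.default.name = %s" % prop.findtext("value"))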


2014-08-05 15:01 GMT-07:00 Mayur Rustagi mayur.rust...@gmail.com:

 Then dont specify hdfs when you read file.
 Also the community is quite active in response in general, just be a
 little patient.
 Also if possible look at spark training as part of spark summit 2014 vids
 and/or amplabs training on spark website.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Wed, Aug 6, 2014 at 1:58 AM, Al Amin alamin.is...@gmail.com wrote:

 Finally, someone reply. thank you, sir!
 But I am planning to deploy stand alone mode of Spark. I thought there is
 no need to use hdfs? And my spark is not being built with hadoop/yarn
 config.

 regards,
 Amin


 On Tue, Aug 5, 2014 at 10:39 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Spark is not able to communicate with your hadoop hdfs. Is your hdfs
 running, if so can you try to explicitly connect to it with hadoop command
 line tools giving full hostname  port.
 Or test port using
   telnet localhost 9000
 In all likelyhood either your hdfs is down, bound to wrong port/ip that
 spark cannot access

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com
 wrote:

 Hi,
 Anyone? Any input would be much appreciated

 Thanks,
 Amin
 On 5 Aug 2014 00:31, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote:

 Hi all,

 Any help would be much appreciated.

 Thanks,
 Al


 On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote:

 Hi all,

 I have setup 2 nodes (master and slave1) on stand alone mode. Tried
 running SparkPi example and its working fine. However when I move on to
  wordcount its giving me below error:

 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856)
 called with curMem=0, maxMem=311387750
 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored
 as values in memory (estimated size 32.1 KB, free 296.9 MB)
 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 0 time(s).
 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 1 time(s).
 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 2 time(s).
 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 3 time(s).
 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 4 time(s).
 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 5 time(s).
 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 6 time(s).
 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 7 time(s).
 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 8 time(s).
 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 9 time(s).
 Exception in thread main java.lang.RuntimeException:
 java.net.ConnectException: Call to master/10.0.1.27:9000 failed on
 connection exception: java.net.ConnectException: Connection refused


 1) how to fix this issue? I have configure hostname --fqdn
 accordingly.

 2) I could see that in my logs that my master/worker deploy
 configuration is  -Xms512m -Xmx512m. Is there any way that I can increase
 it? Or 512mb is just fine? AFAIK, spark require huge memory.

 3) I have a hadoop cluster and its working. Could anyone point me how
 to integrate Yarn with Spark? Any good tutorial would be very useful


 Thanks,
 Al



 --
 View this message in context: Re: Configuration setup and Connection
 refused
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.







Re: Understanding RDD.GroupBy OutOfMemory Exceptions

2014-08-05 Thread Patrick Wendell
Hi Jens,

Within a partition things will spill - so the current documentation is
correct. This spilling can only occur *across keys* at the moment. Spilling
cannot occur within a key at present.

This is discussed in the video here:
https://www.youtube.com/watch?v=dmL0N3qfSc8index=3list=PL-x35fyliRwj7qNxXLgMRJaOk7o9inHBZ

Spilling within one key for GroupBy's is likely to end up in the next
release of Spark, Spark 1.2. In most cases we see, when users hit this they
are actually just trying to do aggregations, which would be more efficiently
implemented without the groupBy operator.

If the goal is literally to just write out to disk all the values
associated with each group, and the values associated with a single group
are larger than fit in memory, this cannot be accomplished right now with
the groupBy operator.

The best way to work around this depends a bit on what you are trying to do
with the data down stream. Typically approaches involve sub-dividing any
very large groups, for instance, appending a hashed value in a small range
(1-10) to large keys. Then your downstream code has to deal with
aggregating partial values for each group. If your goal is just to lay each
group out sequentially on disk on one big file, you can call `sortByKey`
with a hashed suffix as well. The sort functions are externalized in Spark
1.1 (which is in pre-release).

- Patrick
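
A rough PySpark sketch of the salted-key workaround described above (the
key names, suffix range and toy data are illustrative only; downstream code
still has to combine the partial groups per original key):

# Split very large groups by appending a hashed suffix in a small range
# to each key, so no single group has to fit in memory.
from pyspark import SparkContext

sc = SparkContext(appName="salted-groupby-sketch")

# Toy pair RDD standing in for the real (key, value) data.
rdd = sc.parallelize([("big_key", i) for i in range(1000)])

NUM_SALTS = 10  # the "small range (1-10)" mentioned above

def salt(kv):
    key, value = kv
    return ((key, hash(value) % NUM_SALTS), value)

salted = rdd.map(salt).groupByKey()

# Each original key now appears in up to NUM_SALTS partial groups; the
# downstream code must aggregate those partials per original key.
partials = salted.map(lambda kv: (kv[0][0], list(kv[1])))
print(partials.take(3))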


On Tue, Aug 5, 2014 at 2:39 PM, Jens Kristian Geyti sp...@jkg.dk wrote:

 Patrick Wendell wrote
  In the latest version of Spark we've added documentation to make this
  distinction more clear to users:
 
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390

 That is a very good addition to the documentation. Nice and clear about the
 dangers of groupBy.


 Patrick Wendell wrote
  Currently groupBy requires that all
  of the values for one key can fit in memory.

 Is that really true? Will partitions not spill to disk, hence the
 recommendation in the documentation to up the parallelism of groupBy et al?

 A better question might be: How exactly does partitioning affect groupBy
 with regards to memory consumption. What will **have** to fit in memory,
 and
 what may be spilled to disk, if running out of memory?

 And if it really is true, that Spark requires all groups' values to fit in
 memory, how do I do a on-disk grouping of results, similar to what I'd to
 in a Hadoop job by using a mapper emitting (groupId, value) key-value
 pairs,
 and having an entity reducer writing results to disk?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p11487.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




[Streaming] Akka-based receiver with messages defined in uploaded jar

2014-08-05 Thread Anton Brazhnyk
Greetings,

I modified the ActorWordCount example a little so that it uses a simple case 
class as the message for Streaming instead of a primitive string.
I also modified the launch code to not use the run-example script, but to set 
the Spark master in the code and attach the jar (setJars(...)) with all the 
classes, including the new case class. It runs fine in local[*] mode but fails 
with a ClassNotFoundException in a standalone cluster (stacktrace follows).

I assume it's a classloader problem and Akka remoting just doesn't know 
about the classes coming to the executor from the attached jar.
Am I right?

I guess I could pass primitive values around and do my own (de)serialization 
but maybe there is a better way?
What's the correct way to build a custom Akka-based receiver that uses 
non-primitive messages?


Here is the log excerpt with stacktrace:
14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator called at 
time 1407211181800
14/08/04 20:59:41 ERROR Remoting: 
com.genesys.gpe.analytics.akka.messages.SubscribeAck
java.lang.ClassNotFoundException: 
com.genesys.gpe.analytics.akka.messages.SubscribeAck
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
at 
akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at 
akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at 
akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


WBR,
Anton

Re: [PySpark] [SQL] Going from RDD[dict] to SchemaRDD

2014-08-05 Thread Nicholas Chammas
SPARK-2870: Thorough schema inference directly on RDDs of Python
dictionaries https://issues.apache.org/jira/browse/SPARK-2870


On Tue, Aug 5, 2014 at 5:07 PM, Michael Armbrust mich...@databricks.com
wrote:

 Maybe; I’m not sure just yet. Basically, I’m looking for something
 functionally equivalent to this:

 sqlContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x)))

 In other words, given an RDD of JSON-serializable Python dictionaries, I
 want to be able to infer a schema that is guaranteed to cover the entire
 data set. With semi-structured data, it is rarely useful to infer schema by
 inspecting just one element.

 Does that sound like something we want to support?

 Yes, I think that would be good.  I'd open a JIRA.

  ​





Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Hi All,

I've built and deployed the current head of branch-1.0, but it seems to
have only partly fixed the bug.

This code now runs as expected with the indicated output:
>>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[1,2,3]}', '{"foo":[4,5,6]}']))
>>> srdd.printSchema()
root
 |-- foo: ArrayType[IntegerType]
>>> srdd.collect()
[{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}]

This code still crashes:
>>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
>>> srdd.printSchema()
root
 |-- foo: ArrayType[ArrayType(IntegerType)]
>>> srdd.collect()
Py4JJavaError: An error occurred while calling o63.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
3.0:29 failed 4 times, most recent failure: Exception failure in TID 67 on
host kunitz.research.intel-research.net:
net.razorvine.pickle.PickleException: couldn't introspect javabean:
java.lang.IllegalArgumentException: wrong number of arguments

I may be able to see if this is fixed in master, but since it's not fixed
in 1.0.3 it seems unlikely to be fixed in master either. I previously tried
master as well, but ran into a build problem that did not occur with the
1.0 branch.

Can anybody else verify that the second example still crashes (and is meant
to work)? If so, would it be best to modify JIRA-2376 or start a new bug?
https://issues.apache.org/jira/browse/SPARK-2376

best,
-Brad





On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Nick: Thanks for both the original JIRA bug report and the link.

 Michael: This is on the 1.0.1 release.  I'll update to master and
 follow-up if I have any problems.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com
 wrote:

 Is this on 1.0.1?  I'd suggest running this on master or the 1.1-RC which
 should be coming out this week.  Pyspark did not have good support for
 nested data previously.  If you still encounter issues using a more recent
 version, please file a JIRA.  Thanks!


 On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of
 some JSON data I have, but I've run into some instability involving the
 following java exception:

 An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID
 1664 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I've pasted code which produces the error as well as the full traceback
 below.  Note that I don't have any problem when I parse the JSON myself and
 use inferSchema.

 Is anybody able to reproduce this bug?

 -Brad

  srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}',
 '{foo:boom, baz:[1,2,3]}']))
  srdd.printSchema()

 root
  |-- baz: ArrayType[IntegerType]
  |-- foo: StringType

  srdd.collect()


 ---
 Py4JJavaError Traceback (most recent call
 last)
 ipython-input-89-ec7e8e8c68c4 in module()
  1 srdd.collect()

 /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in
 collect(self)
 581 
 582 with _JavaStackTrace(self.context) as st:
 -- 583   bytesInJava = self._jrdd.collect().iterator()
 584 return
 list(self._collect_iterator_through_file(bytesInJava))
 585

 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
 __call__(self, *args)
 535 answer = self.gateway_client.send_command(command)
 536 return_value = get_return_value(answer,
 self.gateway_client,
 -- 537 self.target_id, self.name)
 538
 539 for temp_arg in temp_args:

 /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling
 {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID
 1664 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments
 net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
 

python dependencies loaded but not on PYTHONPATH

2014-08-05 Thread Dominik Hübner
Hey,
I just tried to submit a task to my spark cluster using the following command 

./spark/bin/spark-submit --py-files file:///root/abc.zip --master 
spark://xxx.xxx.xxx.xxx:7077 test.py

It seems like the dependency I’ve added gets loaded:
14/08/05 23:07:00 INFO spark.SparkContext: Added file file:///root/abc.zip at 
http://xxx.xxx.xxx.xxx:40346/files/abc.zip with timestamp 1407280020217

However, my Python script can't find the module in this zip file. I already 
verified that this zip file is not corrupt by installing it with “pip install 
abc.zip” (works fine).
Any ideas how to get the content of the archive to the PYTHONPATH on my master 
and slaves?

Traceback (most recent call last):
  File /root/test.py, line 7, in module
import abc
ImportError: No module named abc

Maybe, it’s just the master complaining since and it only transfers the archive 
to the slaves (and adds it to the PYTHONPATH)?
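
Not a confirmed fix, but a hedged way to test that hypothesis: --py-files
(like sc.addPyFile) ships the archive to the executors, so importing the
module inside a task shows whether the executors can see it, independently of
the driver's own PYTHONPATH. The module name abc and the zip path are the
placeholders from the report; printing __file__ shows where the import
actually resolved from (note that abc also happens to be a standard library
module, so substitute the real module name).

# Check whether the shipped zip is importable on the executors.
from pyspark import SparkContext

sc = SparkContext(appName="pythonpath-check")
sc.addPyFile("/root/abc.zip")  # same archive as --py-files above

def check(_):
    import abc  # module packaged in abc.zip, per the original report
    return getattr(abc, "__file__", "imported, no __file__")

print(sc.parallelize([1], 1).map(check).collect())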
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Nicholas Chammas
This looks to be fixed in master:

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])
ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315
>>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408
>>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).printSchema()
root
 |-- foo: array (nullable = true)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: integer (containsNull = false)



Nick
​


On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Hi All,

 I've built and deployed the current head of branch-1.0, but it seems to
 have only partly fixed the bug.

 This code now runs as expected with the indicated output:
  srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[1,2,3]}',
 '{foo:[4,5,6]}']))
  srdd.printSchema()
 root
  |-- foo: ArrayType[IntegerType]
  srdd.collect()
 [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}]

 This code still crashes:
  srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}',
 '{foo:[[1,2,3], [4,5,6]]}']))
  srdd.printSchema()
 root
  |-- foo: ArrayType[ArrayType(IntegerType)]
  srdd.collect()
 Py4JJavaError: An error occurred while calling o63.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 3.0:29 failed 4 times, most recent failure: Exception failure in TID 67 on
 host kunitz.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I may be able to see if this is fixed in master, but since it's not fixed
 in 1.0.3 it seems unlikely to be fixed in master either. I previously tried
 master as well, but ran into a build problem that did not occur with the
 1.0 branch.

 Can anybody else verify that the second example still crashes (and is
 meant to work)? If so, would it be best to modify JIRA-2376 or start a new
 bug?
 https://issues.apache.org/jira/browse/SPARK-2376

 best,
 -Brad





 On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Nick: Thanks for both the original JIRA bug report and the link.

 Michael: This is on the 1.0.1 release.  I'll update to master and
 follow-up if I have any problems.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com
  wrote:

 Is this on 1.0.1?  I'd suggest running this on master or the 1.1-RC
 which should be coming out this week.  Pyspark did not have good support
 for nested data previously.  If you still encounter issues using a more
 recent version, please file a JIRA.  Thanks!


 On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:

 Hi All,

 I am interested to use jsonRDD and jsonFile to create a SchemaRDD out
 of some JSON data I have, but I've run into some instability involving the
 following java exception:

 An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID
 1664 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I've pasted code which produces the error as well as the full traceback
 below.  Note that I don't have any problem when I parse the JSON myself and
 use inferSchema.

 Is anybody able to reproduce this bug?

 -Brad

  srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}',
 '{foo:boom, baz:[1,2,3]}']))
  srdd.printSchema()

 root
  |-- baz: ArrayType[IntegerType]
  |-- foo: StringType

  srdd.collect()


 ---
 Py4JJavaError Traceback (most recent call
 last)
 ipython-input-89-ec7e8e8c68c4 in module()
  1 srdd.collect()

 /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in
 collect(self)
 581 
 582 with _JavaStackTrace(self.context) as st:
 -- 583   bytesInJava = self._jrdd.collect().iterator()
 584 return
 list(self._collect_iterator_through_file(bytesInJava))
 585

 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
 __call__(self, *args)
 535 answer = self.gateway_client.send_command(command)
 536 return_value = get_return_value(answer,
 self.gateway_client,
 -- 537 self.target_id, self.name)
 538
 539 for temp_arg in temp_args:

 /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling
 {0}{1}{2}.\n'.
 -- 300 

Re: Using sbt-pack with Spark 1.0.0

2014-08-05 Thread lbustelo
Are there any workarounds for this? Seems to be a dead end so far.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-sbt-pack-with-Spark-1-0-0-tp6649p11502.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

2014-08-05 Thread Tathagata Das
Can you show us the modified version? The reason could very well be
what you suggest, but I want to understand what conditions lead to
this.

TD

On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk
anton.brazh...@genesys.com wrote:
 Greetings,



 I modified the ActorWordCount example a little so that it uses a simple case class as
 the message for Streaming instead of a primitive string.

 I also modified the launch code to not use the run-example script, but to set the Spark
 master in the code and attach the jar (setJars(…)) with all the classes,
 including the new case class. It runs fine in local[*] mode but fails with
 ClassNotFoundException in a standalone cluster (stacktrace follows).



 I assume it’s a classloader problem and Akka remoting just doesn’t know
 about the classes coming to the executor from the attached jar.
 Am I right?



 I guess I could pass primitive values around and do my own (de)serialization
 but maybe there is a better way?

 What’s the correct way to build a custom Akka-based receiver that uses
 non-primitive messages?





 Here is the log excerpt with stacktrace:

 14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator called
 at time 1407211181800

 14/08/04 20:59:41 ERROR Remoting:
 com.genesys.gpe.analytics.akka.messages.SubscribeAck

 java.lang.ClassNotFoundException:
 com.genesys.gpe.analytics.akka.messages.SubscribeAck

 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

 at java.lang.Class.forName0(Native Method)

 at java.lang.Class.forName(Class.java:270)

 at
 java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)

 at
 akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)

 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)

 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)

 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)

 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

 at
 akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)

 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

 at
 akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)

 at
 akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)

 at scala.util.Try$.apply(Try.scala:161)

 at
 akka.serialization.Serialization.deserialize(Serialization.scala:98)

 at
 akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)

 at
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)

 at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)

 at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)

 at
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)

 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)

 at akka.actor.ActorCell.invoke(ActorCell.scala:456)

 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)

 at akka.dispatch.Mailbox.run(Mailbox.scala:219)

 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




 WBR,

 Anton

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unit Test for Spark Streaming

2014-08-05 Thread Tathagata Das
That function simply deletes a directory recursively; you can use an
alternative library instead. See this discussion:
http://stackoverflow.com/questions/779519/delete-files-recursively-in-java
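
For example, assuming commons-io is available on the test classpath, a drop-in
replacement for that line could be (sketch only):

import java.io.File
import org.apache.commons.io.FileUtils

// does the same thing as the private Utils.deleteRecursively call
brokerConf.logDirs.foreach { f => FileUtils.deleteDirectory(new File(f)) }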


On Tue, Aug 5, 2014 at 5:02 PM, JiajiaJing jj.jing0...@gmail.com wrote:
 Hi TD,

 I encountered a problem when trying to run the KafkaStreamSuite.scala unit
 test.
 I added scalatest-maven-plugin to my pom.xml, then ran mvn test, and got
 the following error message:

 error: object Utils in package util cannot be accessed in package
 org.apache.spark.util
 [INFO] brokerConf.logDirs.foreach { f =>
 Utils.deleteRecursively(new File(f)) }
 [INFO]^

 I checked that Utils.scala does exist under
 spark/core/src/main/scala/org/apache/spark/util/, so I have no idea
 why this access error occurs.
 Could you please help me with this?

 Thank you very much!

 Best Regards,

 Jiajia



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-for-Spark-Streaming-tp11394p11505.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming fails - where is the problem?

2014-08-05 Thread Tathagata Das
@ Simon Any progress?

On Tue, Aug 5, 2014 at 12:17 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 You need to add the twitter4j-*-3.0.3 jars to your classpath

 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 7:18 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Are you able to run it locally? If not, can you try creating an
 all-inclusive jar with all transitive dependencies (sbt
 assembly) and then running the app? That will give us a self-contained
 environment, which will help us debug better.

 TD


 On Mon, Aug 4, 2014 at 5:06 PM, durin m...@simon-schaefer.net wrote:
  In the WebUI Environment tab, the section Classpath Entries lists
  the
  following ones as part of System Classpath:
 
  /foo/hadoop-2.0.0-cdh4.5.0/etc/hadoop
 
  /foo/spark-master-2014-07-28/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop2.0.0-cdh4.5.0.jar
  /foo/spark-master-2014-07-28/conf
 
  /foo/spark-master-2014-07-28/external/twitter/target/spark-streaming-twitter_2.10-1.1.0-SNAPSHOT.jar
  /foo/spark-master-2014-07-28/extrajars/twitter4j-core-3.0.3.jar
  /foo/spark-master-2014-07-28/extrajars/twitter4j-stream-3.0.3.jar
 
 
  So I can't see where any other versions would come from.
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11391.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming at-least once guarantee

2014-08-05 Thread Tathagata Das
I can try answering the question even if I am not Sanjeet ;)
There isn't a simple way to do this. In fact, the ideal way to do it would be
to create a new InputDStream (just like FileInputDStream
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala)
where you create Hadoop RDDs as SQS messages are received.
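
Very roughly, such an input stream could look like the sketch below. Note that
pollNewFilePaths (returning the S3 paths announced by SQS since the last batch)
is an assumed helper, not an existing API, and a real implementation would also
need to track and acknowledge the SQS messages:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Sketch only: each batch interval becomes the union of the files that SQS
// announced since the previous interval.
class SqsFileInputDStream(ssc_ : StreamingContext, pollNewFilePaths: () => Seq[String])
  extends InputDStream[String](ssc_) {

  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[String]] = {
    val paths = pollNewFilePaths()
    if (paths.isEmpty) {
      Some(ssc_.sparkContext.parallelize(Seq.empty[String]))
    } else {
      Some(paths.map(p => ssc_.sparkContext.textFile(p)).reduce(_ union _))
    }
  }
}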

But stepping back, I want to understand why you want to integrate with
Spark Streaming at all. If you already have a working system that runs
Spark jobs when SQS sends a message about new files, then why use Spark
Streaming at all? What is lacking in that implementation? Based on that, we
can decide whether it's worth the effort of implementing a new input stream.

TD


On Tue, Aug 5, 2014 at 12:45 AM, lalit1303 la...@sigmoidanalytics.com
wrote:

 Hi Sanjeet,

 I have been using Spark Streaming for processing files present in S3 and
 HDFS.
 I am also using SQS messages for the same purpose as yours, i.e. as pointers to
 S3 files.
 As of now, I have a separate SQS job which receives messages from the SQS queue
 and gets the corresponding file from S3.
 Now, I want to integrate the SQS receiver with Spark Streaming, so that my
 Spark Streaming job would listen for new SQS messages and proceed
 accordingly.
 I was wondering if you have found any solution to this. Please let me know if
 so!

 In your approach above, you can achieve #4 in the following way:
 when you pass a foreach function to be applied on each RDD of the
 DStream, you can pass along the SQS message information (like the receipt
 handle needed for deleting the message) associated with that particular file.
 After success/failure in processing you can then delete your SQS
 message accordingly, roughly as in the sketch below.
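
 A rough sketch of that pattern (sqsMessages, process, sqsClient, queueUrl and
 ssc are all assumed names here, not working code):

 // each element of the stream is assumed to be a (receiptHandle, s3Path) pair
 sqsMessages.foreachRDD { rdd =>
   rdd.collect().foreach { case (receiptHandle, s3Path) =>
     try {
       process(ssc.sparkContext.textFile(s3Path))       // hypothetical per-file job
       sqsClient.deleteMessage(queueUrl, receiptHandle) // delete only after success
     } catch {
       case e: Exception => () // leave the message visible for redelivery
     }
   }
 }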


 Thanks
 --Lalit



 -
 Lalit Yadav
 la...@sigmoidanalytics.com
 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-at-least-once-guarantee-tp10902p11419.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: streaming window not behaving as advertised (v1.0.1)

2014-08-05 Thread Tathagata Das
1. updateStateByKey should be called on all keys even if there is no data
corresponding to that key. There is a unit test for that:
https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala#L337

2. I am increasing the priority for this. Off the top of my head, this is
easy to fix, but hard to test reliably in a unit test. Will fix it
soon after the Spark 1.1 release.

TD


On Fri, Aug 1, 2014 at 7:37 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Hi TD,

 I've also been fighting this issue only to find the exact same solution you
 are suggesting.
 Too bad I didn't find either the post or the issue sooner.

 I'm using a 1 second batch with N Kafka events (1 to 1 with the
 state objects) per batch and only calling the updateStateByKey function.

 This is my interpretation, please correct me if needed:
 because of Spark’s lazy computation, the RDDs weren’t being updated as
 expected on each batch interval execution. The assumption was that as long as
 a streaming batch runs (with or without new messages), I should get
 updated RDDs, which was not happening. We only get updateStateByKey calls
 for objects which got events or that are forced to compute through an output
 function. I did not run further tests to confirm this, but that's the
 impression I got.

 This doesn't fit our requirements, as we want to do duration updates based on
 the batch interval execution... so I had to force the computation of all the
 objects through the foreachRDD function, roughly as in the sketch below.
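
 In code form, the workaround is roughly (updateFn being our update function;
 sketch only):

 val state = events.updateStateByKey(updateFn)
 state.foreachRDD { rdd =>
   rdd.count() // forces every state object to be computed on each batch interval
 }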

 I would also appreciate it if the priority of the issue could be increased. I
 assume the foreachRDD adds unnecessary resource allocation
 (although I'm not sure how much) as opposed to doing it somehow by default
 on each batch interval execution.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11168.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Can't zip RDDs with unequal numbers of partitions

2014-08-05 Thread Bin
Hi All,


I ran into the error in the subject line. The exception occurred at line 223 of the code shown below:


212 // read files
213 val lines = sc.textFile(path_edges).map(line => line.split(",")).map(line => ((line(0), line(1)), line(2).toDouble)).reduceByKey(_+_).cache
214
215 val lines_vertices = lines.map{line => (line._1._1, Map(nameHash(line._1._2) -> line._2))}.reduceByKey(_++_).cache
216
217 val name_shadow = "_shadow"
218
219 val nodes =
220 lines_vertices
221 .map{line => (nameHash(line._1), (1, Map[VertexId,Double](), line._1))} ++
222 lines_vertices
223 .map{line => (nameHash(line._1 + name_shadow), (2, line._2, line._1 + name_shadow))} ++
224 lines
225 .map{line => (nameHash(line._1._2), (3, Map[VertexId,Double](), line._1._2))}


Sorry for posting the source code, but I couldn't think of a better way to show it.


I am confused about how the partitions came to be unequal, and about how I can
control the number of partitions of these RDDs (e.g. something like the sketch
below). Can someone give me some advice on this problem?
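
That is, something along these lines, passing explicit partition counts to
textFile and reduceByKey (48 is just an arbitrary example value, and I am not
sure this addresses the root cause):

val numParts = 48
val lines = sc.textFile(path_edges, numParts)        // minPartitions hint
  .map(line => line.split(","))
  .map(line => ((line(0), line(1)), line(2).toDouble))
  .reduceByKey(_ + _, numParts)                      // exact partition count
  .cache

val lines_vertices = lines
  .map{line => (line._1._1, Map(nameHash(line._1._2) -> line._2))}
  .reduceByKey(_ ++ _, numParts)
  .cache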


Thanks very much!


Best,
Bin

Save an RDD to a SQL Database

2014-08-05 Thread Vida Ha
Hi,

I would like to save an RDD to a SQL database.  It seems like this would be
a common enough use case.  Are there any built-in libraries to do it?

Otherwise, I'm just planning on mapping over my RDD and having that call a
method to write to the database.  Given that a lot of records are going to
be written, the code would need to be smart and do a batch insert after
enough records have accumulated (something like the sketch below).  Does that
sound like a reasonable approach?
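
Roughly this, using foreachPartition so each task reuses one connection (the
JDBC URL, table and columns are made up, and the RDD is assumed to hold pairs
of strings):

import java.sql.DriverManager

rdd.foreachPartition { records =>
  // one connection and one prepared statement per partition
  val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO my_table (key, value) VALUES (?, ?)")
  try {
    // flush a batch insert every 1000 records instead of one INSERT per record
    records.grouped(1000).foreach { batch =>
      batch.foreach { case (k, v) =>
        stmt.setString(1, k)
        stmt.setString(2, v)
        stmt.addBatch()
      }
      stmt.executeBatch()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}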


-Vida


Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Hi All,

I checked out and built master.  Note that Maven had a problem building
Kafka (in my case, at least); I was unable to fix this easily so I moved on
since it seemed unlikely to have any influence on the problem at hand.

Master improves functionality (including the example Nicholas just
demonstrated) but unfortunately there still seems to be a bug related to
using dictionaries as values.  I've put some code below to illustrate the
bug.

# dictionary as value works fine
>>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
[Row(key0=Row(key1=u'value'))]

# dictionary as value works fine, even when inner keys are varied
>>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
[Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]

# dictionary as value works fine when inner keys are missing and outer key is present
>>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

# dictionary as value FAILS when outer key is missing
>>> print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
Py4JJavaError: An error occurred while calling o84.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
stage 7.0 (TID 242, engelland.research.intel-research.net):
java.lang.NullPointerException...

# dictionary as value FAILS when outer key is present with null value
>>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
Py4JJavaError: An error occurred while calling o98.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
stage 9.0 (TID 305, kunitz.research.intel-research.net):
java.lang.NullPointerException...

# nested lists work even when outer key is missing
>>> print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
[Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

Is anyone able to replicate this behavior?

-Brad




On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com
wrote:

 We try to keep master very stable, but this is where active development
 happens. YMMV, but a lot of people do run very close to master without
 incident (myself included).

 branch-1.0 has been cut for a while and we only merge bug fixes into it
 (this is more strict for non-alpha components like spark core.).  For Spark
 SQL, this branch is pretty far behind as the project is very young and we
 are fixing bugs / adding features very rapidly compared with Spark core.

 branch-1.1 was just cut and is being QAed for a release, at this point its
 likely the same as master, but that will change as features start getting
 added to master in the coming weeks.



 On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 collect() works, too.

  >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).collect()
 [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

 Can’t answer your question about branch stability, though. Spark is a
 very active project, so stuff is happening all the time.

 Nick
 ​


 On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi Nick,

 Can you check that the call to collect() works as well as
 printSchema()?  I actually experience that printSchema() works fine,
 but then it crashes on collect().

 In general, should I expect the master (which seems to be on branch-1.1)
 to be any more/less stable than branch-1.0?  While it would be great to
 have this fixed, it would be good to know if I should expect lots of other
 instability.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 This looks to be fixed in master:

 >>> from pyspark.sql import SQLContext
 >>> sqlContext = SQLContext(sc)
 >>> sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])
 ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315
 >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
 MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408
 >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).printSchema()
 root
  |-- foo: array (nullable = true)
  |    |-- element: array (containsNull = false)
  |    |    |-- element: integer (containsNull = false)

 

 Nick
 ​


 On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:

 Hi All,

 I've built and deployed the current head of branch-1.0, but it seems
 to have only partly fixed the bug.

 This code now runs as expected with the indicated output:
  srdd = 

Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
I've followed up in a thread more directly related to jsonRDD and jsonFile,
but it seems like after building from the current master I'm still having
some problems with nested dictionaries.

http://apache-spark-user-list.1001560.n3.nabble.com/trouble-with-jsonRDD-and-jsonFile-in-pyspark-tp11461p11517.html


On Tue, Aug 5, 2014 at 12:56 PM, Yin Huai yh...@databricks.com wrote:

 Yes, 2376 has been fixed in master. Can you give it a try?

 Also, for inferSchema, because Python is dynamically typed, I agree with
 Davies that we should provide a way to scan a subset (or the entirety) of the
 dataset to figure out the proper schema. We will take a look at it.

 Thanks,

 Yin


 On Tue, Aug 5, 2014 at 12:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Assuming updating to master fixes the bug I was experiencing with jsonRDD
 and jsonFile, then pushing sample to master will probably not be
 necessary.

 We believe that the link below was the bug I experienced, and I've been
 told it is fixed in master.

 https://issues.apache.org/jira/browse/SPARK-2376

 best,
 -brad


 On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com
 wrote:

 This sample argument of inferSchema is still not in master; I will
 try to add it if it makes
 sense.

 On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  Hi Davies,
 
  Thanks for the response and tips.  Is the sample argument to
 inferSchema
  available in the 1.0.1 release of pyspark?  I'm not sure (based on the
  documentation linked below) that it is.
 
 http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema
 
  It sounds like updating to master may help address my issue (and may
 also
  make the sample argument available), so I'm going to go ahead and do
 that.
 
  best,
  -Brad
 
 
  On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com
 wrote:
 
  On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
   I was just about to ask about this.
  
   Currently, there are two methods, sqlContext.jsonFile() and
   sqlContext.jsonRDD(), that work on JSON text and infer a schema that
   covers
   the whole data set.
  
   For example:
  
   from pyspark.sql import SQLContext
   sqlContext = SQLContext(sc)
  
    >>> a = sqlContext.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[]}', '{"foo":"boom", "baz":[1,2,3]}']))
    >>> a.printSchema()
    root
     |-- baz: array (nullable = true)
     |    |-- element: integer (containsNull = false)
     |-- foo: string (nullable = true)
  
   It works really well! It handles fields with inconsistent value
 types by
   inferring a value type that covers all the possible values.
  
   But say you’ve already deserialized the JSON to do some
 pre-processing
   or
   filtering. You’d commonly want to do this, say, to remove bad data.
 So
   now
   you have an RDD of Python dictionaries, as opposed to an RDD of JSON
   strings. It would be perfect if you could get the completeness of
 the
   json...() methods, but against dictionaries.
  
   Unfortunately, as you noted, inferSchema() only looks at the first
   element
   in the set. Furthermore, inferring schemata from RDDs of
 dictionaries is
   being deprecated in favor of doing so from RDDs of Rows.
  
   I’m not sure what the intention behind this move is, but as a user
 I’d
   like
   to be able to convert RDDs of dictionaries directly to SchemaRDDs
 with
   the
   completeness of the jsonRDD()/jsonFile() methods. Right now if I
 really
   want
   that, I have to serialize the dictionaries to JSON text and then
 call
   jsonRDD(), which is expensive.
 
   Before the upcoming 1.1 release, we did not support nested structures via
   inferSchema; a nested dictionary will be a MapType. This introduces an
   inconsistency for dictionaries: the top level will be a struct type (can be
   accessed by field name) but nested ones will be
   MapType (can be accessed as a map).

   So deprecating the top-level dictionary is an attempt to resolve this kind
   of inconsistency.
 
   The Row class in pyspark.sql has a similar interface to dict, so you
   can easily convert
   your dict into a Row:
 
  ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
 
   In order to get the correct schema, we need another argument to specify
   the number of rows to be used for inference, such as:

   inferSchema(rdd, sample=None)

   With sample=None, it will take the first row; otherwise it will do the
   sampling needed to figure out the
   complete schema.
 
  Does this work for you?
 
   Nick
  
  
  
   On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller 
 bmill...@eecs.berkeley.edu
   wrote:
  
   Hi All,
  
   I have a data set where each record is serialized using JSON, and
 I'm
   interested to use SchemaRDDs to work with the data.  Unfortunately
 I've
   hit
   a snag since some fields in the data are maps and list, and are not
   guaranteed to be populated for each record.  This seems to cause
   inferSchema
   to throw an error:
  
   Produces error:
   srdd = 

Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Yin Huai
I tried jsonRDD(...).printSchema() and it worked. It seems the problem is that when
we take the data back to the Python side, SchemaRDD#javaToPython fails on
your cases. I have created https://issues.apache.org/jira/browse/SPARK-2875
to track it.

Thanks,

Yin


On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Hi All,

 I checked out and built master.  Note that Maven had a problem building
 Kafka (in my case, at least); I was unable to fix this easily so I moved on
 since it seemed unlikely to have any influence on the problem at hand.

 Master improves functionality (including the example Nicholas just
 demonstrated) but unfortunately there still seems to be a bug related to
 using dictionaries as values.  I've put some code below to illustrate the
 bug.

 *# dictionary as value works fine*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1:
 value}}'])).collect()
 [Row(key0=Row(key1=u'value'))]

 *# dictionary as value works fine, even when inner keys are varied*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value1}}',
 '{key0: {key2: value2}}'])).collect()
 [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None,
 key2=u'value2'))]

 *# dictionary as value works fine when inner keys are missing and outer
 key is present*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {}}', '{key0: {key1:
 value1}}'])).collect()
 [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

 *# dictionary as value FAILS when outer key is missing*
 * print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: {key1:
 value1}}'])).collect()*
 Py4JJavaError: An error occurred while calling o84.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage 7.0 (TID 242, engelland.research.intel-research.net):
 java.lang.NullPointerException...

 *# dictionary as value FAILS when outer key is present with null value*
 * print sqlCtx.jsonRDD(sc.parallelize(['{key0: null}', '{key0:
 {key1: value1}}'])).collect()*
 Py4JJavaError: An error occurred while calling o98.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage 9.0 (TID 305, kunitz.research.intel-research.net):
 java.lang.NullPointerException...

 *# nested lists work even when outer key is missing*
  print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: [[item0,
 item1], [item2, item3]]}'])).collect()
 [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

 Is anyone able to replicate this behavior?

 -Brad




 On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We try to keep master very stable, but this is where active development
 happens. YMMV, but a lot of people do run very close to master without
 incident (myself included).

 branch-1.0 has been cut for a while and we only merge bug fixes into it
 (this is more strict for non-alpha components like spark core.).  For Spark
 SQL, this branch is pretty far behind as the project is very young and we
 are fixing bugs / adding features very rapidly compared with Spark core.

 branch-1.1 was just cut and is being QAed for a release, at this point
 its likely the same as master, but that will change as features start
 getting added to master in the coming weeks.



 On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 collect() works, too.

  sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
  '{foo:[[1,2,3], [4,5,6]]}'])).collect()
 [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

 Can’t answer your question about branch stability, though. Spark is a
 very active project, so stuff is happening all the time.

 Nick
 ​


 On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi Nick,

 Can you check that the call to collect() works as well as
 printSchema()?  I actually experience that printSchema() works fine,
 but then it crashes on collect().

 In general, should I expect the master (which seems to be on
 branch-1.1) to be any more/less stable than branch-1.0?  While it would be
 great to have this fixed, it would be good to know if I should expect lots
 of other instability.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 This looks to be fixed in master:

  from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
  sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], 
  [4,5,6]]}'






 ])
 ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315 
 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
 '{foo:[[1,2,3], [4,5,6]]}']))
 MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408 
 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
 '{foo:[[1,2,3], [4,5,6]]}'])).printSchema()
 root
  |-- foo: array (nullable = true)
  ||-- 

Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
I concur that printSchema works; the trouble just seems to happen in operations
that actually use the data.

Thanks for posting the bug.

-Brad


On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote:

 I tried jsonRDD(...).printSchema() and it worked. Seems the problem is
 when we take the data back to the Python side, SchemaRDD#javaToPython
 failed on your cases. I have created
 https://issues.apache.org/jira/browse/SPARK-2875 to track it.

 Thanks,

 Yin


 On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I checked out and built master.  Note that Maven had a problem building
 Kafka (in my case, at least); I was unable to fix this easily so I moved on
 since it seemed unlikely to have any influence on the problem at hand.

 Master improves functionality (including the example Nicholas just
 demonstrated) but unfortunately there still seems to be a bug related to
 using dictionaries as values.  I've put some code below to illustrate the
 bug.

 *# dictionary as value works fine*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1:
 value}}'])).collect()
 [Row(key0=Row(key1=u'value'))]

 *# dictionary as value works fine, even when inner keys are varied*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value1}}',
 '{key0: {key2: value2}}'])).collect()
 [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None,
 key2=u'value2'))]

 *# dictionary as value works fine when inner keys are missing and outer
 key is present*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {}}', '{key0: {key1:
 value1}}'])).collect()
 [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

 *# dictionary as value FAILS when outer key is missing*
 * print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: {key1:
 value1}}'])).collect()*
 Py4JJavaError: An error occurred while calling o84.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage 7.0 (TID 242, engelland.research.intel-research.net):
 java.lang.NullPointerException...

 *# dictionary as value FAILS when outer key is present with null value*
 * print sqlCtx.jsonRDD(sc.parallelize(['{key0: null}', '{key0:
 {key1: value1}}'])).collect()*
 Py4JJavaError: An error occurred while calling o98.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage 9.0 (TID 305, kunitz.research.intel-research.net):
 java.lang.NullPointerException...

 *# nested lists work even when outer key is missing*
  print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: [[item0,
 item1], [item2, item3]]}'])).collect()
 [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

 Is anyone able to replicate this behavior?

  -Brad




 On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We try to keep master very stable, but this is where active development
 happens. YMMV, but a lot of people do run very close to master without
 incident (myself included).

 branch-1.0 has been cut for a while and we only merge bug fixes into it
 (this is more strict for non-alpha components like spark core.).  For Spark
 SQL, this branch is pretty far behind as the project is very young and we
 are fixing bugs / adding features very rapidly compared with Spark core.

 branch-1.1 was just cut and is being QAed for a release, at this point
 its likely the same as master, but that will change as features start
 getting added to master in the coming weeks.



 On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 collect() works, too.

  sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
  '{foo:[[1,2,3], [4,5,6]]}'])).collect()
 [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

 Can’t answer your question about branch stability, though. Spark is a
 very active project, so stuff is happening all the time.

 Nick
 ​


 On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:

 Hi Nick,

 Can you check that the call to collect() works as well as
 printSchema()?  I actually experience that printSchema() works fine,
 but then it crashes on collect().

 In general, should I expect the master (which seems to be on
 branch-1.1) to be any more/less stable than branch-1.0?  While it would be
 great to have this fixed, it would be good to know if I should expect lots
 of other instability.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 This looks to be fixed in master:

  from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
  sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], 
  [4,5,6]]}'







 ])
 ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315 
 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
 '{foo:[[1,2,3], [4,5,6]]}']))
 

type issue: found RDD[T] expected RDD[A]

2014-08-05 Thread Amit Kumar
Hi All,

I am having some trouble trying to write generic code that uses sqlContext
and RDDs. Can you suggest what might be wrong?

 class SparkTable[T : ClassTag](val sqlContext:SQLContext, val extractor:
(String) => (T) ) {

  private[this] var location:Option[String] =None
  private[this] var name:Option[String]=None
  private[this] val sc = sqlContext.sparkContext
  ...

def makeRDD(sqlQuery:String):SchemaRDD={
require(this.location!=None)
require(this.name!=None)
import sqlContext._
val rdd:RDD[String] = sc.textFile(this.location.get)
val rddT:RDD[T] = rdd.map(extractor)
val schemaRDD:SchemaRDD= createSchemaRDD(rddT)
schemaRDD.registerAsTable(name.get)
val all = sqlContext.sql(sqlQuery)
all
  }

}

I use it as below:

 def extractor(line:String):POJO={
  val splits= line.split(pattern).toList
  POJO(splits(0),splits(1),splits(2),splits(3))
}

   val pojoTable:SparkTable[POJO] = new
SparkTable[POJO](sqlContext,extractor)

    val identityData:SchemaRDD =
pojoTable.atLocation("hdfs://location/table")
  .withName("pojo")
  .makeRDD("SELECT * FROM pojo")


I get compilation failure

inferred type arguments [T] do not conform to method createSchemaRDD's type
parameter bounds [A <: Product]
[error] val schemaRDD:SchemaRDD= createSchemaRDD(rddT)
[error]  ^
[error]  SparkTable.scala:37: type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[T]
[error]  required: org.apache.spark.rdd.RDD[A]
[error] val schemaRDD:SchemaRDD= createSchemaRDD(rddT)
[error]  ^
[error] two errors found

I am probably missing something basic, either in Scala reflection/types or in
implicits?
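
Judging from the error, createSchemaRDD only accepts RDDs whose element type is
a Product (e.g. a case class). A sketch of what satisfying that bound could look
like (treat this as a guess rather than a verified fix; depending on the Spark
version a TypeTag may also be required, hence the extra context bound):

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

class SparkTable[T <: Product : ClassTag : TypeTag](
    val sqlContext: SQLContext,
    val extractor: String => T) {
  // ... body unchanged: createSchemaRDD(rddT) should now find the bound it needs
}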

Any hints would be appreciated.

Thanks
Amit