Re: Understanding Spark's caching

2015-04-28 Thread ayan guha
Hi, I replied to you on SO. If Option A had an action call, then it should
suffice too.
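For reference, a minimal sketch of what that looks like in spark-shell (the paths
and the filter/map functions are hypothetical placeholders; count() just stands in
for any action):

val rdd1 = sc.textFile("hdfs:///some/data").cache()
val rdd2 = rdd1.filter(_.nonEmpty).cache()
val rdd3 = rdd1.map(_.length).cache()

// Materialize rdd2 and rdd3 while rdd1 is still cached, so the source
// file is scanned only once.
rdd2.count()
rdd3.count()

// rdd1's cached blocks are no longer needed; the later saves will read
// rdd2 and rdd3 from their own caches.
rdd1.unpersist()

rdd2.saveAsTextFile("hdfs:///out/filtered")
rdd3.saveAsTextFile("hdfs:///out/mapped")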
On 28 Apr 2015 05:30, Eran Medan eran.me...@gmail.com wrote:

 Hi Everyone!

 I'm trying to understand how Spark's cache works.

 Here is my naive understanding, please let me know if I'm missing
 something:

 val rdd1 = sc.textFile("some data")
 rdd1.cache() // marks rdd1 as cached
 val rdd2 = rdd1.filter(...)
 val rdd3 = rdd1.map(...)
 rdd2.saveAsTextFile(...)
 rdd3.saveAsTextFile(...)

 In the above, rdd1 will be loaded from disk (e.g. HDFS) only once (when
 rdd2 is saved, I assume) and then read from the cache (assuming there is
 enough RAM) when rdd3 is saved.

 Now here is my question. Let's say I want to cache rdd2 and rdd3 as they
 will both be used later on, but I don't need rdd1 after creating them.

 Basically there is duplication, isn't there? Once rdd2 and rdd3 are
 calculated, I don't need rdd1 anymore, so I should probably unpersist it,
 right? The question is when.

 *Will this work? (Option A)*

 val rdd1 = sc.textFile("some data")
 rdd1.cache() // marks rdd1 as cached
 val rdd2 = rdd1.filter(...)
 val rdd3 = rdd1.map(...)
 rdd2.cache()
 rdd3.cache()
 rdd1.unpersist()

 Does Spark add the unpersist call to the DAG, or is it done immediately?
 If it's done immediately, then rdd1 will effectively be uncached when I
 read from rdd2 and rdd3, right?

 *Should I do it this way instead (Option B)?*

 val rdd1 = sc.textFile("some data")
 rdd1.cache() // marks rdd1 as cached
 val rdd2 = rdd1.filter(...)
 val rdd3 = rdd1.map(...)

 rdd2.cache()
 rdd3.cache()

 rdd2.saveAsTextFile(...)
 rdd3.saveAsTextFile(...)

 rdd1.unpersist()

 *So the question is this:* Is Option A good enough? i.e. will rdd1 still
 read the file only once? Or do I need to go with Option B?

 (see also
 http://stackoverflow.com/questions/29903675/understanding-sparks-caching)

 Thanks in advance



Re: How to add jars to standalone pyspark program

2015-04-28 Thread jamborta
Ah, just noticed that you are using an external package. You can add it
like this:

conf = SparkConf().set("spark.jars", jar_path)

or, if it is a Python package:

sc.addPyFile()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685p22688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

2015-04-28 Thread ๏̯͡๏
Worked now.




On Mon, Apr 27, 2015 at 10:20 PM, Sean Owen so...@cloudera.com wrote:

 Works fine for me. Make sure you're not downloading the HTML
 redirector page and thinking it's the archive.

 On Mon, Apr 27, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:
  I downloaded 1.3.1 hadoop 2.4 prebuilt package (tar) from multiple
 mirrors
  and direct link. Each time i untar i get below error
 
  spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:
 (Empty
  error message)
 
  tar: Error exit delayed from previous errors
 
 
  Is it broken ?
 
 
  --
  Deepak
 




-- 
Deepak


Re: How to add jars to standalone pyspark program

2015-04-28 Thread jamborta
Hi Mark,

That does not look like a Python path issue; the spark-assembly jar should have
those packaged and should make them available to the workers. Have you built
the jar yourself?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685p22687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Serialization error

2015-04-28 Thread ๏̯͡๏
The arguments are just the values; the names of the settings are what matter, and all
you need to do is specify those when you're creating the SparkConf object.

Glad it worked.

On Tue, Apr 28, 2015 at 5:20 PM, madhvi madhvi.gu...@orkash.com wrote:

  Thankyou Deepak.It worked.
 Madhvi
 On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:

  val conf = new SparkConf()
    .setAppName("detail")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
    .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
    .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
    .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))


  Can you try this ?

 On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com wrote:

 Hi,

 While connecting to accumulo through spark by making sparkRDD I am
 getting the following error:
  object not serializable (class: org.apache.accumulo.core.data.Key)

 This is due to the Key class of Accumulo, which does not implement the
 Serializable interface. How can this be solved so that Accumulo can be used with
 Spark?

 Thanks
 Madhvi





  --
  Deepak





-- 
Deepak


Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread Todd Nist
Can you simply apply the
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter
to this?  You should be able to do something like this:

val stats = RDD.map(x => x._2).stats()

-Todd
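If the goal is a per-key average in a single pass, one option is to reduce over
(sum, count) pairs. A minimal spark-shell sketch (the parallelize line is only a
hypothetical stand-in for the rdd1 of (date, value) pairs in the question):

// Stand-in sample data.
val rdd1 = sc.parallelize(Seq(("2013-10-09", 7.6), ("2013-10-10", 9.3), ("2013-10-09", 20.8)))

// Pair each value with a count of 1, sum both per key in one shuffle, then divide.
val averages = rdd1
  .mapValues(v => (v, 1L))
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (sum, count) => sum / count }

averages.collect().foreach(println)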

On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io 
subscripti...@prismalytics.io wrote:

  Hello Friends:

 I generated a Pair RDD with K/V pairs, like so:

 
  rdd1.take(10) # Show a small sample.
  [(u'2013-10-09', 7.60117302052786),
  (u'2013-10-10', 9.322709163346612),
  (u'2013-10-10', 28.264462809917358),
  (u'2013-10-07', 9.664429530201343),
  (u'2013-10-07', 12.461538461538463),
  (u'2013-10-09', 20.76923076923077),
  (u'2013-10-08', 11.842105263157894),
  (u'2013-10-13', 32.32514177693762),
  (u'2013-10-13', 26.246),
  (u'2013-10-13', 10.693069306930692)]

 Now from the above RDD, I would like to calculate an average of the VALUES
 for each KEY.
 I can do so as shown here, which does work:

  countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of
 countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
  rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerator (i.e.
 the SUM).
  rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) #
 Divide each SUM by it's denominator (i.e. COUNT)
  print(rdd1.collect())
   [(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
   ]

 But I wonder if the above semantics/approach is the optimal one; or
 whether perhaps there is a single API call
 that handles common use case.

 Improvement thoughts welcome. =:)

 Thank you,
 nmv



Single stream with series of transformations

2015-04-28 Thread jc.francisco
Hi,
I'm following the pattern of filtering data by certain criteria and then
saving the results to a different table. The code below illustrates the
idea.
The simple integration test I wrote suggests it works; it simply asserts that
filtered data ends up in the respective tables after being filtered.
I may add more filters to this stream in the future. Is this a bad idea?
public void run() {
    JavaPairReceiverInputDStream<String, Raw> stream = KafkaUtils.createStream(...);

    // Save to master table
    stream.map((rawTuple) -> {
        return rawTuple._2;
    }).foreach((rawRDD) -> {
        javaFunctions(rawRDD).writerBuilder(keyspace, rawTable, mapToRow(Raw.class)).saveToCassandra();
        return null;
    });

    // Process stream again, filter by a criteria, and save to a different table
    stream.map((rawTuple) -> {
        return rawTuple._2;
    }).filter((raw) -> {
        // Filter by some criteria
    }).map((raw) -> {
        return new Criteria1(raw.getSomeField());
    }).foreachRDD((rdd) -> {
        javaFunctions(rdd).writerBuilder(keyspace, criteria1Table, mapToRow(Criteria1.class)).saveToCassandra();
        return null;
    });

    // Process stream for a 3rd time, filter by a different criteria, and then save to a different table
    stream.map((rawTuple) -> {
        return rawTuple._2;
    }).filter((raw) -> {
        // Filter by another criteria
    }).map((raw) -> {
        return new Criteria2(raw.getOtherField());
    }).foreachRDD((rdd) -> {
        javaFunctions(rdd).writerBuilder(keyspace, criteria2Table, mapToRow(Criteria2.class)).saveToCassandra();
        return null;
    });

    streamingContext.start();
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Single-stream-with-series-of-transformations-tp22689.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark partitioning question

2015-04-28 Thread Marius Danciu
Thank you Silvio,

I am aware of groupBy's limitations and it is subject to replacement.

I did try repartitionAndSortWithinPartitions, but then I end up with maybe
too much shuffling: one shuffle from groupByKey and the other from the repartition.

My expectation was that since N records are partitioned to the same
partition ... say 0, doing a mapPartition on the resulting RDD would place
all records for partition 0 on a single node. It seems to me
that this is not quite the case, since N can span multiple HDFS blocks
and the subsequent mapPartition operation would be parallelized across multiple
nodes. In my case I see 2 yarn containers receiving records during a
mapPartition operation applied on the sorted partition. I need to test more,
but it seems that applying the same partitioner again right before the
last mapPartition can help.
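For reference, Silvio's repartitionAndSortWithinPartitions suggestion in a minimal,
self-contained spark-shell form (the sample data and HashPartitioner are hypothetical
stand-ins for the real pair RDD and myPartitioner; the single call replaces the
separate sortByKey + partitionBy shuffles, and the function passed to mapPartitions
then sees each partition's records in key order):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("a", 3), ("c", 4)))  // stand-in for the pair RDD
val partitioned = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
val result = partitioned.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 10) },  // stand-in for f
  preservesPartitioning = true)
result.collect().foreach(println)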

Best,
Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   Hi Marius,

  What’s the expected output?

  I would recommend avoiding the groupByKey if possible since it’s going
 to force all records for each key to go to an executor which may overload
 it.

  Also if you need to sort and repartition, try using
 repartitionAndSortWithinPartitions to do it in one shot.

  Thanks,
 Silvio

   From: Marius Danciu
 Date: Tuesday, April 28, 2015 at 8:10 AM
 To: user
 Subject: Spark partitioning question

   Hello all,

  I have the following Spark (pseudo)code:

  rdd = mapPartitionsWithIndex(...)
 .mapPartitionsToPair(...)
 .groupByKey()
 .sortByKey(comparator)
 .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
 .mapPartitionsToPair( *f* )

  The input data has 2 input splits (yarn 2.6.0).
 myPartitioner partitions all the records on partition 0, which is correct,
 so the intuition is that f provided to the last transformation
 (mapPartitionsToPair) would run sequentially inside a single yarn
 container. However from yarn logs I do see that both yarn containers are
 processing records from the same partition ... and *sometimes*  the over
 all job fails (due to the code in f which expects a certain order of
 records) and yarn container 1 receives the records as expected, whereas
 yarn container 2 receives a subset of records ... for a reason I cannot
 explain and f fails.

  The overall behavior of this job is that sometimes it succeeds and
 sometimes it fails ... apparently due to inconsistent propagation of sorted
 records to yarn containers.


  If any of this makes any sense to you, please let me know what I am
 missing.



  Best,
 Marius



Re: Scalability of group by

2015-04-28 Thread Richard Marscher
Hi,

I can offer a few ideas to investigate in regards to your issue here. I've
run into resource issues doing shuffle operations with a much smaller
dataset than 2B. The data is going to be saved to disk by the BlockManager
as part of the shuffle and then redistributed across the cluster as
relevant to the group by. So the data is going to be replicated during the
operation.

I might suggest trying to allocate more memory for your executors in your
cluster. You might also want to look into configuring more explicitly the
shuffle functionality (
https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior).
Check the disk usage on the worker nodes, in our case we actually had small
disk space to start with and were running out of temporary space for the
shuffle operation.

You should also be able to find clearer errors in the logs from
the worker nodes, if you haven't checked them yet.
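As a starting point, a sketch of the kind of settings to experiment with (the property
names are from the Spark 1.x configuration page; the values are only illustrative and
need tuning for your cluster and disks):

# spark-defaults.conf (illustrative values only)
spark.executor.memory            10g
spark.shuffle.memoryFraction     0.4    # more heap for shuffle aggregation buffers
spark.shuffle.spill              true   # spill to disk instead of failing on large groups
spark.shuffle.consolidateFiles   true   # fewer intermediate files with the hash shuffle manager
spark.local.dir                  /mnt/bigdisk/spark-tmp   # hypothetical path with plenty of free space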

On Mon, Apr 27, 2015 at 10:02 PM, Ulanov, Alexander alexander.ula...@hp.com
 wrote:

  It works on a smaller dataset of 100 rows. Probably I could find the
 size when it fails using binary search. However, it would not help me
 because I need to work with 2B rows.



 *From:* ayan guha [mailto:guha.a...@gmail.com]
 *Sent:* Monday, April 27, 2015 6:58 PM
 *To:* Ulanov, Alexander
 *Cc:* user@spark.apache.org
 *Subject:* Re: Scalability of group by



 Hi

 Can you test on a smaller dataset to identify if it is cluster issue or
 scaling issue in spark

 On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.com wrote:

  Hi,



 I am running a group by on a dataset of 2B rows of RDD[Row[id, time, value]]
 in Spark 1.3 as follows:

 “select id, time, first(value) from data group by id, time”



 My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor
 is allocated with 5GB of memory. However, all executors are being lost
 during the query execution and I get “ExecutorLostFailure”.



 Could you suggest what might be the reason for it? Could it be that “group
 by” is implemented as RDD.groupBy so it holds the group by result in
 memory? What is the workaround?



 Best regards, Alexander




Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread subscripti...@prismalytics.io

Hello Friends:

I generated a Pair RDD with K/V pairs, like so:


 rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the 
VALUES for each KEY.

I can do so as shown here, which does work:
 countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
 rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerator (i.e. the SUM).
 rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. COUNT)

 print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder whether the above approach is the optimal one, or
whether there is a single API call that handles this common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv


Re: A problem of using spark streaming to capture network packets

2015-04-28 Thread Dean Wampler
Are the tasks on the slaves also running as root? If not, that might
explain the problem.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote:

 1. The full command line is written in a shell script:

 LIB=/home/spark/.m2/repository

 /opt/spark/bin/spark-submit \
 --class spark.pcap.run.TestPcapSpark \
 --jars
 $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/
 org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar
 \
 /home/spark/napa/napa.jar

 2. And we run this script with *sudo*, if you do not use sudo, then you
 cannot access network interface.

 3. We also tested ListPcapNetworkInterface nifs = Pcaps.findAllDevs() in
 a standard Java program, it really worked like a champion.

 Best regards,

 Lin Hao XU
 IBM Research China
 Email: xulin...@cn.ibm.com
 My Flickr: http://www.flickr.com/photos/xulinhao/sets


 From: Dean Wampler deanwamp...@gmail.com
 To: Hai Shan Wu/China/IBM@IBMCN
 Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN
 Date: 2015/04/28 20:07
 Subject: Re: A problem of using spark streaming to capture network packets
 --



 It's probably not your code.

 What's the full command line you use to submit the job?

 Are you sure the job on the cluster has access to the network interface?
 Can you test the receiver by itself without Spark? For example, does this
 line work as expected:

 ListPcapNetworkInterface nifs = Pcaps.findAllDevs();

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote:

Hi Everyone

We use pcap4j to capture network packets and then use spark streaming
to analyze captured packets. However, we met a strange problem.

If we run our application on spark locally (for example, spark-submit
--master local[2]), then the program runs successfully.

If we run our application on spark standalone cluster, then the
program will tell us that NO NIFs found.

I also attach two test files for clarification.

So anyone can help on this? Thanks in advance!


 (See attached file: PcapReceiver.java) (See attached file: TestPcapSpark.java)

Best regards,

- Haishan

Haishan Wu (吴海珊)

IBM Research - China
Tel: 86-10-58748508
Fax: 86-10-58748330
    Email: wuh...@cn.ibm.com
Lotus Notes: Hai Shan Wu/China/IBM







Question about Memory Used and VCores Used

2015-04-28 Thread bit1...@163.com
Hi,guys,
I have the following computation with 3 workers:
spark-sql --master yarn --executor-memory 3g --executor-cores 2 --driver-memory 
1g -e 'select count(*) from table'

The resources used, as shown on the YARN UI, are: Memory Used 15GB, VCores Used 5.
I don't understand these numbers. I think the memory used should be 
executor-memory * numOfWorkers = 3G * 3 = 9G, and the VCores used should be 
executor-cores * numOfWorkers = 6.

Can you please explain the result? Thanks.





bit1...@163.com


Re: HBase HTable constructor hangs

2015-04-28 Thread tridib
I am having exactly the same issue. I am running HBase and Spark in Docker
containers.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HBase-HTable-constructor-hangs-tp4926p22696.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: MLLib SVMWithSGD is failing for large dataset

2015-04-28 Thread ai he
Hi Sarath,

It might be questionable to set num-executors to 64 if you only have 8
nodes. Do you use any action like collect, which could overwhelm the
driver since you have a large dataset?

Thanks

On Tue, Apr 28, 2015 at 10:50 AM, sarath sarathkrishn...@gmail.com wrote:

 I am trying to train a large dataset consisting of 8 million data points and
 20 million features using SVMWithSGD. But it is failing after running for
 some time. I tried increasing num-partitions, driver-memory,
 executor-memory, driver-max-resultSize. Also I tried by reducing the size of
 dataset from 8 million to 25K (keeping number of features same 20 M). But
 after using the entire 64GB driver memory for 20 to 30 min it failed.

 I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM).
 executor-memory - 60G
 driver-memory - 60G
 num-executors - 64
 And other default settings

 This is the error log :

 15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS
 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS
 15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection from
 xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 ...
 15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests
 outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 is
 closed
 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block
 fetches
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 ...
 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block
 fetches
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 ...
 15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 1 outstanding blocks
 java.io.IOException: Failed to connect to
 xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
 at
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at 
 org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
 at
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
 at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.net.ConnectException: Connection refused:
 xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
 at 

RE: Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-28 Thread Mohammed Guller
One reason Spark on disk is faster than MapReduce is Spark’s advanced Directed 
Acyclic Graph (DAG) engine. MapReduce requires a complex job to be split into 
multiple Map-Reduce jobs, with disk I/O at the end of each job and the beginning 
of the next. With Spark, you may be able to express the same job with fewer 
stages, incurring less disk I/O. Disk I/O is an expensive operation, so fewer 
disk I/O operations translate to better performance.

Mohammed

From: Ilya Ganelin [mailto:ilgan...@gmail.com]
Sent: Monday, April 27, 2015 7:55 PM
To: bit1...@163.com; user
Subject: Re: Why Spark is much faster than Hadoop MapReduce even on disk

I believe the typical answer is that Spark is actually a bit slower.
On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com wrote:
Hi,

I am frequently asked why Spark is also much faster than Hadoop MapReduce on 
disk (without the use of the memory cache). I have no convincing answer for this 
question; could you guys elaborate on this? Thanks!





Re: Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-28 Thread Koert Kuipers
Our experience is that, unless you can benefit from Spark features such as
co-partitioning that allow for more efficient execution, Spark is
slightly slower for disk-to-disk jobs.
On Apr 27, 2015 10:34 PM, bit1...@163.com wrote:

 Hi,

 I am frequently asked why Spark is also much faster than Hadoop MapReduce
 on disk (without the use of the memory cache). I have no convincing answer for
 this question; could you guys elaborate on this? Thanks!

 --




Re: A problem of using spark streaming to capture network packets

2015-04-28 Thread Lin Hao Xu
1. The full command line is written in a shell script:

LIB=/home/spark/.m2/repository

/opt/spark/bin/spark-submit \
--class spark.pcap.run.TestPcapSpark \
--jars
$LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/
org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar
 \
/home/spark/napa/napa.jar

2. And we run this script with sudo; if you do not use sudo, you
cannot access the network interface.

3. We also tested ListPcapNetworkInterface nifs = Pcaps.findAllDevs() in
a standard Java program, it really worked like a champion.

Best regards,

Lin Hao XU
IBM Research China
Email: xulin...@cn.ibm.com
My Flickr: http://www.flickr.com/photos/xulinhao/sets



From:   Dean Wampler deanwamp...@gmail.com
To: Hai Shan Wu/China/IBM@IBMCN
Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN
Date:   2015/04/28 20:07
Subject:Re: A problem of using spark streaming to capture network
packets



It's probably not your code.

What's the full command line you use to submit the job?

Are you sure the job on the cluster has access to the network interface?
Can you test the receiver by itself without Spark? For example, does this
line work as expected:

ListPcapNetworkInterface nifs = Pcaps.findAllDevs();

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote:
  Hi Everyone

  We use pcap4j to capture network packets and then use spark streaming to
  analyze captured packets. However, we met a strange problem.

  If we run our application on spark locally (for example, spark-submit
  --master local[2]), then the program runs successfully.

  If we run our application on spark standalone cluster, then the program
  will tell us that NO NIFs found.

  I also attach two test files for clarification.

  So anyone can help on this? Thanks in advance!


  (See attached file: PcapReceiver.java)(See attached file:
  TestPcapSpark.java)

  Best regards,

  - Haishan

  Haishan Wu (吴海珊)

  IBM Research - China
  Tel: 86-10-58748508
  Fax: 86-10-58748330
  Email: wuh...@cn.ibm.com
  Lotus Notes: Hai Shan Wu/China/IBM




Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-28 Thread Saisai Shao
I think currently there's no API in Spark Streaming you can use to get the
file names for file input streams. Actually it is not trivial to support
this; maybe you could file a JIRA with the feature you would like the community
to support, so anyone who is interested can take a crack at it.

Thanks
Jerry


2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:

 Hi Forum,

 Using spark streaming and listening to the files in HDFS using
 textFileStream/fileStream methods, how do we get the fileNames which are
 read by these methods?

 I used textFileStream, which gives the file contents in a JavaDStream, and I had
 no success with fileStream as it throws a compilation error with Spark
 version 1.3.1.

 Can someone please tell me if we have an API function or any other way to
 get the file names that these streaming methods read?

 Thanks
 Lokesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: HBase HTable constructor hangs

2015-04-28 Thread Ted Yu
Can you give us more information ?
Such as hbase release, Spark release.

If you can pastebin jstack of the hanging HTable process, that would help.

BTW I used http://search-hadoop.com/?q=spark+HBase+HTable+constructor+hangs
and saw a very old thread with this subject.

Cheers

On Tue, Apr 28, 2015 at 7:12 PM, tridib tridib.sama...@live.com wrote:

 I am exactly having same issue. I am running hbase and spark in docker
 container.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/HBase-HTable-constructor-hangs-tp4926p22696.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: A problem of using spark streaming to capture network packets

2015-04-28 Thread Lin Hao Xu
BTW, from the Spark web UI, the ACL is marked as root.

Best regards,

Lin Hao XU
IBM Research China
Email: xulin...@cn.ibm.com
My Flickr: http://www.flickr.com/photos/xulinhao/sets



From:   Dean Wampler deanwamp...@gmail.com
To: Lin Hao Xu/China/IBM@IBMCN
Cc: Hai Shan Wu/China/IBM@IBMCN, user user@spark.apache.org
Date:   2015/04/29 09:40
Subject:Re: A problem of using spark streaming to capture network
packets



Are the tasks on the slaves also running as root? If not, that might
explain the problem.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote:
  1. The full command line is written in a shell script:

  LIB=/home/spark/.m2/repository

  /opt/spark/bin/spark-submit \
  --class spark.pcap.run.TestPcapSpark \
  --jars
  
$LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/

  
org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar
 \
  /home/spark/napa/napa.jar

  2. And we run this script with sudo, if you do not use sudo, then you
  cannot access network interface.

  3. We also tested ListPcapNetworkInterface nifs = Pcaps.findAllDevs()
  in a standard Java program, it really worked like a champion.

  Best regards,

  Lin Hao XU
  IBM Research China
  Email: xulin...@cn.ibm.com
  My Flickr: http://www.flickr.com/photos/xulinhao/sets


  From: Dean Wampler deanwamp...@gmail.com
  To: Hai Shan Wu/China/IBM@IBMCN
  Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN
  Date: 2015/04/28 20:07
  Subject: Re: A problem of using spark streaming to capture network
  packets




  It's probably not your code.

  What's the full command line you use to submit the job?

  Are you sure the job on the cluster has access to the network interface?
  Can you test the receiver by itself without Spark? For example, does this
  line work as expected:

  ListPcapNetworkInterface nifs = Pcaps.findAllDevs();

  dean

  Dean Wampler, Ph.D.
  Author: Programming Scala, 2nd Edition (O'Reilly)
  Typesafe
  @deanwampler
  http://polyglotprogramming.com

  On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote:
Hi Everyone

We use pcap4j to capture network packets and then use spark
streaming to analyze captured packets. However, we met a strange
problem.

If we run our application on spark locally (for example,
spark-submit --master local[2]), then the program runs
successfully.

If we run our application on spark standalone cluster, then the
program will tell us that NO NIFs found.

I also attach two test files for clarification.

So anyone can help on this? Thanks in advance!


(See attached file: PcapReceiver.java)(See attached file:
TestPcapSpark.java)

Best regards,

- Haishan

Haishan Wu (吴海珊)

IBM Research - China
Tel: 86-10-58748508
Fax: 86-10-58748330
Email: wuh...@cn.ibm.com
Lotus Notes: Hai Shan Wu/China/IBM









Re: Serialization error

2015-04-28 Thread madhvi

Thank you, Deepak. It worked.
Madhvi
On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:


val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))


Can you try this ?


On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com wrote:


Hi,

While connecting to accumulo through spark by making sparkRDD I am
getting the following error:
 object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to the Key class of Accumulo, which does not
implement the Serializable interface. How can this be solved so that Accumulo
can be used with Spark?

Thanks
Madhvi





--
Deepak





Re: 1.3.1: Persisting RDD in parquet - Conflicting partition column names

2015-04-28 Thread ayan guha
Can you show your code please?
On 28 Apr 2015 13:20, sranga sra...@gmail.com wrote:

 Hi

 I am getting the following error when persisting an RDD in parquet format
 to
 an S3 location. This is code that was working in the 1.2 version. The
 version that it is failing to work is 1.3.1.
 Any help is appreciated.

 Caused by: java.lang.AssertionError: assertion failed: Conflicting
 partition
 column names detected:
 ArrayBuffer(batch_id)
 ArrayBuffer()
 at scala.Predef$.assert(Predef.scala:179)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$.resolvePartitions(newParquet.scala:933)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$.parsePartitions(newParquet.scala:851)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:311)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:303)
 at scala.Option.getOrElse(Option.scala:120)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:303)
 at
 org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:692)
 at

 org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129)
 at
 org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
 at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
 at
 org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:995)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/1-3-1-Persisting-RDD-in-parquet-Conflicting-partition-column-names-tp22678.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to add jars to standalone pyspark program

2015-04-28 Thread Fabian Böhnlein
Can you specify what 'running via PyCharm' means: how are you executing the 
script, with spark-submit?


In PySpark I guess you used --jars databricks-csv.jar. With spark-submit 
you might need the additional --driver-class-path databricks-csv.jar.


Both parameters cannot be set via the SparkConf object.
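For example, assuming the script is launched through spark-submit (the jar and
script paths are placeholders):

spark-submit \
  --jars /path/to/databricks-csv.jar \
  --driver-class-path /path/to/databricks-csv.jar \
  your_script.py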

Cheers,
Fabian

On 04/28/2015 10:06 AM, mj wrote:

Hi,

I'm trying to figure out how to use a third party jar inside a python
program which I'm running via PyCharm in order to debug it. I am normally
able to run spark code in python such as this:

 spark_conf = SparkConf().setMaster('local').setAppName('test')
 sc = SparkContext(conf=spark_conf)
 cars = sc.textFile('c:/cars.csv')
 print cars.count()
 sc.stop()

The code I'm trying to run is below - it uses the databricks spark csv jar.
I can get it working fine in pyspark shell using the packages argument, but
I can't figure out how to get it to work via PyCharm.

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setMaster('local').setAppName('test')
sc = SparkContext(conf=spark_conf)

sqlContext = SQLContext(sc)
df = sqlContext.load(source="com.databricks.spark.csv", header="true", path="c:/cars.csv", delimiter='\t')
df.select("year")

The error message I'm getting is:
py4j.protocol.Py4JJavaError: An error occurred while calling o20.load.
: java.lang.RuntimeException: Failed to load class for data source:
com.databricks.spark.csv
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)


I presume I need to set the spark classpath somehow but I'm not sure of the
right way to do it. Any advice/guidance would be appreciated.

Thanks,

Mark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: A problem of using spark streaming to capture network packets

2015-04-28 Thread Dean Wampler
It's probably not your code.

What's the full command line you use to submit the job?

Are you sure the job on the cluster has access to the network interface?
Can you test the receiver by itself without Spark? For example, does this
line work as expected:

ListPcapNetworkInterface nifs = Pcaps.findAllDevs();

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote:

 Hi Everyone

 We use pcap4j to capture network packets and then use spark streaming to
 analyze captured packets. However, we met a strange problem.

 If we run our application on spark locally (for example, spark-submit
 --master local[2]), then the program runs successfully.

 If we run our application on spark standalone cluster, then the program
 will tell us that NO NIFs found.

 I also attach two test files for clarification.

 So anyone can help on this? Thanks in advance!


 (See attached file: PcapReceiver.java) (See attached file: TestPcapSpark.java)

 Best regards,

 - Haishan

 Haishan Wu (吴海珊)

 IBM Research - China
 Tel: 86-10-58748508
 Fax: 86-10-58748330
 Email: wuh...@cn.ibm.com
 Lotus Notes: Hai Shan Wu/China/IBM





Re: submitting to multiple masters

2015-04-28 Thread michal.klo...@gmail.com
According to the docs it should go like this:
spark://host1:port1,host2:port2


https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper
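Applied to the command in the original mail, that means a single spark:// prefix for
the whole comma-separated list (your-application.jar is just a placeholder):

spark-1.3.0-bin-hadoop2.4/bin/spark-submit \
  --master spark://master01:7077,master02:7077 \
  your-application.jar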

Thanks
M


 On Apr 28, 2015, at 8:13 AM, James King jakwebin...@gmail.com wrote:
 
 I have multiple masters running and I'm trying to submit an application using
 
 spark-1.3.0-bin-hadoop2.4/bin/spark-submit  
 
 with this config (i.e. a comma separated list of master urls)
 
   --master spark://master01:7077,spark://master02:7077
 
 
 But getting this exception 
 
 Exception in thread main org.apache.spark.SparkException: Invalid master 
 URL: spark://spark://master02:7077
 
 
 What am I doing wrong?
 
 Many Thanks
 jk


Spark partitioning question

2015-04-28 Thread Marius Danciu
Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
.mapPartitionsToPair(...)
.groupByKey()
.sortByKey(comparator)
.partitionBy(myPartitioner)
.mapPartitionsWithIndex(...)
.mapPartitionsToPair( *f* )

The input data has 2 input splits (yarn 2.6.0).
myPartitioner partitions all the records on partition 0, which is correct,
so the intuition is that f provided to the last transformation
(mapPartitionsToPair) would run sequentially inside a single yarn
container. However from yarn logs I do see that both yarn containers are
processing records from the same partition ... and *sometimes* the overall
job fails (due to the code in f which expects a certain order of
records) and yarn container 1 receives the records as expected, whereas
yarn container 2 receives a subset of records ... for a reason I cannot
explain and f fails.

The overall behavior of this job is that sometimes it succeeds and
sometimes it fails ... apparently due to inconsistent propagation of sorted
records to yarn containers.


If any of this makes any sense to you, please let me know what I am missing.



Best,
Marius


Re: How to add jars to standalone pyspark program

2015-04-28 Thread ayan guha
It's a Windows thing. Please escape the slash in the string; basically it is
not able to find the file.
On 28 Apr 2015 22:09, Fabian Böhnlein fabian.boehnl...@gmail.com wrote:

 Can you specifiy 'running via PyCharm'. how are you executing the script,
 with spark-submit?

 In PySpark I guess you used --jars databricks-csv.jar. With spark-submit
 you might need the additional --driver-class-path databricks-csv.jar.

 Both parameters cannot be set via the SparkConf object.

 Cheers,
 Fabian

 On 04/28/2015 10:06 AM, mj wrote:

 Hi,

 I'm trying to figure out how to use a third party jar inside a python
 program which I'm running via PyCharm in order to debug it. I am normally
 able to run spark code in python such as this:

  spark_conf = SparkConf().setMaster('local').setAppName('test')
  sc = SparkContext(conf=spark_conf)
  cars = sc.textFile('c:/cars.csv')
  print cars.count()
  sc.stop()

 The code I'm trying to run is below - it uses the databricks spark csv
 jar.
 I can get it working fine in pyspark shell using the packages argument,
 but
 I can't figure out how to get it to work via PyCharm.

 from pyspark.sql import SQLContext
 from pyspark import SparkConf, SparkContext

 spark_conf = SparkConf().setMaster('local').setAppName('test')
 sc = SparkContext(conf=spark_conf)

 sqlContext = SQLContext(sc)
 df = sqlContext.load(source="com.databricks.spark.csv", header="true", path="c:/cars.csv", delimiter='\t')
 df.select("year")

 The error message I'm getting is:
 py4j.protocol.Py4JJavaError: An error occurred while calling o20.load.
 : java.lang.RuntimeException: Failed to load class for data source:
 com.databricks.spark.csv
 at scala.sys.package$.error(package.scala:27)
 at

 org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194)
 at
 org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205)
 at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)
 at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)


 I presume I need to set the spark classpath somehow but I'm not sure of
 the
 right way to do it. Any advice/guidance would be appreciated.

 Thanks,

 Mark.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: StandardScaler failing with OOM errors in PySpark

2015-04-28 Thread Rok Roskar
That's exactly what I'm saying -- I specify the memory options using spark
options, but this is not reflected in how the JVM is created. No matter
which memory settings I specify, the JVM for the driver is always made with
512Mb of memory. So I'm not sure if this is a feature or a bug?

rok
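(For reference, a sketch of Xiangrui's suggestion for yarn-client mode; the script
name and sizes are placeholders. In client mode the driver JVM is launched by
spark-submit before the SparkConf in your program is read, so driver memory has to
be given to spark-submit itself or in spark-defaults.conf:)

spark-submit \
  --master yarn-client \
  --driver-memory 5g \
  --conf spark.yarn.am.memory=5g \
  --conf spark.yarn.am.memoryOverhead=2000 \
  your_script.py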

On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng men...@gmail.com wrote:

 You might need to specify driver memory in spark-submit instead of
 passing JVM options. spark-submit is designed to handle different
 deployments correctly. -Xiangrui

 On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote:
  ok yes, I think I have narrowed it down to being a problem with driver
  memory settings. It looks like the application master/driver is not being
  launched with the settings specified:
 
  For the driver process on the main node I see -XX:MaxPermSize=128m
 -Xms512m
  -Xmx512m as options used to start the JVM, even though I specified
 
  'spark.yarn.am.memory', '5g'
  'spark.yarn.am.memoryOverhead', '2000'
 
  The info shows that these options were read:
 
  15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with
 7120 MB
  memory including 2000 MB overhead
 
  Is there some reason why these options are being ignored and instead
  starting the driver with just 512Mb of heap?
 
  On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:
 
  the feature dimension is 800k.
 
  yes, I believe the driver memory is likely the problem since it doesn't
  crash until the very last part of the tree aggregation.
 
  I'm running it via pyspark through YARN -- I have to run in client mode
 so
  I can't set spark.driver.memory -- I've tried setting the
  spark.yarn.am.memory and overhead parameters but it doesn't seem to
 have an
  effect.
 
  Thanks,
 
  Rok
 
  On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:
 
   What is the feature dimension? Did you set the driver memory?
 -Xiangrui
  
   On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
   I'm trying to use the StandardScaler in pyspark on a relatively small
   (a few
   hundred Mb) dataset of sparse vectors with 800k features. The fit
   method of
   StandardScaler crashes with Java heap space or Direct buffer memory
   errors.
   There should be plenty of memory around -- 10 executors with 2 cores
   each
   and 8 Gb per core. I'm giving the executors 9g of memory and have
 also
   tried
   lots of overhead (3g), thinking it might be the array creation in the
   aggregators that's causing issues.
  
   The bizarre thing is that this isn't always reproducible -- sometimes
   it
   actually works without problems. Should I be setting up executors
   differently?
  
   Thanks,
  
   Rok
  
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
  
 
 



Re: spark-defaults.conf

2015-04-28 Thread James King
So no takers regarding why spark-defaults.conf is not being picked up.

Here is another one:

If Zookeeper is configured in Spark why do we need to start a slave like
this:

spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh 1 spark://somemaster:7077

i.e. why do we need to specify the master url explicitly

Shouldn't Spark just consult ZK and use the active master?

Or is ZK only used during failure?


On Mon, Apr 27, 2015 at 1:53 PM, James King jakwebin...@gmail.com wrote:

 Thanks.

 I've set SPARK_HOME and SPARK_CONF_DIR appropriately in .bash_profile

 But when I start worker like this

 spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

 I still get

 failed to launch org.apache.spark.deploy.worker.Worker:
  Default is conf/spark-defaults.conf.
   15/04/27 11:51:33 DEBUG Utils: Shutdown hook called





 On Mon, Apr 27, 2015 at 1:15 PM, Zoltán Zvara zoltan.zv...@gmail.com
 wrote:

 You should distribute your configuration file to workers and set the
 appropriate environment variables, like HADOOP_HOME, SPARK_HOME,
 HADOOP_CONF_DIR, SPARK_CONF_DIR.

 On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com
 wrote:

 I renamed spark-defaults.conf.template to spark-defaults.conf
 and invoked

 spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

 But I still get

 failed to launch org.apache.spark.deploy.worker.Worker:
 --properties-file FILE   Path to a custom Spark properties file.
  Default is conf/spark-defaults.conf.

 But I'm thinking it should pick up the default spark-defaults.conf from
 conf dir

 Am I expecting or doing something wrong?

 Regards
 jk






submitting to multiple masters

2015-04-28 Thread James King
I have multiple masters running and I'm trying to submit an application
using

spark-1.3.0-bin-hadoop2.4/bin/spark-submit

with this config (i.e. a comma separated list of master urls)

  --master spark://master01:7077,spark://master02:7077


But getting this exception

Exception in thread main org.apache.spark.SparkException: Invalid master
URL: spark://spark://master02:7077


What am I doing wrong?

Many Thanks
jk


Re: MLLib SVMWithSGD is failing for large dataset

2015-04-28 Thread sarathkrishn...@gmail.com
Hi,

I'm just calling the standard SVMWithSGD implementation of Spark's MLLib.
I'm not using any method like collect.

Thanks,
Sarath

On Tue, Apr 28, 2015 at 4:35 PM, ai he heai0...@gmail.com wrote:

 Hi Sarath,

 It might be questionable to set num-executors as 64 if you only has 8
 nodes. Do you use any action like collect which will overwhelm the
 driver since you have a large dataset?

 Thanks

 On Tue, Apr 28, 2015 at 10:50 AM, sarath sarathkrishn...@gmail.com
 wrote:
 
  I am trying to train a large dataset consisting of 8 million data points
 and
  20 million features using SVMWithSGD. But it is failing after running for
  some time. I tried increasing num-partitions, driver-memory,
  executor-memory, driver-max-resultSize. Also I tried by reducing the
 size of
  dataset from 8 million to 25K (keeping number of features same 20 M). But
  after using the entire 64GB driver memory for 20 to 30 min it failed.
 
  I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM).
  executor-memory - 60G
  driver-memory - 60G
  num-executors - 64
  And other default settings
 
  This is the error log :
 
  15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeSystemBLAS
  15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeRefBLAS
  15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection
 from
  xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests
  outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
 is
  closed
  15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
 block
  fetches
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
 block
  fetches
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning
  fetch of 1 outstanding blocks
  java.io.IOException: Failed to connect to
  xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  at
 
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
  at
 
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
  at
 
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
  at
 
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
  at
 
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
  at
 
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
  at
 
 org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
  at
 
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290)
  at
 
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at
 
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at
 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at
 org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
  at
 
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at
 
 

Re: A problem of using spark streaming to capture network packets

2015-04-28 Thread Lin Hao Xu
Actually, to simplify this problem, we run our program on a single machine
with 4 slave workers. Since it is a single machine, I think all slave workers
are run with root privileges.

BTW, if we have a cluster, how do we make sure the slaves on remote machines
run the program as root?
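
A quick diagnostic sketch that can be run from the driver to see which user the executor JVMs actually run as on each worker (standard Java/Spark APIs only):

  val execUsers = sc.parallelize(1 to 100, 8).map { _ =>
    (java.net.InetAddress.getLocalHost.getHostName, System.getProperty("user.name"))
  }.distinct().collect()
  execUsers.foreach(println)

In standalone mode the executors are launched by the Worker daemons, so they normally run as whatever user started the Workers, not as the user who ran spark-submit under sudo.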

Best regards,

Lin Hao XU
IBM Research China
Email: xulin...@cn.ibm.com
My Flickr: http://www.flickr.com/photos/xulinhao/sets



From:   Dean Wampler deanwamp...@gmail.com
To: Lin Hao Xu/China/IBM@IBMCN
Cc: Hai Shan Wu/China/IBM@IBMCN, user user@spark.apache.org
Date:   2015/04/29 09:40
Subject:Re: A problem of using spark streaming to capture network
packets



Are the tasks on the slaves also running as root? If not, that might
explain the problem.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote:
  1. The full command line is written in a shell script:

  LIB=/home/spark/.m2/repository

  /opt/spark/bin/spark-submit \
  --class spark.pcap.run.TestPcapSpark \
  --jars \
  $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \
  /home/spark/napa/napa.jar

  2. And we run this script with sudo, if you do not use sudo, then you
  cannot access network interface.

  3. We also tested List<PcapNetworkInterface> nifs = Pcaps.findAllDevs()
  in a standard Java program, and it really worked like a champion.

  Best regards,

  Lin Hao XU
  IBM Research China
  Email: xulin...@cn.ibm.com
  My Flickr: http://www.flickr.com/photos/xulinhao/sets


  From: Dean Wampler deanwamp...@gmail.com
  To: Hai Shan Wu/China/IBM@IBMCN
  Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN
  Date: 2015/04/28 20:07
  Subject: Re: A problem of using spark streaming to capture network
  packets




  It's probably not your code.

  What's the full command line you use to submit the job?

  Are you sure the job on the cluster has access to the network interface?
  Can you test the receiver by itself without Spark? For example, does this
  line work as expected:

  List<PcapNetworkInterface> nifs = Pcaps.findAllDevs();

  dean

  Dean Wampler, Ph.D.
  Author: Programming Scala, 2nd Edition (O'Reilly)
  Typesafe
  @deanwampler
  http://polyglotprogramming.com

  On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote:
Hi Everyone

We use pcap4j to capture network packets and then use spark
streaming to analyze captured packets. However, we met a strange
problem.

If we run our application on spark locally (for example,
spark-submit --master local[2]), then the program runs
successfully.

If we run our application on spark standalone cluster, then the
program will tell us that NO NIFs found.

I also attach two test files for clarification.

So anyone can help on this? Thanks in advance!


(See attached file: PcapReceiver.java)(See attached file:
TestPcapSpark.java)

Best regards,

- Haishan

Haishan Wu (吴海珊)

IBM Research - China
Tel: 86-10-58748508
Fax: 86-10-58748330
Email: wuh...@cn.ibm.com
Lotus Notes: Hai Shan Wu/China/IBM


-

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Re: Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-28 Thread bit1...@163.com
It looks to me that the same thing also applies to SparkContext.textFile or
SparkContext.wholeTextFiles: there is no way in the RDD to figure out which
file the data in the RDD came from.
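
For the batch case specifically, one partial workaround is SparkContext.wholeTextFiles, which keys each record by its file path (a sketch, assuming the files are small enough to be read whole; the directory is a placeholder):

  val filesWithNames = sc.wholeTextFiles("hdfs:///some/dir")   // RDD[(path, fileContent)]
  val linesWithNames = filesWithNames.flatMap { case (path, content) =>
    content.split("\n").map(line => (path, line))
  }

The streaming textFileStream/fileStream case still has no equivalent hook, which is what the JIRA suggestion in this thread is about.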



bit1...@163.com
 
From: Saisai Shao
Date: 2015-04-29 10:10
To: lokeshkumar
CC: spark users
Subject: Re: Spark streaming - textFileStream/fileStream - Get file name
I think currently there's no API in Spark Streaming you can use to get the file 
names for file input streams. Actually it is not trivial to support this, may 
be you could file a JIRA with wishes you want the community to support, so 
anyone who is interested can take a crack on this.

Thanks
Jerry


2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:
Hi Forum,

Using spark streaming and listening to the files in HDFS using
textFileStream/fileStream methods, how do we get the fileNames which are
read by these methods?

I used textFileStream which has file contents in JavaDStream and I got no
success with fileStream as it is throwing me a compilation error with spark
version 1.3.1.

Can someone please tell me if we have an API function or any other way to
get the file names that these streaming methods read?

Thanks
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Best practices on testing Spark jobs

2015-04-28 Thread Michal Michalski
Hi,

I have two questions regarding testing Spark jobs:

1. Is it possible to use Mockito for that purpose? I tried to use it, but
it looks like there are no interactions with mocks. I didn't dive into the
details of how Mockito works, but I guess it might be because of the
serialization and how Spark distributes tasks. I'm not sure about it though
and I'm looking for confirmation.

2. If not mockito, what's the alternative? What's the recommended way to
test Spark jobs? Should I manually create mocks by e.g. extending all the
classes I'd normally mock and changing the implementation of some methods?
I don't like this idea but I can't really see any other options now.
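
One pattern that sidesteps mocking entirely is to keep the transformation logic in plain functions over iterators and test those without Spark, leaving only thin RDD wiring. A sketch (the function names and record format are made up for illustration):

  // Pure function: testable with plain ScalaTest/JUnit, no Spark, no mocks.
  def parseAndFilter(lines: Iterator[String]): Iterator[(String, Int)] =
    lines.flatMap { line =>
      line.split(",") match {
        case Array(k, v) => scala.util.Try((k.trim, v.trim.toInt)).toOption
        case _           => None
      }
    }.filter(_._2 > 0)

  // Thin Spark wiring that just delegates to the pure function.
  def run(sc: org.apache.spark.SparkContext, path: String) =
    sc.textFile(path).mapPartitions(parseAndFilter)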


Kind regards,
Michał Michalski,
michal.michal...@boxever.com


Re: hive-thriftserver maven artifact

2015-04-28 Thread Ted Yu
Credit goes to Misha Chernetsov (see SPARK-4925)

FYI

On Tue, Apr 28, 2015 at 8:25 AM, Marco marco@gmail.com wrote:

 Thx Ted for the info !

 2015-04-27 23:51 GMT+02:00 Ted Yu yuzhih...@gmail.com:

 This is available for 1.3.1:

 http://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver_2.10

 FYI

 On Mon, Feb 16, 2015 at 7:24 AM, Marco marco@gmail.com wrote:

 Ok, so will it be only available for the next version (1.30)?

 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 I searched for 'spark-hive-thriftserver_2.10' on this page:
 http://mvnrepository.com/artifact/org.apache.spark

 Looks like it is not published.

 On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote:

 Hi,

 I am referring to https://issues.apache.org/jira/browse/SPARK-4925
 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the
 artifact in a public repository ? I have not found it @Maven Central.

 Thanks,
 Marco





 --
 Viele Grüße,
 Marco





 --
 Viele Grüße,
 Marco



Re: Best practices on testing Spark jobs

2015-04-28 Thread Sourav Chandra
Hi,

Can you give some tutorials/examples of how to write a test case based on the
mentioned framework?
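
A minimal sketch of the shape such a test takes; it manages a local SparkContext by hand (spark-testing-base essentially wraps this boilerplate for you), and the class and test names are illustrative:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.scalatest.{BeforeAndAfterAll, FunSuite}

  class WordCountSpec extends FunSuite with BeforeAndAfterAll {
    @transient private var sc: SparkContext = _

    override def beforeAll(): Unit = {
      sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
    }

    override def afterAll(): Unit = sc.stop()

    test("word count counts words") {
      val counts = sc.parallelize(Seq("a b", "a"))
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)
        .collectAsMap()
      assert(counts == Map("a" -> 2, "b" -> 1))
    }
  }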

Thanks,
Sourav

On Tue, Apr 28, 2015 at 9:22 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

  Sorry that’s correct, I was thinking you were maybe trying to mock
 certain aspects of Spark core to write your tests. This is a library to
 help write unit tests by managing the SparkContext and StreamingContext. So
 you can test your transformations as necessary. More importantly on the
 streaming side it really helps simplify running tests on batch outputs.

  If you’re having serialization issues you may need to look at using
 transient lazy initializers, to see if that helps?

   From: Michal Michalski
 Date: Tuesday, April 28, 2015 at 11:42 AM
 To: Silvio Fiorito
 Cc: user
 Subject: Re: Best practices on testing Spark jobs

   Thanks Silvio. I might be missing something, but it looks like this
 project is a kind of a framework for setting up Spark for a testing, but
 after taking a quick look at the code it doesn't seem like it's solving the
 problem with mocking which is my main concern now - am I wrong?

  Kind regards,
 Michał Michalski,
 michal.michal...@boxever.com

 On 28 April 2015 at 16:35, Silvio Fiorito silvio.fior...@granturing.com
 wrote:

   Hi Michal,

  Please try spark-testing-base by Holden. I’ve used it and it works well
 for unit testing batch and streaming jobs

  https://github.com/holdenk/spark-testing-base

  Thanks,
 Silvio

   From: Michal Michalski
 Date: Tuesday, April 28, 2015 at 11:32 AM
 To: user
 Subject: Best practices on testing Spark jobs

   Hi,

  I have two questions regarding testing Spark jobs:

  1. Is it possible to use Mockito for that purpose? I tried to use it,
 but it looks like there are no interactions with mocks. I didn't dive into
 the details of how Mockito works, but I guess it might be because of the
 serialization and how Spark distributes tasks. I'm not sure about it though
 and I'm looking for confirmation.

  2. If not mockito, what's the alternative? What's the recommended way
 to test Spark jobs? Should I manually create mocks by e.g. extending all
 the classes I'd normally mock and changing the implementation of some
 methods? I don't like this idea but I can't really see any other options
 now.


  Kind regards,
 Michał Michalski,
 michal.michal...@boxever.com





-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Best practices on testing Spark jobs

2015-04-28 Thread Silvio Fiorito
Sorry that’s correct, I was thinking you were maybe trying to mock certain 
aspects of Spark core to write your tests. This is a library to help write unit 
tests by managing the SparkContext and StreamingContext. So you can test your 
transformations as necessary. More importantly on the streaming side it really 
helps simplify running tests on batch outputs.

If you’re having serialization issues you may need to look at using transient 
lazy initializers, to see if that helps?
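
A sketch of the transient lazy initializer pattern mentioned above; SomeNonSerializableClient is a stand-in for whatever resource refuses to serialize:

  // The field is created lazily on each executor after deserialization,
  // so the non-serializable instance is never shipped from the driver.
  class LookupService(endpoint: String) extends Serializable {
    @transient lazy val client = new SomeNonSerializableClient(endpoint)
    def lookup(key: String): String = client.get(key)
  }

  // usage inside a job, e.g.:
  //   val svc = new LookupService("http://lookup.internal")   // placeholder endpoint
  //   rdd.map(k => svc.lookup(k))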

From: Michal Michalski
Date: Tuesday, April 28, 2015 at 11:42 AM
To: Silvio Fiorito
Cc: user
Subject: Re: Best practices on testing Spark jobs

Thanks Silvio. I might be missing something, but it looks like this project is 
a kind of a framework for setting up Spark for a testing, but after taking a 
quick look at the code it doesn't seem like it's solving the problem with 
mocking which is my main concern now - am I wrong?

Kind regards,
Michał Michalski,
michal.michal...@boxever.commailto:michal.michal...@boxever.com

On 28 April 2015 at 16:35, Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
Hi Michal,

Please try spark-testing-base by Holden. I’ve used it and it works well for 
unit testing batch and streaming jobs

https://github.com/holdenk/spark-testing-base

Thanks,
Silvio

From: Michal Michalski
Date: Tuesday, April 28, 2015 at 11:32 AM
To: user
Subject: Best practices on testing Spark jobs

Hi,

I have two questions regarding testing Spark jobs:

1. Is it possible to use Mockito for that purpose? I tried to use it, but it 
looks like there are no interactions with mocks. I didn't dive into the details 
of how Mockito works, but I guess it might be because of the serialization and 
how Spark distributes tasks. I'm not sure about it though and I'm looking for 
confirmation.

2. If not mockito, what's the alternative? What's the recommended way to test 
Spark jobs? Should I manually create mocks by e.g. extending all the classes 
I'd normally mock and changing the implementation of some methods? I don't like 
this idea but I can't really see any other options now.


Kind regards,
Michał Michalski,
michal.michal...@boxever.commailto:michal.michal...@boxever.com



Re: Spark partitioning question

2015-04-28 Thread Silvio Fiorito
So the other issue could be due to the fact that, by using mapPartitions after the
partitionBy, you essentially lose the partitioning of the keys, since Spark
assumes the keys were altered in the map phase. So really the partitionBy gets
lost after the mapPartitions, which is why you need to do it again.
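
If the keys really are untouched in that step, one way to keep the partitioner is mapPartitions with preservesPartitioning = true; a sketch with placeholder names (pairRdd, myPartitioner, transform):

  // Only safe if the function does not change the keys.
  val kept = pairRdd
    .partitionBy(myPartitioner)
    .mapPartitions(iter => iter.map { case (k, v) => (k, transform(v)) },
                   preservesPartitioning = true)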


From: Marius Danciu
Date: Tuesday, April 28, 2015 at 9:53 AM
To: Silvio Fiorito, user
Subject: Re: Spark partitioning question

Thank you Silvio,

I am aware of groubBy limitations and this is subject for replacement.

I did try repartitionAndSortWithinPartitions, but then I end up with maybe too
much shuffling: one shuffle from groupByKey and the other from repartition.

My expectation was that since N records are partitioned to the same partition
... say 0, doing a mapPartition on the resulting RDD would place all records for
partition 0 into a single partition on a single node. It seems to me that this is
not quite the case, since N can span multiple HDFS blocks and the subsequent
mapPartition operation would be parallelized on multiple nodes. In my case I see
2 yarn containers receiving records during a mapPartition operation applied on
the sorted partition. I need to test more, but it seems that applying the same
partitioner again right before the last mapPartition can help.

Best,
Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
Hi Marius,

What’s the expected output?

I would recommend avoiding the groupByKey if possible since it’s going to force 
all records for each key to go to an executor which may overload it.

Also if you need to sort and repartition, try using 
repartitionAndSortWithinPartitions to do it in one shot.

Thanks,
Silvio

From: Marius Danciu
Date: Tuesday, April 28, 2015 at 8:10 AM
To: user
Subject: Spark partitioning question

Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
.mapPartitionsToPair(...)
.groupByKey()
.sortByKey(comparator)
.partitionBy(myPartitioner)
.mapPartitionsWithIndex(...)
.mapPartitionsToPair( f )

The input data has 2 input splits (yarn 2.6.0).
myPartitioner partitions all the records on partition 0, which is correct, so 
the intuition is that f provided to the last transformation 
(mapPartitionsToPair) would run sequentially inside a single yarn container. 
However from yarn logs I do see that both yarn containers are processing 
records from the same partition ... and sometimes the overall job fails (due 
to the code in f which expects a certain order of records) and yarn container 1 
receives the records as expected, whereas yarn container 2 receives a subset of 
records ... for a reason I cannot explain and f fails.

The overall behavior of this job is that sometimes it succeeds and sometimes it 
fails ... apparently due to inconsistent propagation of sorted records to yarn 
containers.


If any of this makes any sense to you, please let me know what I am missing.



Best,
Marius


Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread Silvio Fiorito
If you need to keep the keys, you can use aggregateByKey to calculate an avg of 
the values:

val step1 = data.aggregateByKey((0.0, 0))((a, b) => (a._1 + b, a._2 + 1), (a, b) => (a._1 + b._1, a._2 + b._2))
val avgByKey = step1.mapValues(i => i._1 / i._2)

Essentially, what this is doing is passing an initializer for sum and count, 
then summing each pair of values and counting the number of values. The last 
argument is to combine the results of each partition, if the data was spread 
across partitions. The result is a tuple of sum and count for each key.

Use mapValues to keep your partitioning by key intact and avoid a full shuffle
for downstream keyed operations. It just calculates the avg for each key.
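
A quick usage sketch of the above with toy data like the sample in this thread (assuming data is an RDD[(String, Double)]):

  val data = sc.parallelize(Seq(
    ("2013-10-09", 7.60), ("2013-10-09", 20.77), ("2013-10-10", 9.32)))
  val step1 = data.aggregateByKey((0.0, 0))((a, b) => (a._1 + b, a._2 + 1),
                                            (a, b) => (a._1 + b._1, a._2 + b._2))
  val avgByKey = step1.mapValues(i => i._1 / i._2)
  avgByKey.collect().foreach(println)   // e.g. (2013-10-09,14.185), (2013-10-10,9.32)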

From: Todd Nist
Date: Tuesday, April 28, 2015 at 10:20 AM
To: subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

Can you simply apply the 
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter
 to this?  You should be able to do something like this:

val stats = RDD.map(x => x._2).stats()

-Todd

On Tue, Apr 28, 2015 at 10:00 AM, 
subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io 
subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io wrote:
Hello Friends:

I generated a Pair RDD with K/V pairs, like so:


 rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the VALUES for 
each KEY.
I can do so as shown here, which does work:

 countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of 
 countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
 rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerator (i.e. the 
 SUM).
 rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide 
 each SUM by it's denominator (i.e. COUNT)
 print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one, or whether
perhaps there is a single API call
that handles this common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv



Re: hive-thriftserver maven artifact

2015-04-28 Thread Marco
Thx Ted for the info !

2015-04-27 23:51 GMT+02:00 Ted Yu yuzhih...@gmail.com:

 This is available for 1.3.1:

 http://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver_2.10

 FYI

 On Mon, Feb 16, 2015 at 7:24 AM, Marco marco@gmail.com wrote:

 Ok, so will it be only available for the next version (1.30)?

 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 I searched for 'spark-hive-thriftserver_2.10' on this page:
 http://mvnrepository.com/artifact/org.apache.spark

 Looks like it is not published.

 On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote:

 Hi,

 I am referring to https://issues.apache.org/jira/browse/SPARK-4925
 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the
 artifact in a public repository ? I have not found it @Maven Central.

 Thanks,
 Marco





 --
 Viele Grüße,
 Marco





-- 
Viele Grüße,
Marco


Re: Best practices on testing Spark jobs

2015-04-28 Thread Silvio Fiorito
Hi Michal,

Please try spark-testing-base by Holden. I’ve used it and it works well for 
unit testing batch and streaming jobs

https://github.com/holdenk/spark-testing-base

Thanks,
Silvio

From: Michal Michalski
Date: Tuesday, April 28, 2015 at 11:32 AM
To: user
Subject: Best practices on testing Spark jobs

Hi,

I have two questions regarding testing Spark jobs:

1. Is it possible to use Mockito for that purpose? I tried to use it, but it 
looks like there are no interactions with mocks. I didn't dive into the details 
of how Mockito works, but I guess it might be because of the serialization and 
how Spark distributes tasks. I'm not sure about it though and I'm looking for 
confirmation.

2. If not mockito, what's the alternative? What's the recommended way to test 
Spark jobs? Should I manually create mocks by e.g. extending all the classes 
I'd normally mock and changing the implementation of some methods? I don't like 
this idea but I can't really see any other options now.


Kind regards,
Michał Michalski,
michal.michal...@boxever.commailto:michal.michal...@boxever.com


Re: Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-28 Thread Vadim Bichutskiy
I was wondering about the same thing.

Vadim
ᐧ

On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote:

 Looks to me  that the same thing also applies to the SparkContext.textFile
 or SparkContext.wholeTextFile, there is no way in RDD to figure out the
 file information where the data in RDD is from

 --
 bit1...@163.com


 *From:* Saisai Shao sai.sai.s...@gmail.com
 *Date:* 2015-04-29 10:10
 *To:* lokeshkumar lok...@dataken.net
 *CC:* spark users user@spark.apache.org
 *Subject:* Re: Spark streaming - textFileStream/fileStream - Get file name
 I think currently there's no API in Spark Streaming you can use to get the
 file names for file input streams. Actually it is not trivial to support
 this, may be you could file a JIRA with wishes you want the community to
 support, so anyone who is interested can take a crack on this.

 Thanks
 Jerry


 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:

 Hi Forum,

 Using spark streaming and listening to the files in HDFS using
 textFileStream/fileStream methods, how do we get the fileNames which are
 read by these methods?

 I used textFileStream which has file contents in JavaDStream and I got no
 success with fileStream as it is throwing me a compilation error with
 spark
 version 1.3.1.

 Can someone please tell me if we have an API function or any other way to
 get the file names that these streaming methods read?

 Thanks
 Lokesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





RE: HBase HTable constructor hangs

2015-04-28 Thread Tridib Samanta
I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0.
 
Here is the jstack trace. Complete stack trace attached.
 
Executor task launch worker-1 #58 daemon prio=5 os_prio=0 
tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152)
 - locked 0xf8cb7258 (a 
org.apache.hadoop.hbase.client.RpcRetryingCaller)
 at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
 at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
 - locked 0xf84ac0b0 (a java.lang.Object)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
 at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150)
 at com.mypackage.storeTuples(CubeStoreService.java:59)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Executor task launch worker-0 #57 daemon prio=5 os_prio=0 
tid=0x7fd3d0443800 nid=0x487 waiting for monitor entry [0x7fd4506d8000]
   java.lang.Thread.State: BLOCKED (on object monitor)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1156)
 - waiting to lock 0xf84ac0b0 (a java.lang.Object)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
 at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150)
 at com.mypackage.storeTuples(CubeStoreService.java:59)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 
Date: Tue, 28 Apr 2015 19:35:26 -0700
Subject: Re: HBase HTable constructor hangs
From: yuzhih...@gmail.com
To: tridib.sama...@live.com
CC: user@spark.apache.org

Can you give us more information ?Such as hbase release, Spark release.
If you can pastebin jstack of the hanging HTable process, that would help.
BTW I used 

How to stream all data out of a Kafka topic once, then terminate job?

2015-04-28 Thread dgoldenberg
Hi,

I'm wondering about the use-case where you're not doing continuous,
incremental streaming of data out of Kafka but rather want to publish data
once with your Producer(s) and consume it once, in your Consumer, then
terminate the consumer Spark job.

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(...));

The batchDuration parameter is "the time interval at which streaming data
will be divided into batches". Can this be arranged somehow to cause Spark
Streaming to just get all the available data, let all the RDDs within
the Kafka discretized stream get processed, and then just be done and
terminate, rather than wait another period and try to process any more data
from Kafka?
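
One approach, not discussed above, is to skip the streaming API entirely and read a fixed offset range as a plain batch RDD via KafkaUtils.createRDD (Spark 1.3+); the broker, topic and offsets here are placeholders, and you have to determine the until-offsets yourself:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val ranges = Array(OffsetRange("mytopic", 0, 0L, 1000000L))   // topic, partition, from, until
  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, ranges)
  rdd.map(_._2).saveAsTextFile("hdfs:///tmp/kafka-dump")        // process once, then the job just ends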



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-28 Thread Saisai Shao
I think it might be useful for Spark Streaming's file input stream, but I'm not
sure it is useful for SparkContext#textFile, since we specify the file
ourselves, so why would we still need to know the file name?

I will open up a JIRA to mention this feature.

Thanks
Jerry


2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:

 I was wondering about the same thing.

 Vadim
 ᐧ

 On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote:

 Looks to me  that the same thing also applies to the
 SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD
 to figure out the file information where the data in RDD is from

 --
 bit1...@163.com


 *From:* Saisai Shao sai.sai.s...@gmail.com
 *Date:* 2015-04-29 10:10
 *To:* lokeshkumar lok...@dataken.net
 *CC:* spark users user@spark.apache.org
 *Subject:* Re: Spark streaming - textFileStream/fileStream - Get file
 name
 I think currently there's no API in Spark Streaming you can use to get
 the file names for file input streams. Actually it is not trivial to
 support this, may be you could file a JIRA with wishes you want the
 community to support, so anyone who is interested can take a crack on this.

 Thanks
 Jerry


 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:

 Hi Forum,

 Using spark streaming and listening to the files in HDFS using
 textFileStream/fileStream methods, how do we get the fileNames which are
 read by these methods?

 I used textFileStream which has file contents in JavaDStream and I got no
 success with fileStream as it is throwing me a compilation error with
 spark
 version 1.3.1.

 Can someone please tell me if we have an API function or any other way to
 get the file names that these streaming methods read?

 Thanks
 Lokesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Weird error/exception

2015-04-28 Thread Vadim Bichutskiy
I was having this issue when my batch interval was very big -- like 5
minutes. When my batch interval is
smaller, I don't get this exception. Can someone explain to me why this
might be happening?

Vadim
ᐧ

On Tue, Apr 28, 2015 at 4:26 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming to monitor an S3 bucket. Everything appears to
 be fine. But every batch interval I get the following:

 *15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Attempting to
 release HttpMethod in finalize() as its response data stream has gone out
 of scope. This attempt will not always succeed and cannot be relied upon!
 Please ensure response data streams are always fully consumed or closed to
 avoid HTTP connection starvation.*

 *15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Successfully
 released HttpMethod in finalize(). You were lucky this time... Please
 ensure response data streams are always fully consumed or closed.*

 *Traceback (most recent call last):*

 *  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py,
 line 162, in manager*

 *code = worker(sock)*

 *  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py,
 line 60, in worker*

 *worker_main(infile, outfile)*

 *  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/worker.py,
 line 126, in main*

 *if read_int(infile) == SpecialLengths.END_OF_STREAM:*

 *  File
 /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/serializers.py, line
 528, in read_int*

 *raise EOFError*

 *EOFError*

 Does anyone know the cause of this and how to fix it?

 Thanks,

 Vadim
 ᐧ



External Application Run Status

2015-04-28 Thread Nastooh Avessta (navesta)
Hi
In a multi-node setup, I am invoking a number of external apps through
Runtime.getRuntime.exec from an rdd.map function, and would like to track their
completion status. Evidently, such calls spawn separate processes, which are not
tracked by the standalone scheduler, i.e., reduce or collect are called prior
to the completion of these tasks. I am wondering if:

1-  there is a way to delay calls to reduce/collect until their respective
external app processes have completed (see the sketch below)

2-  and if not, whether it is possible to query the node id and pid of such
processes
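
A sketch of one way to get (1): if the task itself waits for the external process, the stage cannot finish, and therefore reduce/collect cannot start, until every spawned app has exited. The command path and record type are placeholders:

  val statuses = rdd.map { record =>
    val proc = Runtime.getRuntime.exec(Array("/path/to/external-app", record.toString))
    val exitCode = proc.waitFor()   // blocks this task until the external app finishes
    val host = java.net.InetAddress.getLocalHost.getHostName
    (host, record.toString, exitCode)
  }
  statuses.collect().foreach(println)   // runs only after all external apps have completed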
Cheers,


Nastooh Avessta
ENGINEER.SOFTWARE ENGINEERING
nave...@cisco.com
Phone: +1 604 647 1527

Cisco Systems Limited
595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
VANCOUVER
BRITISH COLUMBIA
V7X 1J1
CA
Cisco.comhttp://www.cisco.com/








Re: Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-28 Thread bit1...@163.com
For SparkContext#textFile, if a directory is given as the path parameter,
then it will pick up the files in the directory, so the same thing will occur.



bit1...@163.com
 
From: Saisai Shao
Date: 2015-04-29 10:54
To: Vadim Bichutskiy
CC: bit1...@163.com; lokeshkumar; user
Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name
I think it might be useful in Spark Streaming's file input stream, but not sure 
is it useful in SparkContext#textFile, since we specify the file by our own, so 
why we still need to know the file name.

I will open up a JIRA to mention about this feature.

Thanks
Jerry


2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:
I was wondering about the same thing.

Vadim
ᐧ

On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote:
Looks to me  that the same thing also applies to the SparkContext.textFile or 
SparkContext.wholeTextFile, there is no way in RDD to figure out the file 
information where the data in RDD is from 



bit1...@163.com
 
From: Saisai Shao
Date: 2015-04-29 10:10
To: lokeshkumar
CC: spark users
Subject: Re: Spark streaming - textFileStream/fileStream - Get file name
I think currently there's no API in Spark Streaming you can use to get the file 
names for file input streams. Actually it is not trivial to support this, may 
be you could file a JIRA with wishes you want the community to support, so 
anyone who is interested can take a crack on this.

Thanks
Jerry


2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:
Hi Forum,

Using spark streaming and listening to the files in HDFS using
textFileStream/fileStream methods, how do we get the fileNames which are
read by these methods?

I used textFileStream which has file contents in JavaDStream and I got no
success with fileStream as it is throwing me a compilation error with spark
version 1.3.1.

Can someone please tell me if we have an API function or any other way to
get the file names that these streaming methods read?

Thanks
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: HBase HTable constructor hangs

2015-04-28 Thread Ted Yu
How did you distribute hbase-site.xml to the nodes ?

Looks like HConnectionManager couldn't find the hbase:meta server.
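
One way to take hbase-site.xml distribution out of the equation is to build the configuration inside the partition closure and set the ZooKeeper quorum explicitly; a sketch where the quorum hosts, table, column family, and the RDD's (String, String) key/value shape are placeholders:

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.{HTable, Put}
  import org.apache.hadoop.hbase.util.Bytes

  rdd.foreachPartition { partition =>
    // Built on the executor, so tasks do not depend on finding an
    // hbase-site.xml on their classpath.
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "zk-host1,zk-host2")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val table = new HTable(conf, "my_table")
    partition.foreach { case (rowKey, value) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
      table.put(put)
    }
    table.close()
  }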

Cheers

On Tue, Apr 28, 2015 at 9:19 PM, Tridib Samanta tridib.sama...@live.com
wrote:

 I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0.

 Here is the jstack trace. Complete stack trace attached.

 Executor task launch worker-1 #58 daemon prio=5 os_prio=0
 tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
  at java.lang.Thread.sleep(Native Method)
  at
 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152)
  - locked 0xf8cb7258 (a
 org.apache.hadoop.hbase.client.RpcRetryingCaller)
  at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
  at
 org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
  - locked 0xf84ac0b0 (a java.lang.Object)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
  at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
  at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
  at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150)
  at com.mypackage.storeTuples(CubeStoreService.java:59)
  at
 com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23)
  at
 com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13)
  at
 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
  at
 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
  at
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
  at
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
  at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
  at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
 Executor task launch worker-0 #57 daemon prio=5 os_prio=0
 tid=0x7fd3d0443800 nid=0x487 waiting for monitor entry
 [0x7fd4506d8000]
java.lang.Thread.State: BLOCKED (on object monitor)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1156)
  - waiting to lock 0xf84ac0b0 (a java.lang.Object)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
  at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
  at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
  at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150)
  at com.mypackage.storeTuples(CubeStoreService.java:59)
  at
 com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23)
  at
 com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13)
  at
 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
  at
 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
  at
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
  at
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
  at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
  at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

 --
 Date: Tue, 28 Apr 2015 19:35:26 -0700
 

RE: HBase HTable constructor hangs

2015-04-28 Thread Tridib Samanta
I am not 100% sure how it's picking up the configuration. I copied the
hbase-site.xml to the hdfs/spark cluster (single machine). I also included
hbase-site.xml in the spark-job jar file. The spark-job jar file also has
yarn-site.xml, mapred-site.xml and core-site.xml in it.
 
One interesting thing is that when I run the spark-job jar standalone and
execute the HBase client from a main method, it works fine. The same client is
unable to connect/hangs when the jar is distributed in Spark.
 
Thanks
Tridib
 
Date: Tue, 28 Apr 2015 21:25:41 -0700
Subject: Re: HBase HTable constructor hangs
From: yuzhih...@gmail.com
To: tridib.sama...@live.com
CC: user@spark.apache.org

How did you distribute hbase-site.xml to the nodes ?
Looks like HConnectionManager couldn't find the hbase:meta server.
Cheers
On Tue, Apr 28, 2015 at 9:19 PM, Tridib Samanta tridib.sama...@live.com wrote:



I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0.
 
Here is the jstack trace. Complete stack trace attached.
 
Executor task launch worker-1 #58 daemon prio=5 os_prio=0 
tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152)
 - locked 0xf8cb7258 (a 
org.apache.hadoop.hbase.client.RpcRetryingCaller)
 at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
 at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
 - locked 0xf84ac0b0 (a java.lang.Object)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
 at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150)
 at com.mypackage.storeTuples(CubeStoreService.java:59)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Executor task launch worker-0 #57 daemon prio=5 os_prio=0 
tid=0x7fd3d0443800 nid=0x487 waiting for monitor entry [0x7fd4506d8000]
   java.lang.Thread.State: BLOCKED (on object monitor)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1156)
 - waiting to lock 0xf84ac0b0 (a java.lang.Object)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
 at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150)
 at com.mypackage.storeTuples(CubeStoreService.java:59)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23)
 at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
 at 

How to run self-build spark on EC2?

2015-04-28 Thread Bo Fu
Hi all,

I have an issue. I added some timestamps in Spark source code and built it 
using:

mvn package -DskipTests

I checked the new version on my own computer and it works. However, when I ran
Spark on EC2, the Spark code the EC2 machines ran was the original version.

Does anyone know how to deploy the changed Spark source code to EC2?
Thx a lot


Bo Fu

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



solr in spark

2015-04-28 Thread Jeetendra Gangele
Does anyone tried using solr inside spark?
below is the project describing it.
https://github.com/LucidWorks/spark-solr.

I have a requirement in which I want to index 20 million company names
and then search them as and when new data comes in. The output should be a
list of companies matching the query.

Spark has built-in Elasticsearch support, but for this purpose is
Elasticsearch not a good option, since this is purely a text search problem?

Elasticsearch is good for filtering and grouping.

Has anybody used Solr inside Spark?

Regards
jeetendra


Re: solr in spark

2015-04-28 Thread Jeetendra Gangele
Thanks for the reply.

Will the Elasticsearch index be within my cluster, or do I need to host
Elasticsearch separately?


On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote:

 I haven't used Solr for a long time, and haven't used Solr in Spark.

 However, why do you say Elasticsearch is not a good option ...? ES
 absolutely supports full-text search and not just filtering and grouping
 (in fact it's original purpose was and still is text search, though
 filtering, grouping and aggregation are heavily used).
 http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html



 On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Does anyone tried using solr inside spark?
 below is the project describing it.
 https://github.com/LucidWorks/spark-solr.

 I have a requirement in which I want to index 20 millions companies name
 and then search as and when new data comes in. the output should be list of
 companies matching the query.

 Spark has inbuilt elastic search but for this purpose Elastic search is
 not a good option since this is totally text search problem?

 Elastic search is good  for filtering and grouping.

 Does any body used solr inside spark?

 Regards
 jeetendra





Re: solr in spark

2015-04-28 Thread Nick Pentreath
I haven't used Solr for a long time, and haven't used Solr in Spark.

However, why do you say Elasticsearch is not a good option ...? ES
absolutely supports full-text search and not just filtering and grouping
(in fact its original purpose was and still is text search, though
filtering, grouping and aggregation are heavily used).
http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html



On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Does anyone tried using solr inside spark?
 below is the project describing it.
 https://github.com/LucidWorks/spark-solr.

 I have a requirement in which I want to index 20 millions companies name
 and then search as and when new data comes in. the output should be list of
 companies matching the query.

 Spark has inbuilt elastic search but for this purpose Elastic search is
 not a good option since this is totally text search problem?

 Elastic search is good  for filtering and grouping.

 Does any body used solr inside spark?

 Regards
 jeetendra




Re: solr in spark

2015-04-28 Thread andy petrella
AFAIK Datastax is looking at it heavily. They have a good integration of
Cassandra with it, and the next step was clearly to have a strong combination of
the three in one of the coming releases.

Le mar. 28 avr. 2015 18:28, Jeetendra Gangele gangele...@gmail.com a
écrit :

 Does anyone tried using solr inside spark?
 below is the project describing it.
 https://github.com/LucidWorks/spark-solr.

 I have a requirement in which I want to index 20 millions companies name
 and then search as and when new data comes in. the output should be list of
 companies matching the query.

 Spark has inbuilt elastic search but for this purpose Elastic search is
 not a good option since this is totally text search problem?

 Elastic search is good  for filtering and grouping.

 Does any body used solr inside spark?

 Regards
 jeetendra




PySpark: slicing issue with dataframes

2015-04-28 Thread Ali Bajwa
Hi experts,

Trying to use the slicing functionality on strings as part of a Spark
program (PySpark), I get this error:

 Code 

import pandas as pd
from pyspark.sql import SQLContext
hc = SQLContext(sc)
A = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'], 'Lastname':
['Jones', 'Bajwa', 'Day']})
a = hc.createDataFrame(A)
print A

b = a.select(a.Firstname[:2])
print b.toPandas()
c = a.select(a.Lastname[2:])
print c.toPandas()

Output:

 Firstname Lastname
0 JamesJones
1   AliBajwa
2Daniel  Day
  SUBSTR(Firstname, 0, 2)
0  Ja
1  Al
2  Da

---
Py4JError Traceback (most recent call last)
ipython-input-17-6ee5d7d069ce in module()
 10 b = a.select(a.Firstname[:2])
 11 print b.toPandas()
--- 12 c = a.select(a.Lastname[2:])
 13 print c.toPandas()

/home/jupyter/spark-1.3.1/python/pyspark/sql/dataframe.pyc in substr(self,
startPos, length)
   1089 raise TypeError(Can not mix the type)
   1090 if isinstance(startPos, (int, long)):
- 1091 jc = self._jc.substr(startPos, length)
   1092 elif isinstance(startPos, Column):
   1093 jc = self._jc.substr(startPos._jc, length._jc)

/home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
-- 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
302 raise Py4JError(
303 'An error occurred while calling {0}{1}{2}.
Trace:\n{3}\n'.
-- 304 format(target_id, '.', name, value))
305 else:
306 raise Py4JError(

Py4JError: An error occurred while calling o1887.substr. Trace:
py4j.Py4JException: Method substr([class java.lang.Integer, class
java.lang.Long]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

It looks like X[:2] works but X[2:] fails with the error above.
Has anyone else seen this issue?

Clearly I can use substr() to work around this, but if this is a confirmed
bug we should open a JIRA.

Thanks,
Ali


Re: Question regarding join with multiple columns with pyspark

2015-04-28 Thread Ali Bajwa
Thanks again Ayan! To close the loop on this issue, I have filed the below
JIRA to track the issue:
https://issues.apache.org/jira/browse/SPARK-7197



On Fri, Apr 24, 2015 at 8:21 PM, ayan guha guha.a...@gmail.com wrote:

 I just tested, your observation in DataFrame API is correct. It behaves
 weirdly in case of multiple column join.  (Maybe we should report a Jira?)

 Solution: You can go back to our good old composite key field
 concatenation method. Not ideal, but workaround. (Of course you can use
 realSQL as well, as shown below)

 set up Data:

 a =
 [[1993,1,100],[1993,2,200],[1994,1,1000],[1994,3,3000],[2000,1,1]]
 b = [[1993,1,A],[1994,1,AA],[2000,1,AAA]]
 YM1 = sc.parallelize(a).map(lambda tup: Row(yr=int(tup[0]),mn =
 int(tup[1]), price = int(tup[2]),joiningKey=str(tup[0])+~+str(tup[1])))
 YM2 = sc.parallelize(b).map(lambda tup: Row(yr=int(tup[0]),mn =
 int(tup[1]), name = str(tup[2]),joiningKey=str(tup[0])+~+str(tup[1])))
 print YM1.collect()
 print YM2.collect()

 YM1DF = ssc.createDataFrame(YM1)
 YM2DF = ssc.createDataFrame(YM2)

 print YM1DF.printSchema()
 print YM2DF.printSchema()

 This DOES NOT WORK ---

 YMJN = YM1DF.join(YM2DF,YM1DF.yr==YM2DF.yr and
 YM1DF.mn==YM2DF.mn,inner)
 print YMJN.printSchema()
 for l in YMJN.collect():
 print l

 Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1',
 mn=1, name=u'A', yr=1993)
 Row(joiningKey=u'1994~1', mn=1, price=100, yr=1994, joiningKey=u'1994~1',
 mn=1, name=u'AA', yr=1994)
 Row(joiningKey=u'2000~1', mn=1, price=100, yr=2000, joiningKey=u'2000~1',
 mn=1, name=u'AAA', yr=2000)
 Row(joiningKey=u'1993~1', mn=1, price=1000, yr=1993, joiningKey=u'1993~1',
 mn=1, name=u'A', yr=1993)
 Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1',
 mn=1, name=u'AA', yr=1994)
 Row(joiningKey=u'2000~1', mn=1, price=1000, yr=2000, joiningKey=u'2000~1',
 mn=1, name=u'AAA', yr=2000)
 Row(joiningKey=u'1993~1', mn=1, price=1, yr=1993,
 joiningKey=u'1993~1', mn=1, name=u'A', yr=1993)
 Row(joiningKey=u'1994~1', mn=1, price=1, yr=1994,
 joiningKey=u'1994~1', mn=1, name=u'AA', yr=1994)
 Row(joiningKey=u'2000~1', mn=1, price=1, yr=2000,
 joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000)

 -

 SQL Solution - works as expected

 YM1DF.registerTempTable("ymdf1")
 YM2DF.registerTempTable("ymdf2")
 YMJNS = ssc.sql("select * from ymdf1 inner join ymdf2 on
 ymdf1.yr=ymdf2.yr and ymdf1.mn=ymdf2.mn")
 print YMJNS.printSchema()
 for l in YMJNS.collect():
 print l

 Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1',
 mn=1, name=u'AA', yr=1994)
 Row(joiningKey=u'2000~1', mn=1, price=1, yr=2000,
 joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000)
 Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1',
 mn=1, name=u'A', yr=1993)

 -

 Field concat method, works as well

 YMJNA = YM1DF.join(YM2DF, YM1DF.joiningKey==YM2DF.joiningKey, "inner")
 print YMJNA.printSchema()
 for l in YMJNA.collect():
 print l

 Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1',
 mn=1, name=u'AA', yr=1994)
 Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1',
 mn=1, name=u'A', yr=1993)
 Row(joiningKey=u'2000~1', mn=1, price=1, yr=2000,
 joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000)

 On Sat, Apr 25, 2015 at 10:18 AM, Ali Bajwa ali.ba...@gmail.com wrote:

 Any ideas on this? Any sample code to join 2 data frames on two columns?

 Thanks
 Ali

 On Apr 23, 2015, at 1:05 PM, Ali Bajwa ali.ba...@gmail.com wrote:

  Hi experts,
 
  Sorry if this is a n00b question or has already been answered...
 
  Am trying to use the data frames API in python to join 2 dataframes
  with more than 1 column. The example I've seen in the documentation
  only shows a single column - so I tried this:
 
  Example code
 
  import pandas as pd
  from pyspark.sql import SQLContext
  hc = SQLContext(sc)
  A = pd.DataFrame({'year': ['1993', '2005', '1994'], 'month': ['5',
  '12', '12'], 'value': [100, 200, 300]})
  a = hc.createDataFrame(A)
  B = pd.DataFrame({'year': ['1993', '1993'], 'month': ['12', '12'],
  'value': [101, 102]})
  b = hc.createDataFrame(B)
 
  print "Pandas"  # try with Pandas
  print A
  print B
  print pd.merge(A, B, on=['year', 'month'], how='inner')
 
  print "Spark"
  print a.toPandas()
  print b.toPandas()
  print a.join(b, a.year==b.year and a.month==b.month, 'inner').toPandas()
 
 
  Output

  Pandas
     month  value  year
  0      5    100  1993
  1     12    200  2005
  2     12    300  1994

     month  value  year
  0     12    101  1993
  1     12    102  1993

  Empty DataFrame

  Columns: [month, value_x, year, value_y]

  Index: []

  Spark
     month  value  year
  0      5    100  1993

Re: How to deploy self-build spark source code on EC2

2015-04-28 Thread Nicholas Chammas
[-dev] [+user]

This is a question for the user list, not the dev list.

Use the --spark-version and --spark-git-repo options to specify your own
repo and hash to deploy.

Source code link.
https://github.com/apache/spark/blob/268c419f1586110b90e68f98cd000a782d18828c/ec2/spark_ec2.py#L189-L195

Nick

On Tue, Apr 28, 2015 at 12:14 PM Bo Fu b...@uchicago.edu
http://mailto:b...@uchicago.edu wrote:

Hi all,

 I have an issue. I added some timestamps in Spark source code and built it
 using:

 mvn package -DskipTests

 I checked the new version in my own computer and it works. However, when I
 ran spark on EC2, the spark code EC2 machines ran is the original version.

 Anyone knows how to deploy the changed spark source code into EC2?
 Thx a lot


 Bo Fu

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org

  ​


Spark - Timeout Issues - OutOfMemoryError

2015-04-28 Thread ๏̯͡๏
I have a SparkApp that completes in 45 mins for 5 files (5*750MB size),
and it takes 16 executors to do so.

I wanted to run it against 10 files of each input type (10*3 files as there
are three inputs that are transformed). [Input1 = 10*750 MB,
Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 32 executors.

I see multiple warnings like the following:
15/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver in
heartbeater
org.apache.spark.SparkException: Error sending message [message =
Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
... 1 more


When i searched deeper, i found OOM error.
15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block
manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com,
48304) with no recent heart beats: 121200ms exceeds 12ms
15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block
manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2245)
at java.util.Arrays.copyOf(Arrays.java:2219)
at java.util.ArrayList.grow(ArrayList.java:242)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
at java.util.ArrayList.add(ArrayList.java:440)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at
org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
at
org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread task-result-getter-3 java.lang.OutOfMemoryError: Java
heap space
at java.util.Arrays.copyOf(Arrays.java:2245)
at java.util.Arrays.copyOf(Arrays.java:2219)
at java.util.ArrayList.grow(ArrayList.java:242)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
at java.util.ArrayList.add(ArrayList.java:440)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at

Spark streaming - textFileStream/fileStream - Get file name

2015-04-28 Thread lokeshkumar
Hi Forum,

Using spark streaming and listening to the files in HDFS using
textFileStream/fileStream methods, how do we get the fileNames which are
read by these methods?

I used textFileStream, which gives the file contents in a JavaDStream, and I had
no success with fileStream as it throws a compilation error with Spark
version 1.3.1.

Can someone please tell me if we have an API function or any other way to
get the file names that these streaming methods read?

Thanks
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Weird error/exception

2015-04-28 Thread Vadim Bichutskiy
I am using Spark Streaming to monitor an S3 bucket. Everything appears to
be fine. But every batch interval I get the following:

15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Attempting to release
HttpMethod in finalize() as its response data stream has gone out of scope.
This attempt will not always succeed and cannot be relied upon! Please
ensure response data streams are always fully consumed or closed to avoid
HTTP connection starvation.

15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Successfully released
HttpMethod in finalize(). You were lucky this time... Please ensure
response data streams are always fully consumed or closed.

Traceback (most recent call last):
  File "/Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py", line 162, in manager
    code = worker(sock)
  File "/Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py", line 60, in worker
    worker_main(infile, outfile)
  File "/Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/worker.py", line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/serializers.py", line 528, in read_int
    raise EOFError
EOFError

Does anyone know the cause of this and how to fix it?

Thanks,

Vadim


RE: Scalability of group by

2015-04-28 Thread Ulanov, Alexander
Richard,

The same problem occurs with sort.

I have enough disk space and tmp folder space. The errors in the logs say out of memory.
I wonder what it is holding in memory?

Alexander

From: Richard Marscher [mailto:rmarsc...@localytics.com]
Sent: Tuesday, April 28, 2015 7:34 AM
To: Ulanov, Alexander
Cc: user@spark.apache.org
Subject: Re: Scalability of group by

Hi,

I can offer a few ideas to investigate in regards to your issue here. I've run 
into resource issues doing shuffle operations with a much smaller dataset than 
2B. The data is going to be saved to disk by the BlockManager as part of the 
shuffle and then redistributed across the cluster as relevant to the group by. 
So the data is going to be replicated during the operation.

I might suggest trying to allocate more memory for your executors in your 
cluster. You might also want to look into configuring more explicitly the 
shuffle functionality 
(https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior). 
Check the disk usage on the worker nodes, in our case we actually had small 
disk space to start with and were running out of temporary space for the 
shuffle operation.

I believe you should also be able to find more clear errors in logs from the 
worker nodes if you haven't checked yet.
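
If it helps, the knobs I would start experimenting with look roughly like this
(illustrative values and paths only -- the right numbers depend on your nodes;
shown here in PySpark, but the configuration keys are the same from Scala):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .set("spark.executor.memory", "10g")                # more heap per executor
        .set("spark.shuffle.memoryFraction", "0.4")         # shuffle memory used before spilling to disk
        .set("spark.local.dir", "/mnt/bigdisk/spark-tmp"))  # roomy disk for spill/shuffle files
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "400")   # more, smaller reduce-side tasks for the group by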

On Mon, Apr 27, 2015 at 10:02 PM, Ulanov, Alexander 
alexander.ula...@hp.commailto:alexander.ula...@hp.com wrote:
It works on a smaller dataset of 100 rows. Probably I could find the size when 
it fails using binary search. However, it would not help me because I need to 
work with 2B rows.

From: ayan guha [mailto:guha.a...@gmail.commailto:guha.a...@gmail.com]
Sent: Monday, April 27, 2015 6:58 PM
To: Ulanov, Alexander
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Scalability of group by


Hi

Can you test on a smaller dataset to identify if it is cluster issue or scaling 
issue in spark
On 28 Apr 2015 11:30, Ulanov, Alexander 
alexander.ula...@hp.commailto:alexander.ula...@hp.com wrote:
Hi,

I am running a group by on a dataset of 2B rows of RDD[Row[id, time, value]] in
Spark 1.3 as follows:
“select id, time, first(value) from data group by id, time”

My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor is 
allocated with 5GB of memory. However, all executors are being lost during the 
query execution and I get “ExecutorLostFailure”.

Could you suggest what might be the reason for it? Could it be that “group by” 
is implemented as RDD.groupBy so it holds the group by result in memory? What 
is the workaround?

Best regards, Alexander



Re: Spark SQL 1.3.1 saveAsParquetFile will output tachyon file with different block size

2015-04-28 Thread Calvin Jia
Hi,

You can apply this patch https://github.com/apache/spark/pull/5354 and
recompile.

Hope this helps,
Calvin

On Tue, Apr 28, 2015 at 1:19 PM, sara mustafa eng.sara.must...@gmail.com
wrote:

 Hi Zhang,

 How did you compile Spark 1.3.1 with Tachyon? When I changed the Tachyon version
 to 0.6.3 in core/pom.xml and make-distribution.sh and tried to compile again,
 many compilation errors were raised.

 Thanks,




 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-1-3-1-saveAsParquetFile-will-output-tachyon-file-with-different-block-size-tp11561p11870.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




Re: Initial tasks in job take time

2015-04-28 Thread Anshul Singhle
yes
On 29 Apr 2015 03:31, ayan guha guha.a...@gmail.com wrote:

 Is your driver running on the same m/c as the master?
 On 29 Apr 2015 03:59, Anshul Singhle ans...@betaglide.com wrote:

 Hi,

 I'm running short spark jobs on rdds cached in memory. I'm also using a
 long running job context. I want to be able to complete my jobs (on the
 cached rdd) in under 1 sec.
 I'm getting the following job times with about 15 GB of data distributed
 across 6 nodes. Each executor has about 20GB of memory available. My
 context has about 26 cores in total.

 If number of partitions < no of cores -

 Some jobs run in 3s others take about 6s -- the time difference can be
 explained by GC time.

 If number of partitions = no of cores -

 All jobs run in 4s. The initial tasks of each stage on every executor
 take about 1s.

 If partitions > cores -

 Jobs take more time. The initial tasks of each stage on every executor
 take about 1s. The other tasks run in 45-50 ms each. However, since the
 initial tasks again take about 1s each, the total time in this case is
 about 6s which is more than the previous case.

 Clearly the limiting factor here is the initial set of tasks. For every
 case, these tasks take 1s to run, no matter the amount of partitions. Hence
 best results are obtained with partitions = cores, because in that case
 every core gets 1 task which takes 1s to run.
 In this case, I get 0 GC time. The only explanation is scheduling delay
 which is about 0.2 - 0.3 seconds. I looked at my task size and result size
 and that has no bearing on this delay. Also, I'm not getting the task size
 warnings in the logs.

 For what I can understand, the first time a task runs on a core, it takes
 1s to run. Is this normal?

 Is it possible to get sub-second latencies?

 Can something be done about the scheduler delay?

 What other things can I look at to reduce this time?

 Regards,

 Anshul





rdd.count with 100 elements taking 1 second to run

2015-04-28 Thread Anshul Singhle
Hi,

I'm running the following code in my cluster (standalone mode) via spark
shell -

val rdd = sc.parallelize(1 to 100)
rdd.count

This takes around 1.2s to run.

Is this expected or am I configuring something wrong?

I'm using about 30 cores with 512MB executor memory

As expected, GC time is negligible. I'm just getting some scheduler delay
and 1s to launch the task

Thanks,

Anshul


Re: Initial tasks in job take time

2015-04-28 Thread ayan guha
Is your driver running on the same m/c as the master?
On 29 Apr 2015 03:59, Anshul Singhle ans...@betaglide.com wrote:

 Hi,

 I'm running short spark jobs on rdds cached in memory. I'm also using a
 long running job context. I want to be able to complete my jobs (on the
 cached rdd) in under 1 sec.
 I'm getting the following job times with about 15 GB of data distributed
 across 6 nodes. Each executor has about 20GB of memory available. My
 context has about 26 cores in total.

 If number of partitions < no of cores -

 Some jobs run in 3s others take about 6s -- the time difference can be
 explained by GC time.

 If number of partitions = no of cores -

 All jobs run in 4s. The initial tasks of each stage on every executor take
 about 1s.

 If partitions > cores -

 Jobs take more time. The initial tasks of each stage on every executor
 take about 1s. The other tasks run in 45-50 ms each. However, since the
 initial tasks again take about 1s each, the total time in this case is
 about 6s which is more than the previous case.

 Clearly the limiting factor here is the initial set of tasks. For every
 case, these tasks take 1s to run, no matter the amount of partitions. Hence
 best results are obtained with partitions = cores, because in that case
 every core gets 1 task which takes 1s to run.
 In this case, I get 0 GC time. The only explanation is scheduling delay
 which is about 0.2 - 0.3 seconds. I looked at my task size and result size
 and that has no bearing on this delay. Also, I'm not getting the task size
 warnings in the logs.

 For what I can understand, the first time a task runs on a core, it takes
 1s to run. Is this normal?

 Is it possible to get sub-second latencies?

 Can something be done about the scheduler delay?

 What other things can I look at to reduce this time?

 Regards,

 Anshul





Metric collection

2015-04-28 Thread Giovanni Paolo Gibilisco
Hi,
I would like to collect some metrics from spark and plot them with
graphite. I managed to do that with the metrics provided by
org.apache.spark.metrics.source.JvmSource, but I would like to know if there
are other sources available besides this one.
Best,
Giovanni


Spark Sql: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

2015-04-28 Thread LinQili
Hi all.
I was launching a spark sql job on my own machine, not on the spark cluster 
machines, and failed. The excpetion info is:
15/04/28 16:28:04 INFO yarn.ApplicationMaster: Final app status: FAILED, 
exitCode: 15, (reason: User class threw exception: java.lang.RuntimeException: 
Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient)
Exception in thread Driver java.lang.RuntimeException: 
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.init(HiveMetastoreCatalog.scala:55)
at 
org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:253)
at 
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:253)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:253)
at 
org.apache.spark.sql.hive.HiveContext$$anon$4.init(HiveContext.scala:263)
at 
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:263)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:262)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at com.nd.huayuedu.DoExecute$.doExecute(DoExecute.scala:16)
at com.nd.huayuedu.HiveFromSpark$.main(HiveFromSpark.scala:30)
at com.nd.huayuedu.HiveFromSpark.main(HiveFromSpark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:441)
Caused by: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at 
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 27 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
... 32 more

Caused by: javax.jdo.JDOFatalUserException: Class 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables:
java.lang.ClassNotFoundException: 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at 
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310)
at 
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339)
at 
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248)
at 
org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:70)
at 
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
at 
org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58)
at 

[Spark SQL] Problems creating a table in specified schema/database

2015-04-28 Thread James Aley
Hey all,

I'm trying to create tables from existing Parquet data in different
schemata. The following isn't working for me:

CREATE DATABASE foo;

CREATE TABLE foo.bar
USING com.databricks.spark.avro
OPTIONS (path '...');

-- Error: org.apache.spark.sql.AnalysisException: cannot recognize input
near 'USING' 'com' '.' in table name; line 1 pos 13 (state=,code=0)

I also tried

USE foo;
CREATE TABLE bar
USING com.databricks.spark.avro
OPTIONS (path '...');

-- Creates the table successfully, but in the default.* schema.


This is on Spark 1.3.1, running on YARN, Hive 0.13.1. Any suggestions?
Should this work?


James.


Re: New JIRA - [SQL] Can't remove columns from DataFrame or save DataFrame from a join due to duplicate columns

2015-04-28 Thread ayan guha
The alias function is not in Python yet. I suggest writing SQL if your data
suits it.
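
A rough sketch of the SQL route (untested; the table and column names here are
only placeholders for whatever is in your join):

df1.registerTempTable("t1")
df2.registerTempTable("t2")
deduped = sqlContext.sql(
    "SELECT a.id, a.name, b.name AS other_name "   # rename the duplicate column up front
    "FROM t1 a JOIN t2 b ON a.id = b.id")

Selecting (and renaming) only the columns you want means the resulting
DataFrame never carries the duplicate columns that block the save.
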
On 28 Apr 2015 14:42, Don Drake dondr...@gmail.com wrote:

 https://issues.apache.org/jira/browse/SPARK-7182

 Can anyone suggest a workaround for the above issue?

 Thanks.

 -Don

 --
 Donald Drake
 Drake Consulting
 http://www.drakeconsulting.com/
 http://www.MailLaunder.com/




Re: Serialization error

2015-04-28 Thread ๏̯͡๏
val conf = new SparkConf()

  .setAppName("detail")

  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)

  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)

  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)

  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))


Can you try this ?

On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com wrote:

 Hi,

 While connecting to accumulo through spark by making sparkRDD I am getting
 the following error:
  object not serializable (class: org.apache.accumulo.core.data.Key)

 This is due to the 'Key' class of Accumulo, which does not implement the
 Serializable interface. How can this be solved so that Accumulo can be used with
 Spark?

 Thanks
 Madhvi

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Deepak


Re: solr in spark

2015-04-28 Thread Nick Pentreath
Depends on your use case and search volume. Typically you'd have a dedicated ES 
cluster if your app is doing a lot of real time indexing and search.




If it's only for spark integration then you could colocate ES and spark



—
Sent from Mailbox

On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Thanks for reply.
 Will the Elasticsearch index be within my cluster? Or do I need to host
 Elasticsearch separately?
 On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote:
 I haven't used Solr for a long time, and haven't used Solr in Spark.

 However, why do you say Elasticsearch is not a good option ...? ES
 absolutely supports full-text search and not just filtering and grouping
 (in fact it's original purpose was and still is text search, though
 filtering, grouping and aggregation are heavily used).
 http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html



 On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

  Has anyone tried using Solr inside Spark?
  Below is the project describing it:
  https://github.com/LucidWorks/spark-solr.

  I have a requirement in which I want to index 20 million company names
  and then search as and when new data comes in. The output should be a list of
  companies matching the query.

  Spark has inbuilt Elasticsearch support, but for this purpose Elasticsearch is
  not a good option since this is purely a text-search problem?

  Elasticsearch is good for filtering and grouping.

  Has anybody used Solr inside Spark?

 Regards
 jeetendra




Re: JAVA_HOME problem

2015-04-28 Thread Marcelo Vanzin
Are you using a Spark build that matches your YARN cluster version?

That seems like it could happen if you're using a Spark built against
a newer version of YARN than you're running.

On Thu, Apr 2, 2015 at 12:53 AM, 董帅阳 917361...@qq.com wrote:
 spark 1.3.0


 spark@pc-zjqdyyn1:~ tail /etc/profile
 export JAVA_HOME=/usr/jdk64/jdk1.7.0_45
 export PATH=$PATH:$JAVA_HOME/bin

 #
 # End of /etc/profile
 #‍


 But ERROR LOG

 Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454
 
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
 Log Contents:‍



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Initial tasks in job take time

2015-04-28 Thread Anshul Singhle
Hi,

I'm running short spark jobs on rdds cached in memory. I'm also using a
long running job context. I want to be able to complete my jobs (on the
cached rdd) in under 1 sec.
I'm getting the following job times with about 15 GB of data distributed
across 6 nodes. Each executor has about 20GB of memory available. My
context has about 26 cores in total.

If number of partitions < no of cores -

Some jobs run in 3s others take about 6s -- the time difference can be
explained by GC time.

If number of partitions = no of cores -

All jobs run in 4s. The initial tasks of each stage on every executor take
about 1s.

If partitions > cores -

Jobs take more time. The initial tasks of each stage on every executor take
about 1s. The other tasks run in 45-50 ms each. However, since the initial
tasks again take about 1s each, the total time in this case is about 6s
which is more than the previous case.

Clearly the limiting factor here is the initial set of tasks. For every
case, these tasks take 1s to run, no matter the amount of partitions. Hence
best results are obtained with partitions = cores, because in that case
every core gets 1 task which takes 1s to run.
In this case, I get 0 GC time. The only explanation is scheduling delay
which is about 0.2 - 0.3 seconds. I looked at my task size and result size
and that has no bearing on this delay. Also, I'm not getting the task size
warnings in the logs.

For what I can understand, the first time a task runs on a core, it takes
1s to run. Is this normal?

Is it possible to get sub-second latencies?

Can something be done about the scheduler delay?

What other things can I look at to reduce this time?
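
For reference, the setup I am describing boils down to something like this
(written in PySpark syntax only for brevity; data_path is a placeholder):

total_cores = sc.defaultParallelism              # roughly the 26 cores in my context
rdd = sc.textFile(data_path).repartition(total_cores).cache()
rdd.count()                                      # materialize the cache before the short jobs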

Regards,

Anshul


default number of reducers

2015-04-28 Thread Shushant Arora
In a normal MR job, can I configure (cluster-wide) the default number of
reducers, if I don't specify any reducers in my job?


Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread Silvio Fiorito
The initializer is a tuple (0, 0); it seems you just have 0.
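
In other words, only the zero value needs to change -- an untested sketch that
should mirror the Scala version, using the rdd1 from your message:

sums_counts = rdd1.aggregateByKey(
    (0.0, 0),                                  # zeroValue is the (runningSum, runningCount) tuple
    lambda a, b: (a[0] + b, a[1] + 1),         # within a partition: fold one value into the tuple
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # across partitions: merge two tuples
avg_by_key = sums_counts.mapValues(lambda t: t[0] / t[1])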

From: subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io
Organization: PRISMALYTICS, LLC.
Reply-To: subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io
Date: Tuesday, April 28, 2015 at 1:28 PM
To: Silvio Fiorito, Todd Nist
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

Thank you Todd, Silvio...

I had to stare at Silvio's answer for a while.

If I'm interpreting the aggregateByKey() statement correctly ...
   (Within-Partition Reduction Step)
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

  (Cross-Partition Reduction Step)
  a: is a TUPLE that holds: (runningSum, runningCount).
  b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Under that interpretation, I tried to write & run the Python equivalent, like
so:
  rdd1.aggregateByKey(0, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: 
(a[0] + b[0], a[1] + b[1]))

Sadly, it didn't work, yielding the following exception which indicates that 
the indexing above is incorrect:
lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
File input, line 1, in lambda
TypeError: 'int' object has no attribute '__getitem__'

Sidenote: Surprisingly, there isn't much documentation -- at least not for 
Python -- for this useful aggregateByKey()
method and use case; although I will be sure to write a gist today, once I get 
this working. :)

I think I'm nearly there though, so...
(1) Is my written interpretation above of what (a, b) are correct?
(2) If yes, what then, is getting passed in the Python case?

I guess I'm looking for the Python equivalent to the first statement in 
Silvio's answer (below). But my
reasoning to deconstruct and reconstruct is missing something.

Thanks again!




On 04/28/2015 11:26 AM, Silvio Fiorito wrote:
If you need to keep the keys, you can use aggregateByKey to calculate an avg of 
the values:

val step1 = data.aggregateByKey((0.0, 0))((a, b) => (a._1 + b, a._2 + 1), (a,
b) => (a._1 + b._1, a._2 + b._2))
val avgByKey = step1.mapValues(i => i._1 / i._2)

Essentially, what this is doing is passing an initializer for sum and count, 
then summing each pair of values and counting the number of values. The last 
argument is to combine the results of each partition, if the data was spread 
across partitions. The result is a tuple of sum and count for each key.

Use mapValues to keep your partitioning by keys intact and minimize a full 
shuffle for downstream keyed operations. It just calculates the avg for each 
key.

From: Todd Nist
Date: Tuesday, April 28, 2015 at 10:20 AM
To: subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

Can you simply apply the 
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter
 to this?  You should be able to do something like this:

val stats = RDD.map(x => x._2).stats()

-Todd

On Tue, Apr 28, 2015 at 10:00 AM, 
subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io 
subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io wrote:
Hello Friends:

I generated a Pair RDD with K/V pairs, like so:


 rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the VALUES for 
each KEY.
I can do so as shown here, which does work:

 countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of 
 countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
 rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerator (i.e. the 
 SUM).
 rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide 
 each SUM by it's denominator (i.e. COUNT)
 print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one; or whether 
perhaps there is a single API call
that handles common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv


--
Sincerely yours,
Team PRISMALYTICS

PRISMALYTICS, LLC.http://www.prismalytics.com/| 
www.prismalytics.comhttp://www.prismalytics.com/
P: 212.882.1276tel:212.882.1276 |  
subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io
[Follow Us:  

Code For Loading Graph from Edge Tuple File

2015-04-28 Thread geek2
Hi Everyone,

Does anyone have example code for generating a graph from a file of edge
name-edge name tuples? I've seen the example where a Graph is generated from
an RDD of triplets composed of edge Longs, but I'd like to see an example
where a graph is built from an edge name-edge name file such as main
street,first street\\n US Interstate, first street\\n, etc...

Thanks

--John





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Code-For-Loading-Graph-from-Edge-Tuple-File-tp22693.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread subscripti...@prismalytics.io

Thank you Todd, Silvio...

I had to stare at Silvio's answer for a while.

If I'm interpreting the aggregateByKey() statement correctly ...
   (Within-Partition Reduction Step)
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

  (Cross-Partition Reduction Step)
  a: is a TUPLE that holds: (runningSum, runningCount).
  b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Under that interpretation, I tried to write & run the Python equivalent,
like so:
  rdd1.aggregateByKey(0, lambda a,b: (a[0] + b, a[1] + 1), lambda
a,b: (a[0] + b[0], a[1] + b[1]))

Sadly, it didn't work, yielding the following exception which indicates
that the indexing above is incorrect:

lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
File input, line 1, in lambda
TypeError: 'int' object has no attribute '__getitem__'

Sidenote: Surprisingly, there isn't much documentation -- at least not
for Python -- for this useful aggregateByKey()
method and use case; although I will be sure to write a gist
today, once I get this working. :)

I think I'm nearly there though, so...
(1) Is my written interpretation above of what (a, b) are correct?
(2) If yes, what then, is getting passed in the Python case?

I guess I'm looking for the Python equivalent to the first statement in 
Silvio's answer (below). But my

reasoning to deconstruct and reconstruct is missing something.

Thanks again!




On 04/28/2015 11:26 AM, Silvio Fiorito wrote:
If you need to keep the keys, you can use aggregateByKey to calculate 
an avg of the values:


val step1 = data.aggregateByKey((0.0, 0))((a, b) => (a._1 + b, a._2 +
1), (a, b) => (a._1 + b._1, a._2 + b._2))

val avgByKey = step1.mapValues(i => i._1 / i._2)

Essentially, what this is doing is passing an initializer for sum and 
count, then summing each pair of values and counting the number of 
values. The last argument is to combine the results of each partition, 
if the data was spread across partitions. The result is a tuple of sum 
and count for each key.


Use mapValues to keep your partitioning by keys intact and minimize a 
full shuffle for downstream keyed operations. It just calculates the 
avg for each key.


From: Todd Nist
Date: Tuesday, April 28, 2015 at 10:20 AM
To: subscripti...@prismalytics.io mailto:subscripti...@prismalytics.io
Cc: user@spark.apache.org mailto:user@spark.apache.org
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) 
RDD ...


Can you simply apply the 
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter 
to this?  You should be able to do something like this:


val stats = RDD.map(x => x._2).stats()

-Todd

On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io 
mailto:subscripti...@prismalytics.io subscripti...@prismalytics.io 
mailto:subscripti...@prismalytics.io wrote:


Hello Friends:

I generated a Pair RDD with K/V pairs, like so:


 rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of
the VALUES for each KEY.
I can do so as shown here, which does work:
countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE
OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08':
69, ... snip ...}
rdd1 = rdd1.reduceByKey(operator.add) # Calculate the
numerator (i.e. the SUM).
rdd1 = rdd1.map(lambda x: (x[0],
x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's
denominator (i.e. COUNT)
print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one;
or whether perhaps there is a single API call
that handles common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv




--
Sincerely yours,
Team PRISMALYTICS

PRISMALYTICS, LLC. http://www.prismalytics.com/ | www.prismalytics.com 
http://www.prismalytics.com/
P: 212.882.1276 tel:212.882.1276 | subscripti...@prismalytics.io 
mailto:subscripti...@prismalytics.io
Follow Us: https://www.LinkedIn.com/company/prismalytics 
https://www.linkedin.com/company/prismalytics


Prismalytics, LLC. http://www.prismalytics.com/
data analytics to literally count on


How to run customized Spark on EC2?

2015-04-28 Thread Bo Fu
Hi experts,

I have an issue. I added some timestamps in Spark source code and built it 
using:

mvn package -DskipTests

I checked the new version in my own computer and it works. However, when I ran 
spark on EC2, the spark code EC2 machines ran is the original version.

Anyone knows how to deploy the changed spark source code into EC2?
Thx a lot


Bo Fu
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to setup this false streaming problem

2015-04-28 Thread Toni Cebrián

Hi,

  I'm new to Spark and in need of some help framing the problem I
have. A problem well stated is half solved, as the saying goes :)


  Let's say that I have a DStream[String] basically containing Json of 
some measurements from IoT devices. In order to keep it simple say that 
after unmarshalling I have data like:


   case class Measurement(val deviceId:Long, val timestamp:Date, val 
measurement:Double)


I need to use DStreams because there is some interest in monitoring the
measurements of devices in real time. So let's say that I have a
dashboard with hourly granularity: past hours are consolidated, but the
current hour must be kept updated on every input.

The problem is that the time that matters is the timestamp in the JSON,
not the time at which Spark receives the event, so I think that I have to keep a
stateful DStream like the one described at
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
. I have two questions here:


1. Once a given hour is gone, I'd like to flush the consolidated stream
   into a DB. I think the strategy is to have a Stream with key-values
   where the key is (userID, truncateByHour(timestamp)) and reducing
   over the values. But it seems to me that with this approach Spark
   has lost any sense of time, how would you flush all the RDDs with
   timestamps between 00:00:00 and 00:59:59 for instance? Maybe I'm
   missing some function in the API
2. How do you deal with events that come with timestamps in the past?
   Is it a matter of ignoring them, or finding a trade-off between memory
   and how long the stateful DStream is kept? But then, who is the one
   popping the mature time slices from the stateful stream?

For me Spark Streaming would be the most natural way to face this
problem, but maybe a simple Spark job run every minute could just as
easily keep external events sorted by time.
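
To make the idea concrete, the per-batch bucketing I have in mind looks roughly
like this (a toy PySpark sketch with made-up data, assuming an existing
SparkContext sc, just to illustrate the (deviceId, hourStart) key):

def hour_bucket(epoch_seconds):
    # truncate an epoch-seconds timestamp to the start of its hour
    return int(epoch_seconds) - int(epoch_seconds) % 3600

# one micro-batch worth of (deviceId, timestampSeconds, measurement) tuples
measurements = sc.parallelize([
    (1, 1430215215, 0.5),
    (1, 1430216475, 0.7),
    (2, 1430212800, 1.2),
])
keyed = measurements.map(lambda m: ((m[0], hour_bucket(m[1])), (m[2], 1)))
per_hour = keyed.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(per_hour.collect())   # ((deviceId, hourStart), (sum, count)) pairs

Buckets whose hourStart is older than the hour currently being displayed could
then be flushed to the DB and dropped from whatever state is kept.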


I'd like to hear your thoughts.
Toni


MLLib SVMWithSGD is failing for large dataset

2015-04-28 Thread sarath

I am trying to train a large dataset consisting of 8 million data points and
20 million features using SVMWithSGD. But it is failing after running for
some time. I tried increasing num-partitions, driver-memory,
executor-memory, driver-max-resultSize. Also I tried by reducing the size of
dataset from 8 million to 25K (keeping number of features same 20 M). But
after using the entire 64GB driver memory for 20 to 30 min it failed.

I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM).
executor-memory - 60G
driver-memory - 60G
num-executors - 64
And other default settings

This is the error log :

15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemBLAS
15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeRefBLAS
15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection from
xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
...
15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests
outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 is
closed
15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block
fetches
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
...
15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block
fetches
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
...
15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning
fetch of 1 outstanding blocks 
java.io.IOException: Failed to connect to
xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 
org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused:
xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at

Spark 1.3.1 JavaStreamingContext - fileStream compile error

2015-04-28 Thread lokeshkumar

Hi Forum 

I am facing below compile error when using the fileStream method of the
JavaStreamingContext class. 
I have copied the code from JavaAPISuite.java test class of spark test code. 

The error message is 


 
The method fileStream(String, Class<K>, Class<V>, Class<F>,
Function<Path,Boolean>, boolean) in the type JavaStreamingContext is not
applicable for the arguments (String, Class<LongWritable>, Class<Text>,
Class<TextInputFormat>, new Function<Path,Boolean>(){}, boolean)

 

Please help me to find a solution for this. 

http://apache-spark-user-list.1001560.n3.nabble.com/file/n22683/47.png 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-JavaStreamingContext-fileStream-compile-error-tp22683.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming: JavaDStream compute method NPE

2015-04-28 Thread Himanshu Mehra
Hi Puneith,

Please provide the code if you may. It will be helpful.

Thank you,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-JavaDStream-compute-method-NPE-tp22676p22684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JAVA_HOME problem

2015-04-28 Thread sourabh chaki
I was able to solve this problem by hard-coding the JAVA_HOME inside the
org.apache.spark.deploy.yarn.Client.scala class.




val commands = prefixEnv ++ Seq(
  // was: YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
  "/usr/java/jdk1.7.0_51/bin/java", "-server")

Somehow {{JAVA_HOME}}  was not getting resolved in the node of yarn
container. This change has fixed the problem. Now I am getting a new
error.

Container: container_1430123808466_36297_02_01
===
LogType: stderr
LogLength: 87
Log Contents:
Error: Could not find or load main class
org.apache.spark.deploy.yarn.ExecutorLauncher

LogType: stdout
LogLength: 0
Log Contents:

Looks like now the classpath variables are not resolved on the YARN node. I
have MapReduce jobs running in the same cluster and working without any
problem. Any pointer on why this could happen?


Thanks

Sourabh


On Fri, Apr 24, 2015 at 3:52 PM, sourabh chaki chaki.sour...@gmail.com
wrote:

 Yes Akhil. This is the same issue. I have updated my comment in that
 ticket.

 Thanks
 Sourabh

 On Fri, Apr 24, 2015 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Isn't this related to this
 https://issues.apache.org/jira/browse/SPARK-6681

 Thanks
 Best Regards

 On Fri, Apr 24, 2015 at 11:40 AM, sourabh chaki chaki.sour...@gmail.com
 wrote:

 I am also facing the same problem with spark 1.3.0 and yarn-client and
 yarn-cluster mode. Launching yarn container failed and this is the error in
 stderr:

 Container: container_1429709079342_65869_01_01

 ===
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
 Log Contents:

 I have added JAVA_HOME in hadoop-env.sh as well spark-env.sh
 grep JAVA_HOME /etc/hadoop/conf.cloudera.yarn/hadoop-env.sh
 export JAVA_HOME=/usr/java/default
 export PATH=$PATH:$JAVA_HOME/bin/java

 grep JAVA_HOME /var/spark/spark-1.3.0-bin-hadoop2.4/conf/spark-env.sh
 export JAVA_HOME=/usr/java/default

 I could see another thread for the same problem but I dont see any
 solution.

 http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0
  Any pointer will be helpful.

 Thanks
 Sourabh


 On Thu, Apr 2, 2015 at 1:23 PM, 董帅阳 917361...@qq.com wrote:

 spark 1.3.0


 spark@pc-zjqdyyn1:~ tail /etc/profile
 export JAVA_HOME=/usr/jdk64/jdk1.7.0_45
 export PATH=$PATH:$JAVA_HOME/bin

 #
 # End of /etc/profile
 #‍


 But ERROR LOG

 Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454
 
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
 Log Contents:‍







Re: Spark 1.3.1 JavaStreamingContext - fileStream compile error

2015-04-28 Thread Akhil Das
How about:

JavaPairDStream<LongWritable, Text> input =
  jssc.fileStream(inputDirectory, LongWritable.class, Text.class,
TextInputFormat.class);

See the complete example over here
https://github.com/databricks/learning-spark/blob/98c4d57f3a611269823c53d9249ab4fbb3065a0c/src/main/java/com/oreilly/learningsparkexamples/java/logs/ReadTransferStats.java

Thanks
Best Regards

On Tue, Apr 28, 2015 at 11:32 AM, lokeshkumar lok...@dataken.net wrote:


 Hi Forum

 I am facing below compile error when using the fileStream method of the
 JavaStreamingContext class.
 I have copied the code from JavaAPISuite.java test class of spark test
 code.

 The error message is


 
 The method fileStream(String, Class<K>, Class<V>, Class<F>,
 Function<Path,Boolean>, boolean) in the type JavaStreamingContext is not
 applicable for the arguments (String, Class<LongWritable>, Class<Text>,
 Class<TextInputFormat>, new Function<Path,Boolean>(){}, boolean)

 

 Please help me to find a solution for this.

 http://apache-spark-user-list.1001560.n3.nabble.com/file/n22683/47.png



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-JavaStreamingContext-fileStream-compile-error-tp22683.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to debug Spark on Yarn?

2015-04-28 Thread Steve Loughran

On 27 Apr 2015, at 07:51, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:

Spark 1.3

1. View stderr/stdout from executor from Web UI: when the job is running i 
figured out the executor that am suppose to see, and those two links show 4 
special characters on browser.

2. Tail on Yarn logs:


/apache/hadoop/bin/yarn logs -applicationId  application_1429087638744_151059 | 
less

Threw me: Application has not completed. Logs are only available after an 
application completes


Any other ideas that i can try ?


There's some stuff on log streaming of running Apps on Hadoop 2.6+ which can 
stream logs of running apps to HDFS. I don't know if spark supports that (I 
haven't tested it) so won't give the details right now.

You can go from the RM to the node managers running the containers, and view 
the logs that way.


From some other notes of mine:




One configuration to aid debugging is to tell the nodemanagers to keep data for a
short period after containers finish:

<!-- 10 minutes after a failure, to see what is left in the directory -->
<property>
  <name>yarn.nodemanager.delete.debug-delay-sec</name>
  <value>600</value>
</property>


You can then retrieve logs by either the web UI, or by connecting to the server 
(usually by ssh) and retrieve the logs from the log directory

We also recommend making sure that YARN kills processes

<!-- time before the process gets a -9 -->
<property>
  <name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
  <value>3</value>
</property>




Re: java.lang.UnsupportedOperationException: empty collection

2015-04-28 Thread Robineast
I've tried running your code through spark-shell on both 1.3.0 (pre-built for
Hadoop 2.4 and above) and a recently built snapshot of master. Both work
fine. Running on OS X yosemite. What's your configuration?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-UnsupportedOperationException-empty-collection-tp22677p22686.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Serialization error

2015-04-28 Thread madhvi

On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:


val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))


Can you try this ?


On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com 
mailto:madhvi.gu...@orkash.com wrote:


Hi,

While connecting to Accumulo through Spark by building a Spark RDD, I am
getting the following error:
 object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to the 'Key' class of Accumulo, which does not implement the
Serializable interface. How can this be solved so that Accumulo can be
used with Spark?

Thanks
Madhvi

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




--
Deepak


Hi Deepak,

The snippet you provided is Scala, but I am working in Java. I am trying 
the same thing in Java; could you please explain in detail the parameters 
you mentioned there, such as 'arguments'?


Thanks
Madhvi
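
A rough Java equivalent of the Scala configuration above, for reference. This is
only a sketch: the buffer sizes are placeholder values standing in for the
'arguments' map in the Scala snippet, and the class name is made up.

import org.apache.accumulo.core.data.Key;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class AccumuloKryoExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("detail")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.mb", "64")        // placeholder for arguments.get("buffersize")
        .set("spark.kryoserializer.buffer.max.mb", "512")   // placeholder for arguments.get("maxbuffersize")
        .registerKryoClasses(new Class<?>[]{ Key.class });  // register the non-serializable Accumulo Key with Kryo

    JavaSparkContext sc = new JavaSparkContext(conf);
    // ... build the Accumulo RDD here, as in the original Java code ...
    sc.stop();
  }
}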


Re: Understanding Spark's caching

2015-04-28 Thread Akhil Das
Option B would be fine. As the answer on the SO question itself says, "Since RDD
transformations merely build DAG descriptions without execution, in Option
A by the time you call unpersist, you still only have job descriptions and
not a running execution."

Also note that in Option A you are not specifying any action anywhere.
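
To make this concrete, a sketch of the Option B ordering (the filter/map bodies and
output paths below are placeholders, not taken from the original code):

val rdd1 = sc.textFile("some data")
rdd1.cache()                                // mark rdd1 for caching
val rdd2 = rdd1.filter(_.nonEmpty).cache()  // placeholder predicate
val rdd3 = rdd1.map(_.toUpperCase).cache()  // placeholder mapping

rdd2.saveAsTextFile("out2")  // first action: rdd1 is read from disk once and cached
rdd3.saveAsTextFile("out3")  // second action: rdd1 is served from the cache
rdd1.unpersist()             // safe now: rdd2 and rdd3 have been materialized and cached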

Thanks
Best Regards

On Tue, Apr 28, 2015 at 12:58 AM, Eran Medan eran.me...@gmail.com wrote:

 Hi Everyone!

 I'm trying to understand how Spark's cache work.

 Here is my naive understanding, please let me know if I'm missing
 something:

 val rdd1 = sc.textFile(some data)
 rdd.cache() //marks rdd as cached
 val rdd2 = rdd1.filter(...)
 val rdd3 = rdd1.map(...)
 rdd2.saveAsTextFile(...)
 rdd3.saveAsTextFile(...)

 In the above, rdd1 will be loaded from disk (e.g. HDFS) only once. (when
 rdd2 is saved I assume) and then from cache (assuming there is enough RAM)
 when rdd3 is saved)

 Now here is my question. Let's say I want to cache rdd2 and rdd3 as they
 will both be used later on, but I don't need rdd1 after creating them.

 Basically there is duplication, isn't it? Since once rdd2 and rdd3 are
 calculated, I don't need rdd1 anymore, I should probably unpersist it,
 right? the question is when?

 *Will this work? (Option A)*

 val rdd1 = sc.textFile(some data)
 rdd.cache() //marks rdd as cached
 val rdd2 = rdd1.filter(...)
 val rdd3 = rdd1.map(...)
 rdd2.cache()
 rdd3.cache()
 rdd1.unpersist()

 Does spark add the unpersist call to the DAG? or is it done immediately?
 if it's done immediately, then basically rdd1 will be non cached when I
 read from rdd2 and rdd3, right?

 *Should I do it this way instead (Option B)?*

 val rdd1 = sc.textFile(some data)
 rdd.cache() //marks rdd as cached
 val rdd2 = rdd1.filter(...)
 val rdd3 = rdd1.map(...)

 rdd2.cache()
 rdd3.cache()

 rdd2.saveAsTextFile(...)
 rdd3.saveAsTextFile(...)

 rdd1.unpersist()

 *So the question is this:* Is Option A good enough? e.g. will rdd1 be
 still accessing the file only once? Or do I need to go with Option B?

 (see also
 http://stackoverflow.com/questions/29903675/understanding-sparks-caching)

 Thanks in advance



How to add jars to standalone pyspark program

2015-04-28 Thread mj
Hi,

I'm trying to figure out how to use a third party jar inside a python
program which I'm running via PyCharm in order to debug it. I am normally
able to run spark code in python such as this:

spark_conf = SparkConf().setMaster('local').setAppName('test')
sc = SparkContext(conf=spark_conf)
cars = sc.textFile('c:/cars.csv')
print cars.count()
sc.stop()

The code I'm trying to run is below - it uses the databricks spark csv jar.
I can get it working fine in pyspark shell using the packages argument, but
I can't figure out how to get it to work via PyCharm.

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setMaster('local').setAppName('test')
sc = SparkContext(conf=spark_conf)

sqlContext = SQLContext(sc)
df = sqlContext.load(source="com.databricks.spark.csv", header="true",
                     path="c:/cars.csv", delimiter='\t')
df.select("year")

The error message I'm getting is:
py4j.protocol.Py4JJavaError: An error occurred while calling o20.load.
: java.lang.RuntimeException: Failed to load class for data source:
com.databricks.spark.csv
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)


I presume I need to set the spark classpath somehow but I'm not sure of the
right way to do it. Any advice/guidance would be appreciated.

Thanks,

Mark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
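
A follow-up suggestion for the question above: one option that sidesteps the
classpath question entirely (at the cost of launching from a terminal rather than
PyCharm's run button) is to run the script through spark-submit with the same
--packages flag that already works in the shell. For example, with an illustrative
script name and package version:

spark-submit --master local --packages com.databricks:spark-csv_2.10:1.0.3 load_cars.py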